1 /*****************************************************************************
2
3 Copyright (c) 1997, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2013, 2022, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /**************************************************//**
21 @file log/log0recv.cc
22 Recovery
23
24 Created 9/20/1997 Heikki Tuuri
25 *******************************************************/
26
27 #include "univ.i"
28
29 #include <map>
30 #include <string>
31 #include <my_service_manager.h>
32
33 #include "log0recv.h"
34
35 #ifdef HAVE_MY_AES_H
36 #include <my_aes.h>
37 #endif
38
39 #include "log0crypt.h"
40 #include "mem0mem.h"
41 #include "buf0buf.h"
42 #include "buf0dblwr.h"
43 #include "buf0flu.h"
44 #include "mtr0mtr.h"
45 #include "mtr0log.h"
46 #include "page0page.h"
47 #include "page0cur.h"
48 #include "trx0undo.h"
49 #include "ibuf0ibuf.h"
50 #include "trx0undo.h"
51 #include "trx0rec.h"
52 #include "fil0fil.h"
53 #include "buf0rea.h"
54 #include "srv0srv.h"
55 #include "srv0start.h"
56 #include "fil0pagecompress.h"
57
/** The recovery system */
recv_sys_t recv_sys;
/** TRUE when recv_init_crash_recovery() has been called. */
bool recv_needed_recovery;
#ifdef UNIV_DEBUG
/** TRUE if writing to the redo log (mtr_commit) is forbidden.
Protected by log_sys.mutex. Debug-only flag. */
bool recv_no_log_write = false;
#endif /* UNIV_DEBUG */

/** TRUE if buf_page_is_corrupted() should check if the log sequence
number (FIL_PAGE_LSN) is in the future. Initially FALSE, and set by
recv_recovery_from_checkpoint_start(). */
bool recv_lsn_checks_on;

/** If the following is TRUE, the buffer pool file pages must be invalidated
after recovery and no ibuf operations are allowed; this becomes TRUE if
the log record hash table becomes too full, and log records must be merged
to file pages already before the recovery is finished: in this case no
ibuf operations are allowed, as they could modify the pages read in the
buffer pool before the pages have been recovered to the up-to-date state.

true means that recovery is running and no operations on the log file
are allowed yet: the variable name is misleading. */
bool recv_no_ibuf_operations;

/** The maximum lsn we see for a page during the recovery process. If this
is bigger than the lsn we are able to scan up to, that is an indication that
the recovery failed and the database may be corrupt.
Reset by recv_sys_t::create(). */
static lsn_t recv_max_page_lsn;
88
89 /** Stored physical log record with logical LSN (@see log_t::FORMAT_10_5) */
90 struct log_phys_t : public log_rec_t
91 {
92 /** start LSN of the mini-transaction (not necessarily of this record) */
93 const lsn_t start_lsn;
94 private:
95 /** @return the start of length and data */
startlog_phys_t96 const byte *start() const
97 {
98 return my_assume_aligned<sizeof(size_t)>
99 (reinterpret_cast<const byte*>(&start_lsn + 1));
100 }
101 /** @return the start of length and data */
startlog_phys_t102 byte *start()
103 { return const_cast<byte*>(const_cast<const log_phys_t*>(this)->start()); }
104 /** @return the length of the following record */
lenlog_phys_t105 uint16_t len() const { uint16_t i; memcpy(&i, start(), 2); return i; }
106
107 /** @return start of the log records */
beginlog_phys_t108 byte *begin() { return start() + 2; }
109 /** @return end of the log records */
endlog_phys_t110 byte *end() { byte *e= begin() + len(); ut_ad(!*e); return e; }
111 public:
112 /** @return start of the log records */
beginlog_phys_t113 const byte *begin() const { return const_cast<log_phys_t*>(this)->begin(); }
114 /** @return end of the log records */
endlog_phys_t115 const byte *end() const { return const_cast<log_phys_t*>(this)->end(); }
116
117 /** Determine the allocated size of the object.
118 @param len length of recs, excluding terminating NUL byte
119 @return the total allocation size */
120 static inline size_t alloc_size(size_t len);
121
122 /** Constructor.
123 @param start_lsn start LSN of the mini-transaction
124 @param lsn mtr_t::commit_lsn() of the mini-transaction
125 @param recs the first log record for the page in the mini-transaction
126 @param size length of recs, in bytes, excluding terminating NUL byte */
log_phys_tlog_phys_t127 log_phys_t(lsn_t start_lsn, lsn_t lsn, const byte *recs, size_t size) :
128 log_rec_t(lsn), start_lsn(start_lsn)
129 {
130 ut_ad(start_lsn);
131 ut_ad(start_lsn < lsn);
132 const uint16_t len= static_cast<uint16_t>(size);
133 ut_ad(len == size);
134 memcpy(start(), &len, 2);
135 reinterpret_cast<byte*>(memcpy(begin(), recs, size))[size]= 0;
136 }
137
138 /** Append a record to the log.
139 @param recs log to append
140 @param size size of the log, in bytes */
appendlog_phys_t141 void append(const byte *recs, size_t size)
142 {
143 ut_ad(start_lsn < lsn);
144 uint16_t l= len();
145 reinterpret_cast<byte*>(memcpy(end(), recs, size))[size]= 0;
146 l= static_cast<uint16_t>(l + size);
147 memcpy(start(), &l, 2);
148 }
149
150 /** Apply an UNDO_APPEND record.
151 @see mtr_t::undo_append()
152 @param block undo log page
153 @param data undo log record
154 @param len length of the undo log record
155 @return whether the operation failed (inconcistency was noticed) */
undo_appendlog_phys_t156 static bool undo_append(const buf_block_t &block, const byte *data,
157 size_t len)
158 {
159 ut_ad(len > 2);
160 byte *free_p= my_assume_aligned<2>
161 (TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE + block.frame);
162 const uint16_t free= mach_read_from_2(free_p);
163 if (UNIV_UNLIKELY(free < TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE ||
164 free + len + 6 >= srv_page_size - FIL_PAGE_DATA_END))
165 {
166 ib::error() << "Not applying UNDO_APPEND due to corruption on "
167 << block.page.id();
168 return true;
169 }
170
171 byte *p= block.frame + free;
172 mach_write_to_2(free_p, free + 4 + len);
173 memcpy(p, free_p, 2);
174 p+= 2;
175 memcpy(p, data, len);
176 p+= len;
177 mach_write_to_2(p, free);
178 return false;
179 }
180
181 /** The status of apply() */
182 enum apply_status {
183 /** The page was not affected */
184 APPLIED_NO= 0,
185 /** The page was modified */
186 APPLIED_YES,
187 /** The page was modified, affecting the encryption parameters */
188 APPLIED_TO_ENCRYPTION,
189 /** The page was modified, affecting the tablespace header */
190 APPLIED_TO_FSP_HEADER
191 };
192
193 /** Apply log to a page frame.
194 @param[in,out] block buffer block
195 @param[in,out] last_offset last byte offset, for same_page records
196 @return whether any log was applied to the page */
applylog_phys_t197 apply_status apply(const buf_block_t &block, uint16_t &last_offset) const
198 {
199 const byte * const recs= begin();
200 byte *const frame= block.page.zip.ssize
201 ? block.page.zip.data : block.frame;
202 const size_t size= block.physical_size();
203 apply_status applied= APPLIED_NO;
204
205 for (const byte *l= recs;;)
206 {
207 const byte b= *l++;
208 if (!b)
209 return applied;
210 ut_ad((b & 0x70) != RESERVED);
211 size_t rlen= b & 0xf;
212 if (!rlen)
213 {
214 const size_t lenlen= mlog_decode_varint_length(*l);
215 const uint32_t addlen= mlog_decode_varint(l);
216 ut_ad(addlen != MLOG_DECODE_ERROR);
217 rlen= addlen + 15 - lenlen;
218 l+= lenlen;
219 }
220 if (!(b & 0x80))
221 {
222 /* Skip the page identifier. It has already been validated. */
223 size_t idlen= mlog_decode_varint_length(*l);
224 ut_ad(idlen <= 5);
225 ut_ad(idlen < rlen);
226 ut_ad(mlog_decode_varint(l) == block.page.id().space());
227 l+= idlen;
228 rlen-= idlen;
229 idlen= mlog_decode_varint_length(*l);
230 ut_ad(idlen <= 5);
231 ut_ad(idlen <= rlen);
232 ut_ad(mlog_decode_varint(l) == block.page.id().page_no());
233 l+= idlen;
234 rlen-= idlen;
235 last_offset= 0;
236 }
237
238 switch (b & 0x70) {
239 case FREE_PAGE:
240 ut_ad(last_offset == 0);
241 goto next_not_same_page;
242 case INIT_PAGE:
243 if (UNIV_LIKELY(rlen == 0))
244 {
245 memset_aligned<UNIV_ZIP_SIZE_MIN>(frame, 0, size);
246 mach_write_to_4(frame + FIL_PAGE_OFFSET, block.page.id().page_no());
247 memset_aligned<8>(FIL_PAGE_PREV + frame, 0xff, 8);
248 mach_write_to_4(frame + FIL_PAGE_SPACE_ID, block.page.id().space());
249 last_offset= FIL_PAGE_TYPE;
250 next_after_applying:
251 if (applied == APPLIED_NO)
252 applied= APPLIED_YES;
253 }
254 else
255 {
256 record_corrupted:
257 if (!srv_force_recovery)
258 {
259 recv_sys.found_corrupt_log= true;
260 return applied;
261 }
262 next_not_same_page:
263 last_offset= 1; /* the next record must not be same_page */
264 }
265 next:
266 l+= rlen;
267 continue;
268 }
269
270 ut_ad(mach_read_from_4(frame + FIL_PAGE_OFFSET) ==
271 block.page.id().page_no());
272 ut_ad(mach_read_from_4(frame + FIL_PAGE_SPACE_ID) ==
273 block.page.id().space());
274 ut_ad(last_offset <= 1 || last_offset > 8);
275 ut_ad(last_offset <= size);
276
277 switch (b & 0x70) {
278 case OPTION:
279 goto next;
280 case EXTENDED:
281 if (UNIV_UNLIKELY(block.page.id().page_no() < 3 ||
282 block.page.zip.ssize))
283 goto record_corrupted;
284 static_assert(INIT_ROW_FORMAT_REDUNDANT == 0, "compatiblity");
285 static_assert(INIT_ROW_FORMAT_DYNAMIC == 1, "compatibility");
286 if (UNIV_UNLIKELY(!rlen))
287 goto record_corrupted;
288 switch (const byte subtype= *l) {
289 uint8_t ll;
290 size_t prev_rec, hdr_size;
291 default:
292 goto record_corrupted;
293 case INIT_ROW_FORMAT_REDUNDANT:
294 case INIT_ROW_FORMAT_DYNAMIC:
295 if (UNIV_UNLIKELY(rlen != 1))
296 goto record_corrupted;
297 page_create_low(&block, *l != INIT_ROW_FORMAT_REDUNDANT);
298 break;
299 case UNDO_INIT:
300 if (UNIV_UNLIKELY(rlen != 1))
301 goto record_corrupted;
302 trx_undo_page_init(block);
303 break;
304 case UNDO_APPEND:
305 if (UNIV_UNLIKELY(rlen <= 3))
306 goto record_corrupted;
307 if (undo_append(block, ++l, --rlen) && !srv_force_recovery)
308 {
309 page_corrupted:
310 ib::error() << "Set innodb_force_recovery=1 to ignore corruption.";
311 recv_sys.found_corrupt_log= true;
312 return applied;
313 }
314 break;
315 case INSERT_HEAP_REDUNDANT:
316 case INSERT_REUSE_REDUNDANT:
317 case INSERT_HEAP_DYNAMIC:
318 case INSERT_REUSE_DYNAMIC:
319 if (UNIV_UNLIKELY(rlen < 2))
320 goto record_corrupted;
321 rlen--;
322 ll= mlog_decode_varint_length(*++l);
323 if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
324 goto record_corrupted;
325 prev_rec= mlog_decode_varint(l);
326 ut_ad(prev_rec != MLOG_DECODE_ERROR);
327 rlen-= ll;
328 l+= ll;
329 ll= mlog_decode_varint_length(*l);
330 static_assert(INSERT_HEAP_REDUNDANT == 4, "compatibility");
331 static_assert(INSERT_REUSE_REDUNDANT == 5, "compatibility");
332 static_assert(INSERT_HEAP_DYNAMIC == 6, "compatibility");
333 static_assert(INSERT_REUSE_DYNAMIC == 7, "compatibility");
334 if (subtype & 2)
335 {
336 size_t shift= 0;
337 if (subtype & 1)
338 {
339 if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
340 goto record_corrupted;
341 shift= mlog_decode_varint(l);
342 ut_ad(shift != MLOG_DECODE_ERROR);
343 rlen-= ll;
344 l+= ll;
345 ll= mlog_decode_varint_length(*l);
346 }
347 if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
348 goto record_corrupted;
349 size_t enc_hdr_l= mlog_decode_varint(l);
350 ut_ad(enc_hdr_l != MLOG_DECODE_ERROR);
351 rlen-= ll;
352 l+= ll;
353 ll= mlog_decode_varint_length(*l);
354 if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
355 goto record_corrupted;
356 size_t hdr_c= mlog_decode_varint(l);
357 ut_ad(hdr_c != MLOG_DECODE_ERROR);
358 rlen-= ll;
359 l+= ll;
360 ll= mlog_decode_varint_length(*l);
361 if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
362 goto record_corrupted;
363 size_t data_c= mlog_decode_varint(l);
364 ut_ad(data_c != MLOG_DECODE_ERROR);
365 rlen-= ll;
366 l+= ll;
367 if (page_apply_insert_dynamic(block, subtype & 1, prev_rec,
368 shift, enc_hdr_l, hdr_c, data_c,
369 l, rlen) && !srv_force_recovery)
370 goto page_corrupted;
371 }
372 else
373 {
374 if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
375 goto record_corrupted;
376 size_t header= mlog_decode_varint(l);
377 ut_ad(header != MLOG_DECODE_ERROR);
378 rlen-= ll;
379 l+= ll;
380 ll= mlog_decode_varint_length(*l);
381 if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
382 goto record_corrupted;
383 size_t hdr_c= mlog_decode_varint(l);
384 ut_ad(hdr_c != MLOG_DECODE_ERROR);
385 rlen-= ll;
386 l+= ll;
387 ll= mlog_decode_varint_length(*l);
388 if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
389 goto record_corrupted;
390 size_t data_c= mlog_decode_varint(l);
391 rlen-= ll;
392 l+= ll;
393 if (page_apply_insert_redundant(block, subtype & 1, prev_rec,
394 header, hdr_c, data_c,
395 l, rlen) && !srv_force_recovery)
396 goto page_corrupted;
397 }
398 break;
399 case DELETE_ROW_FORMAT_REDUNDANT:
400 if (UNIV_UNLIKELY(rlen < 2 || rlen > 4))
401 goto record_corrupted;
402 rlen--;
403 ll= mlog_decode_varint_length(*++l);
404 if (UNIV_UNLIKELY(ll != rlen))
405 goto record_corrupted;
406 if (page_apply_delete_redundant(block, mlog_decode_varint(l)) &&
407 !srv_force_recovery)
408 goto page_corrupted;
409 break;
410 case DELETE_ROW_FORMAT_DYNAMIC:
411 if (UNIV_UNLIKELY(rlen < 2))
412 goto record_corrupted;
413 rlen--;
414 ll= mlog_decode_varint_length(*++l);
415 if (UNIV_UNLIKELY(ll > 3 || ll >= rlen))
416 goto record_corrupted;
417 prev_rec= mlog_decode_varint(l);
418 ut_ad(prev_rec != MLOG_DECODE_ERROR);
419 rlen-= ll;
420 l+= ll;
421 ll= mlog_decode_varint_length(*l);
422 if (UNIV_UNLIKELY(ll > 2 || ll >= rlen))
423 goto record_corrupted;
424 hdr_size= mlog_decode_varint(l);
425 ut_ad(hdr_size != MLOG_DECODE_ERROR);
426 rlen-= ll;
427 l+= ll;
428 ll= mlog_decode_varint_length(*l);
429 if (UNIV_UNLIKELY(ll > 3 || ll != rlen))
430 goto record_corrupted;
431 if (page_apply_delete_dynamic(block, prev_rec, hdr_size,
432 mlog_decode_varint(l)) &&
433 !srv_force_recovery)
434 goto page_corrupted;
435 break;
436 }
437 last_offset= FIL_PAGE_TYPE;
438 goto next_after_applying;
439 case WRITE:
440 case MEMSET:
441 case MEMMOVE:
442 if (UNIV_UNLIKELY(last_offset == 1))
443 goto record_corrupted;
444 const size_t olen= mlog_decode_varint_length(*l);
445 if (UNIV_UNLIKELY(olen >= rlen) || UNIV_UNLIKELY(olen > 3))
446 goto record_corrupted;
447 const uint32_t offset= mlog_decode_varint(l);
448 ut_ad(offset != MLOG_DECODE_ERROR);
449 static_assert(FIL_PAGE_OFFSET == 4, "compatibility");
450 if (UNIV_UNLIKELY(offset >= size))
451 goto record_corrupted;
452 if (UNIV_UNLIKELY(offset + last_offset < 8 ||
453 offset + last_offset >= size))
454 goto record_corrupted;
455 last_offset= static_cast<uint16_t>(last_offset + offset);
456 l+= olen;
457 rlen-= olen;
458 size_t llen= rlen;
459 if ((b & 0x70) == WRITE)
460 {
461 if (UNIV_UNLIKELY(rlen + last_offset > size))
462 goto record_corrupted;
463 memcpy(frame + last_offset, l, llen);
464 if (UNIV_LIKELY(block.page.id().page_no()));
465 else if (llen == 11 + MY_AES_BLOCK_SIZE &&
466 last_offset == FSP_HEADER_OFFSET + MAGIC_SZ +
467 fsp_header_get_encryption_offset(block.zip_size()))
468 applied= APPLIED_TO_ENCRYPTION;
469 else if (last_offset < FSP_HEADER_OFFSET + FSP_FREE + FLST_LEN + 4 &&
470 last_offset + llen >= FSP_HEADER_OFFSET + FSP_SIZE)
471 applied= APPLIED_TO_FSP_HEADER;
472 next_after_applying_write:
473 ut_ad(llen + last_offset <= size);
474 last_offset= static_cast<uint16_t>(last_offset + llen);
475 goto next_after_applying;
476 }
477 llen= mlog_decode_varint_length(*l);
478 if (UNIV_UNLIKELY(llen > rlen || llen > 3))
479 goto record_corrupted;
480 const uint32_t len= mlog_decode_varint(l);
481 ut_ad(len != MLOG_DECODE_ERROR);
482 if (UNIV_UNLIKELY(len + last_offset > size))
483 goto record_corrupted;
484 l+= llen;
485 rlen-= llen;
486 llen= len;
487 if ((b & 0x70) == MEMSET)
488 {
489 ut_ad(rlen <= llen);
490 if (UNIV_UNLIKELY(rlen != 1))
491 {
492 size_t s;
493 for (s= 0; s < llen; s+= rlen)
494 memcpy(frame + last_offset + s, l, rlen);
495 memcpy(frame + last_offset + s, l, llen - s);
496 }
497 else
498 memset(frame + last_offset, *l, llen);
499 goto next_after_applying_write;
500 }
501 const size_t slen= mlog_decode_varint_length(*l);
502 if (UNIV_UNLIKELY(slen != rlen || slen > 3))
503 goto record_corrupted;
504 uint32_t s= mlog_decode_varint(l);
505 ut_ad(slen != MLOG_DECODE_ERROR);
506 if (s & 1)
507 s= last_offset - (s >> 1) - 1;
508 else
509 s= last_offset + (s >> 1) + 1;
510 if (UNIV_LIKELY(s >= 8 && s + llen <= size))
511 {
512 memmove(frame + last_offset, frame + s, llen);
513 goto next_after_applying_write;
514 }
515 }
516 goto record_corrupted;
517 }
518 }
519 };
520
521
alloc_size(size_t len)522 inline size_t log_phys_t::alloc_size(size_t len)
523 {
524 return len + (1 + 2 + sizeof(log_phys_t));
525 }
526
527
528 /** Tablespace item during recovery */
529 struct file_name_t {
530 /** Tablespace file name (FILE_MODIFY) */
531 std::string name;
532 /** Tablespace object (NULL if not valid or not found) */
533 fil_space_t* space = nullptr;
534
535 /** Tablespace status. */
536 enum fil_status {
537 /** Normal tablespace */
538 NORMAL,
539 /** Deleted tablespace */
540 DELETED,
541 /** Missing tablespace */
542 MISSING
543 };
544
545 /** Status of the tablespace */
546 fil_status status;
547
548 /** FSP_SIZE of tablespace */
549 uint32_t size = 0;
550
551 /** Freed pages of tablespace */
552 range_set freed_ranges;
553
554 /** Dummy flags before they have been read from the .ibd file */
555 static constexpr uint32_t initial_flags = FSP_FLAGS_FCRC32_MASK_MARKER;
556 /** FSP_SPACE_FLAGS of tablespace */
557 uint32_t flags = initial_flags;
558
559 /** Constructor */
file_name_tfile_name_t560 file_name_t(std::string name_, bool deleted)
561 : name(std::move(name_)), status(deleted ? DELETED: NORMAL) {}
562
563 /** Add the freed pages */
add_freed_pagefile_name_t564 void add_freed_page(uint32_t page_no) { freed_ranges.add_value(page_no); }
565
566 /** Remove the freed pages */
remove_freed_pagefile_name_t567 void remove_freed_page(uint32_t page_no)
568 {
569 if (freed_ranges.empty()) return;
570 freed_ranges.remove_value(page_no);
571 }
572 };
573
/** Map of dirty tablespaces during recovery, keyed by tablespace
identifier (populated by fil_name_process()) */
typedef std::map<
	ulint,
	file_name_t,
	std::less<ulint>,
	ut_allocator<std::pair<const ulint, file_name_t> > > recv_spaces_t;

static recv_spaces_t recv_spaces;

/** The last parsed FILE_RENAME records, keyed by tablespace identifier */
static std::map<uint32_t,std::string> renamed_spaces;

/** Report an operation to create, delete, or rename a file during backup.
NOTE(review): this callback pointer is assigned outside this file and may
be NULL; confirm its installer (presumably the backup code) with callers.
@param[in]	space_id	tablespace identifier
@param[in]	create		whether the file is being created
@param[in]	name		file name (not NUL-terminated)
@param[in]	len		length of name, in bytes
@param[in]	new_name	new file name (NULL if not rename)
@param[in]	new_len		length of new_name, in bytes (0 if NULL) */
void (*log_file_op)(ulint space_id, bool create,
		    const byte* name, ulint len,
		    const byte* new_name, ulint new_len);
596
/** Information about initializing page contents during redo log processing.
FIXME: Rely on recv_sys.pages! */
class mlog_init_t
{
public:
	/** A page initialization operation that was parsed from
	the redo log */
	struct init {
		/** log sequence number of the page initialization */
		lsn_t lsn;
		/** Whether btr_page_create() avoided a read of the page.

		At the end of the last recovery batch, mark_ibuf_exist()
		will mark pages for which this flag is set. */
		bool created;
	};

private:
	typedef std::map<const page_id_t, init,
			 std::less<const page_id_t>,
			 ut_allocator<std::pair<const page_id_t, init> > >
		map;
	/** Map of page initialization operations.
	FIXME: Merge this to recv_sys.pages! */
	map inits;
public:
	/** Record that a page will be initialized by the redo log.
	A later LSN for the same page supersedes an earlier entry.
	@param[in]	page_id		page identifier
	@param[in]	lsn		log sequence number
	@return whether the state was changed */
	bool add(const page_id_t page_id, lsn_t lsn)
	{
		ut_ad(mutex_own(&recv_sys.mutex));
		const init init = { lsn, false };
		std::pair<map::iterator, bool> p = inits.insert(
			map::value_type(page_id, init));
		ut_ad(!p.first->second.created);
		if (p.second) return true;
		/* An existing, newer entry wins. */
		if (p.first->second.lsn >= init.lsn) return false;
		p.first->second = init;
		return true;
	}

	/** Get the last stored lsn of the page id and its respective
	init/load operation. The entry must exist.
	@param[in]	page_id	page id
	@param[in,out]	init	initialize log or load log
	@return the latest page initialization;
	not valid after releasing recv_sys.mutex. */
	init& last(page_id_t page_id)
	{
		ut_ad(mutex_own(&recv_sys.mutex));
		return inits.find(page_id)->second;
	}

	/** Determine if a page will be initialized or freed after a time.
	@param page_id	page identifier
	@param lsn	log sequence number
	@return whether page_id will be freed or initialized after lsn */
	bool will_avoid_read(page_id_t page_id, lsn_t lsn) const
	{
		ut_ad(mutex_own(&recv_sys.mutex));
		auto i= inits.find(page_id);
		return i != inits.end() && i->second.lsn > lsn;
	}

	/** At the end of each recovery batch, reset the 'created' flags. */
	void reset()
	{
		ut_ad(mutex_own(&recv_sys.mutex));
		ut_ad(recv_no_ibuf_operations);
		for (map::value_type& i : inits) {
			i.second.created = false;
		}
	}

	/** On the last recovery batch, mark whether there exist
	buffered changes for the pages that were initialized
	by buf_page_create() and still reside in the buffer pool.
	@param[in,out]	mtr	dummy mini-transaction */
	void mark_ibuf_exist(mtr_t& mtr)
	{
		ut_ad(mutex_own(&recv_sys.mutex));
		mtr.start();

		for (const map::value_type& i : inits) {
			if (!i.second.created) {
				continue;
			}
			/* Look up the page only if it is still in the
			buffer pool; do not read it from disk. */
			if (buf_block_t* block = buf_page_get_low(
				    i.first, 0, RW_X_LATCH, nullptr,
				    BUF_GET_IF_IN_POOL, __FILE__, __LINE__,
				    &mtr, nullptr, false)) {
				if (UNIV_LIKELY_NULL(block->page.zip.data)) {
					/* Refresh the uncompressed copy
					of ROW_FORMAT=COMPRESSED index
					pages. */
					switch (fil_page_get_type(
							block->page.zip.data)) {
					case FIL_PAGE_INDEX:
					case FIL_PAGE_RTREE:
						if (page_zip_decompress(
							    &block->page.zip,
							    block->frame,
							    true)) {
							break;
						}
						ib::error() << "corrupted "
							    << block->page.id();
					}
				}
				if (recv_no_ibuf_operations) {
					mtr.commit();
					mtr.start();
					continue;
				}
				/* NOTE(review): recv_sys.mutex is released
				around ibuf_page_exists(), presumably
				because that check may perform I/O. */
				mutex_exit(&recv_sys.mutex);
				block->page.ibuf_exist = ibuf_page_exists(
					block->page.id(), block->zip_size());
				mtr.commit();
				mtr.start();
				mutex_enter(&recv_sys.mutex);
			}
		}

		mtr.commit();
	}

	/** Clear the data structure */
	void clear() { inits.clear(); }
};
725
/** Page initialization operations parsed from the redo log */
static mlog_init_t mlog_init;
727
728 /** Process a record that indicates that a tablespace is
729 being shrunk in size.
730 @param page_id first page identifier that is not in the file
731 @param lsn log sequence number of the shrink operation */
trim(const page_id_t page_id,lsn_t lsn)732 inline void recv_sys_t::trim(const page_id_t page_id, lsn_t lsn)
733 {
734 DBUG_ENTER("recv_sys_t::trim");
735 DBUG_LOG("ib_log",
736 "discarding log beyond end of tablespace "
737 << page_id << " before LSN " << lsn);
738 ut_ad(mutex_own(&mutex));
739 for (recv_sys_t::map::iterator p = pages.lower_bound(page_id);
740 p != pages.end() && p->first.space() == page_id.space();) {
741 recv_sys_t::map::iterator r = p++;
742 if (r->second.trim(lsn)) {
743 pages.erase(r);
744 }
745 }
746 if (fil_space_t* space = fil_space_get(page_id.space())) {
747 ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
748 fil_node_t* file = UT_LIST_GET_FIRST(space->chain);
749 ut_ad(file->is_open());
750 os_file_truncate(file->name, file->handle,
751 os_offset_t{page_id.page_no()}
752 << srv_page_size_shift, true);
753 }
754 DBUG_VOID_RETURN;
755 }
756
open_log_files_if_needed()757 void recv_sys_t::open_log_files_if_needed()
758 {
759 if (!recv_sys.files.empty())
760 return;
761
762 for (auto &&path : get_existing_log_files_paths())
763 {
764 recv_sys.files.emplace_back(std::move(path));
765 ut_a(recv_sys.files.back().open(true) == DB_SUCCESS);
766 }
767 }
768
read(os_offset_t total_offset,span<byte> buf)769 void recv_sys_t::read(os_offset_t total_offset, span<byte> buf)
770 {
771 open_log_files_if_needed();
772
773 size_t file_idx= static_cast<size_t>(total_offset / log_sys.log.file_size);
774 os_offset_t offset= total_offset % log_sys.log.file_size;
775 dberr_t err= recv_sys.files[file_idx].read(offset, buf);
776 ut_a(err == DB_SUCCESS);
777 }
778
/** @return the number of redo log files, opening them if not yet open */
inline size_t recv_sys_t::files_size()
{
  open_log_files_if_needed();
  return files.size();
}
784
/** Process a file name from a FILE_* record.
Registers the tablespace in recv_spaces and attempts to load the
data file, reporting inconsistencies (duplicate or invalid files).
@param[in,out]	name		file name
@param[in]	len		length of the file name
@param[in]	space_id	the tablespace ID
@param[in]	deleted		whether this is a FILE_DELETE record */
static
void
fil_name_process(char* name, ulint len, ulint space_id, bool deleted)
{
	if (srv_operation == SRV_OPERATION_BACKUP) {
		/* This function is a no-op during backup. */
		return;
	}

	ut_ad(srv_operation == SRV_OPERATION_NORMAL
	      || srv_operation == SRV_OPERATION_RESTORE
	      || srv_operation == SRV_OPERATION_RESTORE_EXPORT);

	/* We will also insert space=NULL into the map, so that
	further checks can ensure that a FILE_MODIFY record was
	scanned before applying any page records for the space_id. */

	os_normalize_path(name);
	const file_name_t	fname(std::string(name, len), deleted);
	std::pair<recv_spaces_t::iterator,bool> p = recv_spaces.emplace(
		space_id, fname);
	ut_ad(p.first->first == space_id);

	file_name_t&	f = p.first->second;

	if (deleted) {
		/* Got FILE_DELETE */

		if (!p.second && f.status != file_name_t::DELETED) {
			f.status = file_name_t::DELETED;
			if (f.space != NULL) {
				fil_space_free(space_id, false);
				f.space = NULL;
			}
		}

		ut_ad(f.space == NULL);
	} else if (p.second // the first FILE_MODIFY or FILE_RENAME
		   || f.name != fname.name) {
		fil_space_t*	space;

		/* Check if the tablespace file exists and contains
		the space_id. If not, ignore the file after displaying
		a note. Abort if there are multiple files with the
		same space_id. */
		switch (fil_ibd_load(space_id, name, space)) {
		case FIL_LOAD_OK:
			ut_ad(space != NULL);

			if (!f.space) {
				if (f.size
				    || f.flags != f.initial_flags) {
					/* Apply the size and flags
					recovered from the redo log to
					the freshly loaded tablespace. */
					fil_space_set_recv_size_and_flags(
						space->id, f.size, f.flags);
				}

				f.space = space;
				goto same_space;
			} else if (f.space == space) {
same_space:
				f.name = fname.name;
				f.status = file_name_t::NORMAL;
			} else {
				/* Two different files claim the same
				tablespace identifier. */
				ib::error() << "Tablespace " << space_id
					<< " has been found in two places: '"
					<< f.name << "' and '" << name << "'."
					" You must delete one of them.";
				recv_sys.found_corrupt_fs = true;
			}
			break;

		case FIL_LOAD_ID_CHANGED:
			ut_ad(space == NULL);
			break;

		case FIL_LOAD_NOT_FOUND:
			/* No matching tablespace was found; maybe it
			was renamed, and we will find a subsequent
			FILE_* record. */
			ut_ad(space == NULL);

			if (srv_force_recovery) {
				/* Without innodb_force_recovery,
				missing tablespaces will only be
				reported in
				recv_init_crash_recovery_spaces().
				Enable some more diagnostics when
				forcing recovery. */

				ib::info()
					<< "At LSN: " << recv_sys.recovered_lsn
					<< ": unable to open file " << name
					<< " for tablespace " << space_id;
			}
			break;

		case FIL_LOAD_INVALID:
			ut_ad(space == NULL);
			if (srv_force_recovery == 0) {
				ib::warn() << "We do not continue the crash"
					" recovery, because the table may"
					" become corrupt if we cannot apply"
					" the log records in the InnoDB log to"
					" it. To fix the problem and start"
					" mysqld:";
				ib::info() << "1) If there is a permission"
					" problem in the file and mysqld"
					" cannot open the file, you should"
					" modify the permissions.";
				ib::info() << "2) If the tablespace is not"
					" needed, or you can restore an older"
					" version from a backup, then you can"
					" remove the .ibd file, and use"
					" --innodb_force_recovery=1 to force"
					" startup without this file.";
				ib::info() << "3) If the file system or the"
					" disk is broken, and you cannot"
					" remove the .ibd file, you can set"
					" --innodb_force_recovery.";
				recv_sys.found_corrupt_fs = true;
				break;
			}

			ib::info() << "innodb_force_recovery was set to "
				<< srv_force_recovery << ". Continuing crash"
				" recovery even though we cannot access the"
				" files for tablespace " << space_id << ".";
			break;
		}
	}
}
920
/** Clean up after recv_sys_t::create() */
void recv_sys_t::close()
{
  ut_ad(this == &recv_sys);

  if (is_initialised())
  {
    dblwr.pages.clear();
    /* In debug builds, hold the mutex so that the assertions
    inside clear() hold. */
    ut_d(mutex_enter(&mutex));
    clear();
    ut_d(mutex_exit(&mutex));

    if (buf)
    {
      ut_free_dodump(buf, RECV_PARSING_BUF_SIZE);
      buf= nullptr;
    }

    last_stored_lsn= 0;
    mutex_free(&mutex);
  }

  /* These file-scope maps exist independently of is_initialised(). */
  recv_spaces.clear();
  renamed_spaces.clear();
  mlog_init.clear();

  close_files();
}
949
950 /** Initialize the redo log recovery subsystem. */
create()951 void recv_sys_t::create()
952 {
953 ut_ad(this == &recv_sys);
954 ut_ad(!is_initialised());
955 mutex_create(LATCH_ID_RECV_SYS, &mutex);
956
957 apply_log_recs = false;
958 apply_batch_on = false;
959
960 buf = static_cast<byte*>(ut_malloc_dontdump(RECV_PARSING_BUF_SIZE,
961 PSI_INSTRUMENT_ME));
962 len = 0;
963 parse_start_lsn = 0;
964 scanned_lsn = 0;
965 scanned_checkpoint_no = 0;
966 recovered_offset = 0;
967 recovered_lsn = 0;
968 found_corrupt_log = false;
969 found_corrupt_fs = false;
970 mlog_checkpoint_lsn = 0;
971
972 progress_time = time(NULL);
973 recv_max_page_lsn = 0;
974
975 memset(truncated_undo_spaces, 0, sizeof truncated_undo_spaces);
976 last_stored_lsn = 1;
977 UT_LIST_INIT(blocks, &buf_block_t::unzip_LRU);
978 }
979
980 /** Clear a fully processed set of stored redo log records. */
clear()981 inline void recv_sys_t::clear()
982 {
983 ut_ad(mutex_own(&mutex));
984 apply_log_recs= false;
985 apply_batch_on= false;
986 ut_ad(!after_apply || !UT_LIST_GET_LAST(blocks));
987 pages.clear();
988
989 for (buf_block_t *block= UT_LIST_GET_LAST(blocks); block; )
990 {
991 buf_block_t *prev_block= UT_LIST_GET_PREV(unzip_LRU, block);
992 ut_ad(block->page.state() == BUF_BLOCK_MEMORY);
993 UT_LIST_REMOVE(blocks, block);
994 MEM_MAKE_ADDRESSABLE(block->frame, srv_page_size);
995 buf_block_free(block);
996 block= prev_block;
997 }
998 }
999
/** Free most recovery data structures. */
void recv_sys_t::debug_free()
{
  ut_ad(this == &recv_sys);
  ut_ad(is_initialised());
  mutex_enter(&mutex);

  recovery_on= false;
  pages.clear();
  /* Free the parsing buffer, but keep the mutex and the rest of
  the object initialised (unlike close()). */
  ut_free_dodump(buf, RECV_PARSING_BUF_SIZE);

  buf= nullptr;

  mutex_exit(&mutex);
}
1015
/** Allocate storage for a redo log snippet from the most recently
added block in the list. The low 16 bits of
buf_block_t::page::access_time are repurposed as the aligned offset of
the free area within the block frame, and the high 16 bits count the
snippets stored in the block.
@param len  size of the snippet, in bytes (1..srv_page_size)
@return pointer to ALIGNMENT-aligned storage for len bytes */
inline void *recv_sys_t::alloc(size_t len)
{
  ut_ad(mutex_own(&mutex));
  ut_ad(len);
  ut_ad(len <= srv_page_size);

  buf_block_t *block= UT_LIST_GET_FIRST(blocks);
  if (UNIV_UNLIKELY(!block))
  {
  create_block:
    block= buf_block_alloc();
    /* snippet count 1 in the high 16 bits;
    aligned end-of-used-area offset in the low 16 bits */
    block->page.access_time= 1U << 16 |
      ut_calc_align<uint16_t>(static_cast<uint16_t>(len), ALIGNMENT);
    static_assert(ut_is_2pow(ALIGNMENT), "ALIGNMENT must be a power of 2");
    UT_LIST_ADD_FIRST(blocks, block);
    MEM_MAKE_ADDRESSABLE(block->frame, len);
    MEM_NOACCESS(block->frame + len, srv_page_size - len);
    return my_assume_aligned<ALIGNMENT>(block->frame);
  }

  size_t free_offset= static_cast<uint16_t>(block->page.access_time);
  ut_ad(!ut_2pow_remainder(free_offset, ALIGNMENT));
  if (UNIV_UNLIKELY(!free_offset))
  {
    /* The 16-bit offset wrapped around to 0, which the debug
    assertion attributes to a fully used 64KiB frame. */
    ut_ad(srv_page_size == 65536);
    goto create_block;
  }
  ut_ad(free_offset <= srv_page_size);
  free_offset+= len;

  /* Start a fresh block if the request does not fit here. */
  if (free_offset > srv_page_size)
    goto create_block;

  block->page.access_time= ((block->page.access_time >> 16) + 1) << 16 |
    ut_calc_align<uint16_t>(static_cast<uint16_t>(free_offset), ALIGNMENT);
  MEM_MAKE_ADDRESSABLE(block->frame + free_offset - len, len);
  return my_assume_aligned<ALIGNMENT>(block->frame + free_offset - len);
}
1054
1055
/** Free a redo log snippet.
@param data buffer returned by alloc() */
inline void recv_sys_t::free(const void *data)
{
  ut_ad(!ut_align_offset(data, ALIGNMENT));
  /* Map the pointer to the start of the frame that contains it. */
  data= page_align(data);
  ut_ad(mutex_own(&mutex));

  /* MDEV-14481 FIXME: To prevent race condition with buf_pool.resize(),
  we must acquire and hold the buffer pool mutex here. */
  ut_ad(!buf_pool.resize_in_progress());

  /* Locate the buf_block_t that owns this frame by scanning the
  buffer pool chunks. */
  auto *chunk= buf_pool.chunks;
  for (auto i= buf_pool.n_chunks; i--; chunk++)
  {
    if (data < chunk->blocks->frame)
      continue;
    const size_t offs= (reinterpret_cast<const byte*>(data) -
                        chunk->blocks->frame) >> srv_page_size_shift;
    if (offs >= chunk->size)
      continue;
    buf_block_t *block= &chunk->blocks[offs];
    ut_ad(block->frame == data);
    ut_ad(block->page.state() == BUF_BLOCK_MEMORY);
    /* The low 16 bits of access_time hold the used-area offset
    (0 meaning "frame full"); see alloc(). */
    ut_ad(static_cast<uint16_t>(block->page.access_time - 1) <
          srv_page_size);
    ut_ad(block->page.access_time >= 1U << 16);
    /* Decrement the allocated-record count in the high 16 bits;
    release the block once its last record has been freed. */
    if (!((block->page.access_time -= 1U << 16) >> 16))
    {
      UT_LIST_REMOVE(blocks, block);
      MEM_MAKE_ADDRESSABLE(block->frame, srv_page_size);
      buf_block_free(block);
    }
    return;
  }
  /* The pointer must belong to some buffer pool chunk. */
  ut_ad(0);
}
1093
1094
/** Read a log segment to log_sys.buf.
@param[in,out]	start_lsn	in: read area start,
out: the last read valid lsn
@param[in]	end_lsn		read area end
@return whether no invalid blocks (e.g checksum mismatch) were found */
bool log_t::file::read_log_seg(lsn_t* start_lsn, lsn_t end_lsn)
{
	ulint	len;
	bool success = true;
	mysql_mutex_assert_owner(&log_sys.mutex);
	ut_ad(!(*start_lsn % OS_FILE_LOG_BLOCK_SIZE));
	ut_ad(!(end_lsn % OS_FILE_LOG_BLOCK_SIZE));
	byte* buf = log_sys.buf;
loop:
	lsn_t source_offset = calc_lsn_offset_old(*start_lsn);

	ut_a(end_lsn - *start_lsn <= ULINT_MAX);
	len = (ulint) (end_lsn - *start_lsn);

	ut_ad(len != 0);

	/* A single read must not wrap past the end of the circular
	file; cap it and continue from the start in the next loop
	iteration. */
	const bool at_eof = (source_offset % file_size) + len > file_size;
	if (at_eof) {
		/* If the above condition is true then len (which is ulint)
		is > the expression below, so the typecast is ok */
		len = ulint(file_size - (source_offset % file_size));
	}

	log_sys.n_log_ios++;

	ut_a((source_offset >> srv_page_size_shift) <= ULINT_MAX);

	recv_sys.read(source_offset, {buf, len});

	/* Validate (and, if needed, decrypt) each 512-byte log block
	that was just read. */
	for (ulint l = 0; l < len; l += OS_FILE_LOG_BLOCK_SIZE,
		     buf += OS_FILE_LOG_BLOCK_SIZE,
		     (*start_lsn) += OS_FILE_LOG_BLOCK_SIZE) {
		const ulint block_number = log_block_get_hdr_no(buf);

		if (block_number != log_block_convert_lsn_to_no(*start_lsn)) {
			/* Garbage or an incompletely written log block.
			We will not report any error, because this can
			happen when InnoDB was killed while it was
			writing redo log. We simply treat this as an
			abrupt end of the redo log. */
fail:
			end_lsn = *start_lsn;
			success = false;
			break;
		}

		ulint crc = log_block_calc_checksum_crc32(buf);
		ulint cksum = log_block_get_checksum(buf);

		DBUG_EXECUTE_IF("log_intermittent_checksum_mismatch", {
				static int block_counter;
				if (block_counter++ == 0) {
					cksum = crc + 1;
				}
			});

		DBUG_EXECUTE_IF("log_checksum_mismatch", { cksum = crc + 1; });

		if (UNIV_UNLIKELY(crc != cksum)) {
			/* Report an error, except in mariabackup
			(SRV_OPERATION_BACKUP), where this is only a
			warning. */
			ib::error_or_warn(srv_operation!=SRV_OPERATION_BACKUP)
				<< "Invalid log block checksum. block: "
				<< block_number
				<< " checkpoint no: "
				<< log_block_get_checkpoint_no(buf)
				<< " expected: " << crc
				<< " found: " << cksum;
			goto fail;
		}

		/* The checksum is computed on the encrypted contents;
		decrypt only after it has been verified. */
		if (is_encrypted()
		    && !log_crypt(buf, *start_lsn,
				  OS_FILE_LOG_BLOCK_SIZE,
				  LOG_DECRYPT)) {
			goto fail;
		}

		/* The data length must cover at least the block header
		and must not extend into the trailer, unless the block
		is completely full. */
		ulint dl = log_block_get_data_len(buf);
		if (dl < LOG_BLOCK_HDR_SIZE
		    || (dl != OS_FILE_LOG_BLOCK_SIZE
			&& dl > log_sys.trailer_offset())) {
			recv_sys.found_corrupt_log = true;
			goto fail;
		}
	}

	/* Periodically report progress and extend the service manager
	startup timeout. */
	if (recv_sys.report(time(NULL))) {
		ib::info() << "Read redo log up to LSN=" << *start_lsn;
		service_manager_extend_timeout(INNODB_EXTEND_TIMEOUT_INTERVAL,
					       "Read redo log up to LSN=" LSN_PF,
					       *start_lsn);
	}

	if (*start_lsn != end_lsn) {
		goto loop;
	}

	return(success);
}
1198
1199
1200
/********************************************************//**
Copies a log segment from the most up-to-date log group to the other log
groups, so that they all contain the latest log data. Also writes the info
about the latest checkpoint to the groups, and inits the fields in the group
memory structs to up-to-date values. */
static
void
recv_synchronize_groups()
{
	const lsn_t recovered_lsn = recv_sys.recovered_lsn;

	/* Read the last recovered log block to the recovery system buffer:
	the block is always incomplete */

	lsn_t start_lsn = ut_uint64_align_down(recovered_lsn,
					       OS_FILE_LOG_BLOCK_SIZE);
	log_sys.log.read_log_seg(&start_lsn,
				 start_lsn + OS_FILE_LOG_BLOCK_SIZE);
	log_sys.log.set_fields(recovered_lsn);

	/* Copy the checkpoint info to the log; remember that we have
	incremented checkpoint_no by one, and the info will not be written
	over the max checkpoint info, thus making the preservation of max
	checkpoint info on disk certain */

	if (!srv_read_only_mode) {
		log_write_checkpoint_info(0);
		/* NOTE(review): log_write_checkpoint_info() apparently
		releases log_sys.mutex (which the caller holds; see the
		assertion in read_log_seg()); reacquire it here to
		restore the caller's locking state -- confirm. */
		mysql_mutex_lock(&log_sys.mutex);
	}
}
1231
1232 /** Check the consistency of a log header block.
1233 @param[in] log header block
1234 @return true if ok */
1235 static
1236 bool
recv_check_log_header_checksum(const byte * buf)1237 recv_check_log_header_checksum(
1238 const byte* buf)
1239 {
1240 return(log_block_get_checksum(buf)
1241 == log_block_calc_checksum_crc32(buf));
1242 }
1243
redo_file_sizes_are_correct()1244 static bool redo_file_sizes_are_correct()
1245 {
1246 auto paths= get_existing_log_files_paths();
1247 auto get_size= [](const std::string &path) {
1248 return os_file_get_size(path.c_str()).m_total_size;
1249 };
1250 os_offset_t size= get_size(paths[0]);
1251
1252 auto it=
1253 std::find_if(paths.begin(), paths.end(), [&](const std::string &path) {
1254 return get_size(path) != size;
1255 });
1256
1257 if (it == paths.end())
1258 return true;
1259
1260 ib::error() << "Log file " << *it << " is of different size "
1261 << get_size(*it) << " bytes than other log files " << size
1262 << " bytes!";
1263 return false;
1264 }
1265
1266 /** Calculate the checksum for a log block using the pre-10.2.2 algorithm. */
log_block_calc_checksum_format_0(const byte * b)1267 inline uint32_t log_block_calc_checksum_format_0(const byte *b)
1268 {
1269 uint32_t sum= 1;
1270 const byte *const end= &b[512 - 4];
1271
1272 for (uint32_t sh= 0; b < end; )
1273 {
1274 sum&= 0x7FFFFFFFUL;
1275 sum+= uint32_t{*b} << sh++;
1276 sum+= *b++;
1277 if (sh > 24)
1278 sh= 0;
1279 }
1280
1281 return sum;
1282 }
1283
/** Determine if a redo log from before MariaDB 10.2.2 is clean.
@return error code
@retval DB_SUCCESS if the redo log is clean
@retval DB_CORRUPTION if the redo log is corrupted
@retval DB_ERROR if the redo log is not empty */
ATTRIBUTE_COLD static dberr_t recv_log_recover_pre_10_2()
{
  uint64_t max_no= 0;
  byte *buf= log_sys.buf;

  ut_ad(log_sys.log.format == 0);

  if (!redo_file_sizes_are_correct())
    return DB_CORRUPTION;

  /** Offset of the first checkpoint checksum */
  constexpr uint CHECKSUM_1= 288;
  /** Offset of the second checkpoint checksum */
  constexpr uint CHECKSUM_2= CHECKSUM_1 + 4;
  /** the checkpoint LSN field */
  constexpr uint CHECKPOINT_LSN= 8;
  /** Most significant bits of the checkpoint offset */
  constexpr uint OFFS_HI= CHECKSUM_2 + 12;
  /** Least significant bits of the checkpoint offset */
  constexpr uint OFFS_LO= 16;

  lsn_t lsn= 0;

  /* Read both checkpoint slots and keep the one with the greater
  checkpoint number. */
  for (ulint field= LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
       field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1)
  {
    log_sys.log.read(field, {buf, OS_FILE_LOG_BLOCK_SIZE});

    /* The pre-10.2.2 format protects each checkpoint block with two
    ut_fold_binary() checksums. */
    if (static_cast<uint32_t>(ut_fold_binary(buf, CHECKSUM_1)) !=
        mach_read_from_4(buf + CHECKSUM_1) ||
        static_cast<uint32_t>(ut_fold_binary(buf + CHECKPOINT_LSN,
                                             CHECKSUM_2 - CHECKPOINT_LSN)) !=
        mach_read_from_4(buf + CHECKSUM_2))
    {
      DBUG_LOG("ib_log", "invalid pre-10.2.2 checkpoint " << field);
      continue;
    }

    if (!log_crypt_101_read_checkpoint(buf))
    {
      ib::error() << "Decrypting checkpoint failed";
      continue;
    }

    const uint64_t checkpoint_no= mach_read_from_8(buf);

    DBUG_PRINT("ib_log", ("checkpoint " UINT64PF " at " LSN_PF " found",
                          checkpoint_no,
                          mach_read_from_8(buf + CHECKPOINT_LSN)));

    if (checkpoint_no >= max_no)
    {
      max_no= checkpoint_no;
      lsn= mach_read_from_8(buf + CHECKPOINT_LSN);
      log_sys.log.set_lsn(lsn);
      /* The 64-bit checkpoint offset is stored in two 32-bit halves. */
      log_sys.log.set_lsn_offset(lsn_t{mach_read_from_4(buf + OFFS_HI)} << 32 |
                                 mach_read_from_4(buf + OFFS_LO));
    }
  }

  if (!lsn)
  {
    ib::error() << "Upgrade after a crash is not supported."
      " This redo log was created before MariaDB 10.2.2,"
      " and we did not find a valid checkpoint."
      " Please follow the instructions at"
      " https://mariadb.com/kb/en/library/upgrading/";
    return DB_ERROR;
  }

  log_sys.set_lsn(lsn);
  log_sys.set_flushed_lsn(lsn);
  const lsn_t source_offset= log_sys.log.calc_lsn_offset_old(lsn);

  static constexpr char NO_UPGRADE_RECOVERY_MSG[]=
    "Upgrade after a crash is not supported."
    " This redo log was created before MariaDB 10.2.2";

  /* Read the 512-byte log block that contains the checkpoint LSN. */
  recv_sys.read(source_offset & ~511, {buf, 512});

  if (log_block_calc_checksum_format_0(buf) != log_block_get_checksum(buf) &&
      !log_crypt_101_read_block(buf, lsn))
  {
    ib::error() << NO_UPGRADE_RECOVERY_MSG << ", and it appears corrupted.";
    return DB_CORRUPTION;
  }

  /* The log is logically empty (clean shutdown) when the block's
  data-length field (at offset 4) equals the checkpoint LSN's offset
  within the block. */
  if (mach_read_from_2(buf + 4) == (source_offset & 511))
  {
    /* Mark the redo log for upgrading. */
    srv_log_file_size= 0;
    recv_sys.parse_start_lsn= recv_sys.recovered_lsn= recv_sys.scanned_lsn=
      recv_sys.mlog_checkpoint_lsn = lsn;
    log_sys.last_checkpoint_lsn= log_sys.next_checkpoint_lsn=
      log_sys.write_lsn= log_sys.current_flush_lsn= lsn;
    log_sys.next_checkpoint_no= 0;
    return DB_SUCCESS;
  }

  /* NOTE(review): buf[20 + 32 * 9] presumably addresses an encryption
  marker in the pre-10.2.2 block format -- confirm the offset against
  log_crypt_101_read_block(). */
  if (buf[20 + 32 * 9] == 2)
    ib::error() << "Cannot decrypt log for upgrading."
      " The encrypted log was created before MariaDB 10.2.2.";
  else
    ib::error() << NO_UPGRADE_RECOVERY_MSG << ".";

  return DB_ERROR;
}
1396
/** Calculate the offset of a log sequence number
in an old redo log file (during upgrade check).
@param[in]	lsn	log sequence number
@return byte offset within the log */
inline lsn_t log_t::file::calc_lsn_offset_old(lsn_t lsn) const
{
  /* Net payload capacity of the whole circular log
  (all files, excluding file headers). */
  const lsn_t size= capacity() * recv_sys.files_size();
  /* Distance of the requested LSN from this file's base LSN; it may
  be "negative", in which case it is reduced modulo the capacity. */
  lsn_t l= lsn - this->lsn;
  if (longlong(l) < 0)
  {
    l= lsn_t(-longlong(l)) % size;
    l= size - l;
  }

  /* Convert the known physical base offset (lsn_offset) to a
  header-free payload offset and add the distance, wrapping around
  the total payload capacity. */
  l+= lsn_offset - LOG_FILE_HDR_SIZE * (1 + lsn_offset / file_size);
  l%= size;
  /* Map the payload offset back to a physical offset by re-inserting
  one LOG_FILE_HDR_SIZE per (file_size - LOG_FILE_HDR_SIZE) payload
  bytes. */
  return l + LOG_FILE_HDR_SIZE * (1 + l / (file_size - LOG_FILE_HDR_SIZE));
}
1415
/** Determine if a redo log from MariaDB 10.2.2+, 10.3, or 10.4 is clean.
@return error code
@retval DB_SUCCESS if the redo log is clean
@retval DB_CORRUPTION if the redo log is corrupted
@retval DB_ERROR if the redo log is not empty */
static dberr_t recv_log_recover_10_4()
{
	const lsn_t lsn = log_sys.log.get_lsn();
	const lsn_t source_offset = log_sys.log.calc_lsn_offset_old(lsn);
	byte*	buf = log_sys.buf;

	if (!redo_file_sizes_are_correct()) {
		return DB_CORRUPTION;
	}

	/* Read the 512-byte log block that contains the checkpoint LSN. */
	recv_sys.read(source_offset & ~(OS_FILE_LOG_BLOCK_SIZE - 1),
		      {buf, OS_FILE_LOG_BLOCK_SIZE});

	ulint crc = log_block_calc_checksum_crc32(buf);
	ulint cksum = log_block_get_checksum(buf);

	if (UNIV_UNLIKELY(crc != cksum)) {
		ib::error() << "Invalid log block checksum."
			    << " block: "
			    << log_block_get_hdr_no(buf)
			    << " checkpoint no: "
			    << log_block_get_checkpoint_no(buf)
			    << " expected: " << crc
			    << " found: " << cksum;
		return DB_CORRUPTION;
	}

	/* The checksum is computed over the encrypted contents;
	decrypt only after verifying it. */
	if (log_sys.log.is_encrypted()
	    && !log_crypt(buf, lsn & ~511, 512, LOG_DECRYPT)) {
		return DB_ERROR;
	}

	/* On a clean shutdown, the redo log will be logically empty
	after the checkpoint lsn. */

	if (log_block_get_data_len(buf)
	    != (source_offset & (OS_FILE_LOG_BLOCK_SIZE - 1))) {
		/* The block contains data past the checkpoint LSN;
		crash recovery across the upgrade is not supported. */
		return DB_ERROR;
	}

	/* Mark the redo log for upgrading. */
	srv_log_file_size = 0;
	recv_sys.parse_start_lsn = recv_sys.recovered_lsn
		= recv_sys.scanned_lsn
		= recv_sys.mlog_checkpoint_lsn = lsn;
	log_sys.set_lsn(lsn);
	log_sys.set_flushed_lsn(lsn);
	log_sys.last_checkpoint_lsn = log_sys.next_checkpoint_lsn
		= log_sys.write_lsn = log_sys.current_flush_lsn = lsn;
	log_sys.next_checkpoint_no = 0;
	return DB_SUCCESS;
}
1473
/** Find the latest checkpoint in the log header.
@param[out]	max_field	LOG_CHECKPOINT_1 or LOG_CHECKPOINT_2
@return error code or DB_SUCCESS */
dberr_t
recv_find_max_checkpoint(ulint* max_field)
{
	ib_uint64_t	max_no;
	ib_uint64_t	checkpoint_no;
	ulint		field;
	byte*		buf;

	max_no = 0;
	*max_field = 0;

	buf = log_sys.checkpoint_buf;

	/* Read the log file header block and determine the redo log
	format version. */
	log_sys.log.read(0, {buf, OS_FILE_LOG_BLOCK_SIZE});
	/* Check the header page checksum. There was no
	checksum in the first redo log format (version 0). */
	log_sys.log.format = mach_read_from_4(buf + LOG_HEADER_FORMAT);
	log_sys.log.subformat = log_sys.log.format != log_t::FORMAT_3_23
		? mach_read_from_4(buf + LOG_HEADER_SUBFORMAT)
		: 0;
	if (log_sys.log.format != log_t::FORMAT_3_23
	    && !recv_check_log_header_checksum(buf)) {
		ib::error() << "Invalid redo log header checksum.";
		return(DB_CORRUPTION);
	}

	char creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR + 1];

	memcpy(creator, buf + LOG_HEADER_CREATOR, sizeof creator);
	/* Ensure that the string is NUL-terminated. */
	creator[LOG_HEADER_CREATOR_END - LOG_HEADER_CREATOR] = 0;

	switch (log_sys.log.format) {
	case log_t::FORMAT_3_23:
		/* A pre-10.2.2 log can only be used if it is clean
		(upgrade); it has its own checkpoint layout. */
		return recv_log_recover_pre_10_2();
	case log_t::FORMAT_10_2:
	case log_t::FORMAT_10_2 | log_t::FORMAT_ENCRYPTED:
	case log_t::FORMAT_10_3:
	case log_t::FORMAT_10_3 | log_t::FORMAT_ENCRYPTED:
	case log_t::FORMAT_10_4:
	case log_t::FORMAT_10_4 | log_t::FORMAT_ENCRYPTED:
	case log_t::FORMAT_10_5:
	case log_t::FORMAT_10_5 | log_t::FORMAT_ENCRYPTED:
		break;
	default:
		ib::error() << "Unsupported redo log format."
			" The redo log was created with " << creator << ".";
		return(DB_ERROR);
	}

	/* Read both checkpoint slots and keep the one with the
	greatest checkpoint number. */
	for (field = LOG_CHECKPOINT_1; field <= LOG_CHECKPOINT_2;
	     field += LOG_CHECKPOINT_2 - LOG_CHECKPOINT_1) {
		log_sys.log.read(field, {buf, OS_FILE_LOG_BLOCK_SIZE});

		const ulint crc32 = log_block_calc_checksum_crc32(buf);
		const ulint cksum = log_block_get_checksum(buf);

		if (crc32 != cksum) {
			DBUG_PRINT("ib_log",
				   ("invalid checkpoint,"
				    " at " ULINTPF
				    ", checksum " ULINTPFx
				    " expected " ULINTPFx,
				    field, cksum, crc32));
			continue;
		}

		if (log_sys.is_encrypted()
		    && !log_crypt_read_checkpoint_buf(buf)) {
			ib::error() << "Reading checkpoint"
				" encryption info failed.";
			continue;
		}

		checkpoint_no = mach_read_from_8(
			buf + LOG_CHECKPOINT_NO);

		DBUG_PRINT("ib_log",
			   ("checkpoint " UINT64PF " at " LSN_PF " found",
			    checkpoint_no, mach_read_from_8(
				    buf + LOG_CHECKPOINT_LSN)));

		if (checkpoint_no >= max_no) {
			*max_field = field;
			max_no = checkpoint_no;
			log_sys.log.set_lsn(mach_read_from_8(
						    buf + LOG_CHECKPOINT_LSN));
			log_sys.log.set_lsn_offset(mach_read_from_8(
							   buf
							   + LOG_CHECKPOINT_OFFSET));
			log_sys.next_checkpoint_no = checkpoint_no;
		}
	}

	if (*max_field == 0) {
		/* Before 10.2.2, we could get here during database
		initialization if we created an LOG_FILE_NAME file that
		was filled with zeroes, and were killed. After
		10.2.2, we would reject such a file already earlier,
		when checking the file header. */
		ib::error() << "No valid checkpoint found"
			" (corrupted redo log)."
			" You can try --innodb-force-recovery=6"
			" as a last resort.";
		return(DB_ERROR);
	}

	switch (log_sys.log.format) {
	case log_t::FORMAT_10_5:
	case log_t::FORMAT_10_5 | log_t::FORMAT_ENCRYPTED:
		break;
	default:
		/* An older (10.2.2 to 10.4) format: it can only be
		upgraded if the log is logically empty. */
		if (dberr_t err = recv_log_recover_10_4()) {
			ib::error()
				<< "Upgrade after a crash is not supported."
				" The redo log was created with " << creator
				<< (err == DB_ERROR
				    ? "." : ", and it appears corrupted.");
			return err;
		}
	}

	return(DB_SUCCESS);
}
1600
1601 /*******************************************************//**
1602 Calculates the new value for lsn when more data is added to the log. */
1603 static
1604 lsn_t
recv_calc_lsn_on_data_add(lsn_t lsn,ib_uint64_t len)1605 recv_calc_lsn_on_data_add(
1606 /*======================*/
1607 lsn_t lsn, /*!< in: old lsn */
1608 ib_uint64_t len) /*!< in: this many bytes of data is
1609 added, log block headers not included */
1610 {
1611 unsigned frag_len = static_cast<unsigned>(lsn % OS_FILE_LOG_BLOCK_SIZE)
1612 - LOG_BLOCK_HDR_SIZE;
1613 unsigned payload_size = log_sys.payload_size();
1614 ut_ad(frag_len < payload_size);
1615 lsn_t lsn_len = len;
1616 lsn_len += (lsn_len + frag_len) / payload_size
1617 * (OS_FILE_LOG_BLOCK_SIZE - payload_size);
1618
1619 return(lsn + lsn_len);
1620 }
1621
1622 /** Trim old log records for a page.
1623 @param start_lsn oldest log sequence number to preserve
1624 @return whether all the log for the page was trimmed */
trim(lsn_t start_lsn)1625 inline bool page_recv_t::trim(lsn_t start_lsn)
1626 {
1627 while (log.head)
1628 {
1629 if (log.head->lsn >= start_lsn) return false;
1630 last_offset= 1; /* the next record must not be same_page */
1631 log_rec_t *next= log.head->next;
1632 recv_sys.free(log.head);
1633 log.head= next;
1634 }
1635 log.tail= nullptr;
1636 return true;
1637 }
1638
1639
clear()1640 inline void page_recv_t::recs_t::clear()
1641 {
1642 ut_ad(mutex_own(&recv_sys.mutex));
1643 for (const log_rec_t *l= head; l; )
1644 {
1645 const log_rec_t *next= l->next;
1646 recv_sys.free(l);
1647 l= next;
1648 }
1649 head= tail= nullptr;
1650 }
1651
1652
1653 /** Ignore any earlier redo log records for this page. */
will_not_read()1654 inline void page_recv_t::will_not_read()
1655 {
1656 ut_ad(state == RECV_NOT_PROCESSED || state == RECV_WILL_NOT_READ);
1657 state= RECV_WILL_NOT_READ;
1658 log.clear();
1659 }
1660
1661
/** Register a redo log snippet for a page.
@param it page iterator
@param start_lsn start LSN of the mini-transaction
@param lsn @see mtr_t::commit_lsn()
@param recs redo log snippet @see log_t::FORMAT_10_5
@param len length of l, in bytes */
inline void recv_sys_t::add(map::iterator it, lsn_t start_lsn, lsn_t lsn,
                            const byte *l, size_t len)
{
  ut_ad(mutex_own(&mutex));
  page_id_t page_id = it->first;
  page_recv_t &recs= it->second;

  switch (*l & 0x70) {
  case FREE_PAGE: case INIT_PAGE:
    /* The page will be freed or fully (re)initialized; any earlier
    records for it can be discarded. */
    recs.will_not_read();
    mlog_init.add(page_id, start_lsn); /* FIXME: remove this! */
    /* fall through */
  default:
    /* If the last stored snippet for this page belongs to the same
    mini-transaction, try to extend it in place instead of
    allocating a separate snippet. */
    log_phys_t *tail= static_cast<log_phys_t*>(recs.log.last());
    if (!tail)
      break;
    if (tail->start_lsn != start_lsn)
      break;
    ut_ad(tail->lsn == lsn);
    buf_block_t *block= UT_LIST_GET_LAST(blocks);
    ut_ad(block);
    /* The low 16 bits of access_time hold the ALIGNMENT-aligned end
    offset of the used area in the block; see alloc(). */
    const size_t used= static_cast<uint16_t>(block->page.access_time - 1) + 1;
    ut_ad(used >= ALIGNMENT);
    const byte *end= const_cast<const log_phys_t*>(tail)->end();
    if (!((reinterpret_cast<size_t>(end + len) ^
           reinterpret_cast<size_t>(end)) & ~(ALIGNMENT - 1)))
    {
      /* Use already allocated 'padding' bytes */
append:
      MEM_MAKE_ADDRESSABLE(end + 1, len);
      /* Append to the preceding record for the page */
      tail->append(l, len);
      return;
    }
    /* NOTE(review): this condition is meant to check whether "tail"
    is the most recently allocated record in the block (only then may
    the used area be grown in place); the direction of the second
    comparison (&block->frame[used] >= end) cannot be verified from
    this file alone -- confirm against log_phys_t::end() semantics. */
    if (end <= &block->frame[used - ALIGNMENT] || &block->frame[used] >= end)
      break; /* Not the last allocated record in the page */
    const size_t new_used= static_cast<size_t>(end - block->frame + len + 1);
    ut_ad(new_used > used);
    if (new_used > srv_page_size)
      break;
    /* Grow the last allocation in place: keep the record count in
    the high 16 bits, advance the used-area offset in the low 16. */
    block->page.access_time= (block->page.access_time & ~0U << 16) |
      ut_calc_align<uint16_t>(static_cast<uint16_t>(new_used), ALIGNMENT);
    goto append;
  }
  /* Could not coalesce: store the snippet as a new record. */
  recs.log.append(new (alloc(log_phys_t::alloc_size(len)))
                  log_phys_t(start_lsn, lsn, l, len));
}
1715
1716 /** Store/remove the freed pages in fil_name_t of recv_spaces.
1717 @param[in] page_id freed or init page_id
1718 @param[in] freed TRUE if page is freed */
store_freed_or_init_rec(page_id_t page_id,bool freed)1719 static void store_freed_or_init_rec(page_id_t page_id, bool freed)
1720 {
1721 uint32_t space_id= page_id.space();
1722 uint32_t page_no= page_id.page_no();
1723 if (is_predefined_tablespace(space_id))
1724 {
1725 if (!srv_immediate_scrub_data_uncompressed)
1726 return;
1727 fil_space_t *space;
1728 if (space_id == TRX_SYS_SPACE)
1729 space= fil_system.sys_space;
1730 else
1731 space= fil_space_get(space_id);
1732
1733 space->free_page(page_no, freed);
1734 return;
1735 }
1736
1737 recv_spaces_t::iterator i= recv_spaces.lower_bound(space_id);
1738 if (i != recv_spaces.end() && i->first == space_id)
1739 {
1740 if (freed)
1741 i->second.add_freed_page(page_no);
1742 else
1743 i->second.remove_freed_page(page_no);
1744 }
1745 }
1746
1747 /** Parse and register one mini-transaction in log_t::FORMAT_10_5.
1748 @param checkpoint_lsn the log sequence number of the latest checkpoint
1749 @param store whether to store the records
1750 @param apply whether to apply file-level log records
1751 @return whether FILE_CHECKPOINT record was seen the first time,
1752 or corruption was noticed */
parse(lsn_t checkpoint_lsn,store_t * store,bool apply)1753 bool recv_sys_t::parse(lsn_t checkpoint_lsn, store_t *store, bool apply)
1754 {
1755 mysql_mutex_assert_owner(&log_sys.mutex);
1756 ut_ad(mutex_own(&mutex));
1757 ut_ad(parse_start_lsn);
1758 ut_ad(log_sys.is_physical());
1759
1760 bool last_phase= (*store == STORE_IF_EXISTS);
1761 const byte *const end= buf + len;
1762 loop:
1763 const byte *const log= buf + recovered_offset;
1764 const lsn_t start_lsn= recovered_lsn;
1765 map::iterator cached_pages_it = pages.end();
1766
1767 /* Check that the entire mini-transaction is included within the buffer */
1768 const byte *l;
1769 uint32_t rlen;
1770 for (l= log; l < end; l+= rlen)
1771 {
1772 if (!*l)
1773 goto eom_found;
1774 if (UNIV_LIKELY((*l & 0x70) != RESERVED));
1775 else if (srv_force_recovery)
1776 ib::warn() << "Ignoring unknown log record at LSN " << recovered_lsn;
1777 else
1778 {
1779 malformed:
1780 ib::error() << "Malformed log record;"
1781 " set innodb_force_recovery=1 to ignore.";
1782 corrupted:
1783 const size_t trailing_bytes= std::min<size_t>(100, size_t(end - l));
1784 ib::info() << "Dump from the start of the mini-transaction (LSN="
1785 << start_lsn << ") to "
1786 << trailing_bytes << " bytes after the record:";
1787 ut_print_buf(stderr, log, l - log + trailing_bytes);
1788 putc('\n', stderr);
1789 found_corrupt_log= true;
1790 return true;
1791 }
1792 rlen= *l++ & 0xf;
1793 if (l + (rlen ? rlen : 16) >= end)
1794 break;
1795 if (!rlen)
1796 {
1797 rlen= mlog_decode_varint_length(*l);
1798 if (l + rlen >= end)
1799 break;
1800 const uint32_t addlen= mlog_decode_varint(l);
1801 if (UNIV_UNLIKELY(addlen == MLOG_DECODE_ERROR))
1802 {
1803 ib::error() << "Corrupted record length";
1804 goto corrupted;
1805 }
1806 rlen= addlen + 15;
1807 }
1808 }
1809
1810 /* Not the entire mini-transaction was present. */
1811 return false;
1812
1813 eom_found:
1814 ut_ad(!*l);
1815 ut_d(const byte *const el= l + 1);
1816
1817 const lsn_t end_lsn= recv_calc_lsn_on_data_add(start_lsn, l + 1 - log);
1818 if (UNIV_UNLIKELY(end_lsn > scanned_lsn))
1819 /* The log record filled a log block, and we require that also the
1820 next log block should have been scanned in */
1821 return false;
1822
1823 ut_d(std::set<page_id_t> freed);
1824 #if 0 && defined UNIV_DEBUG /* MDEV-21727 FIXME: enable this */
1825 /* Pages that have been modified in this mini-transaction.
1826 If a mini-transaction writes INIT_PAGE for a page, it should not have
1827 written any log records for the page. Unfortunately, this does not
1828 hold for ROW_FORMAT=COMPRESSED pages, because page_zip_compress()
1829 can be invoked in a pessimistic operation, even after log has
1830 been written for other pages. */
1831 ut_d(std::set<page_id_t> modified);
1832 #endif
1833
1834 uint32_t space_id= 0, page_no= 0, last_offset= 0;
1835 bool got_page_op= false;
1836 for (l= log; l < end; l+= rlen)
1837 {
1838 const byte *const recs= l;
1839 const byte b= *l++;
1840
1841 if (!b)
1842 break;
1843 ut_ad(UNIV_LIKELY(b & 0x70) != RESERVED || srv_force_recovery);
1844 rlen= b & 0xf;
1845 ut_ad(l + rlen < end);
1846 ut_ad(rlen || l + 16 < end);
1847 if (!rlen)
1848 {
1849 const uint32_t lenlen= mlog_decode_varint_length(*l);
1850 ut_ad(l + lenlen < end);
1851 const uint32_t addlen= mlog_decode_varint(l);
1852 ut_ad(addlen != MLOG_DECODE_ERROR);
1853 rlen= addlen + 15 - lenlen;
1854 l+= lenlen;
1855 }
1856 ut_ad(l + rlen < end);
1857 uint32_t idlen;
1858 if ((b & 0x80) && got_page_op)
1859 {
1860 /* This record is for the same page as the previous one. */
1861 if (UNIV_UNLIKELY((b & 0x70) <= INIT_PAGE))
1862 {
1863 record_corrupted:
1864 /* FREE_PAGE,INIT_PAGE cannot be with same_page flag */
1865 if (!srv_force_recovery)
1866 goto malformed;
1867 ib::warn() << "Ignoring malformed log record at LSN " << recovered_lsn;
1868 last_offset= 1; /* the next record must not be same_page */
1869 continue;
1870 }
1871 goto same_page;
1872 }
1873 last_offset= 0;
1874 idlen= mlog_decode_varint_length(*l);
1875 if (UNIV_UNLIKELY(idlen > 5 || idlen >= rlen))
1876 {
1877 page_id_corrupted:
1878 if (!srv_force_recovery)
1879 {
1880 ib::error() << "Corrupted page identifier at " << recovered_lsn
1881 << "; set innodb_force_recovery=1 to ignore the record.";
1882 goto corrupted;
1883 }
1884 ib::warn() << "Ignoring corrupted page identifier at LSN "
1885 << recovered_lsn;
1886 continue;
1887 }
1888 space_id= mlog_decode_varint(l);
1889 if (UNIV_UNLIKELY(space_id == MLOG_DECODE_ERROR))
1890 goto page_id_corrupted;
1891 l+= idlen;
1892 rlen-= idlen;
1893 idlen= mlog_decode_varint_length(*l);
1894 if (UNIV_UNLIKELY(idlen > 5 || idlen > rlen))
1895 goto page_id_corrupted;
1896 page_no= mlog_decode_varint(l);
1897 if (UNIV_UNLIKELY(page_no == MLOG_DECODE_ERROR))
1898 goto page_id_corrupted;
1899 l+= idlen;
1900 rlen-= idlen;
1901 got_page_op = !(b & 0x80);
1902 if (got_page_op && apply && !is_predefined_tablespace(space_id))
1903 {
1904 recv_spaces_t::iterator i= recv_spaces.lower_bound(space_id);
1905 if (i != recv_spaces.end() && i->first == space_id);
1906 else if (recovered_lsn < mlog_checkpoint_lsn)
1907 /* We have not seen all records between the checkpoint and
1908 FILE_CHECKPOINT. There should be a FILE_DELETE for this
1909 tablespace later. */
1910 recv_spaces.emplace_hint(i, space_id, file_name_t("", false));
1911 else
1912 {
1913 const page_id_t id(space_id, page_no);
1914 if (!srv_force_recovery)
1915 {
1916 ib::error() << "Missing FILE_DELETE or FILE_MODIFY for " << id
1917 << " at " << recovered_lsn
1918 << "; set innodb_force_recovery=1 to ignore the record.";
1919 goto corrupted;
1920 }
1921 ib::warn() << "Ignoring record for " << id << " at " << recovered_lsn;
1922 continue;
1923 }
1924 }
1925 same_page:
1926 DBUG_PRINT("ib_log",
1927 ("scan " LSN_PF ": rec %x len %zu page %u:%u",
1928 recovered_lsn, b, static_cast<size_t>(l + rlen - recs),
1929 space_id, page_no));
1930
1931 if (got_page_op)
1932 {
1933 const page_id_t id(space_id, page_no);
1934 ut_d(if ((b & 0x70) == INIT_PAGE) freed.erase(id));
1935 ut_ad(freed.find(id) == freed.end());
1936 switch (b & 0x70) {
1937 case FREE_PAGE:
1938 ut_ad(freed.emplace(id).second);
1939 last_offset= 1; /* the next record must not be same_page */
1940 goto free_or_init_page;
1941 case INIT_PAGE:
1942 last_offset= FIL_PAGE_TYPE;
1943 free_or_init_page:
1944 store_freed_or_init_rec(id, (b & 0x70) == FREE_PAGE);
1945 if (UNIV_UNLIKELY(rlen != 0))
1946 goto record_corrupted;
1947 break;
1948 case EXTENDED:
1949 if (UNIV_UNLIKELY(!rlen))
1950 goto record_corrupted;
1951 if (rlen == 1 && *l == TRIM_PAGES)
1952 {
1953 #if 0 /* For now, we can only truncate an undo log tablespace */
1954 if (UNIV_UNLIKELY(!space_id || !page_no))
1955 goto record_corrupted;
1956 #else
1957 if (!srv_is_undo_tablespace(space_id) ||
1958 page_no != SRV_UNDO_TABLESPACE_SIZE_IN_PAGES)
1959 goto record_corrupted;
1960 static_assert(UT_ARR_SIZE(truncated_undo_spaces) ==
1961 TRX_SYS_MAX_UNDO_SPACES, "compatibility");
1962 truncated_undo_spaces[space_id - srv_undo_space_id_start]=
1963 { recovered_lsn, page_no };
1964 #endif
1965 last_offset= 1; /* the next record must not be same_page */
1966 continue;
1967 }
1968 last_offset= FIL_PAGE_TYPE;
1969 break;
1970 case RESERVED:
1971 case OPTION:
1972 continue;
1973 case WRITE:
1974 case MEMMOVE:
1975 case MEMSET:
1976 if (UNIV_UNLIKELY(rlen == 0 || last_offset == 1))
1977 goto record_corrupted;
1978 const uint32_t olen= mlog_decode_varint_length(*l);
1979 if (UNIV_UNLIKELY(olen >= rlen) || UNIV_UNLIKELY(olen > 3))
1980 goto record_corrupted;
1981 const uint32_t offset= mlog_decode_varint(l);
1982 ut_ad(offset != MLOG_DECODE_ERROR);
1983 static_assert(FIL_PAGE_OFFSET == 4, "compatibility");
1984 if (UNIV_UNLIKELY(offset >= srv_page_size))
1985 goto record_corrupted;
1986 last_offset+= offset;
1987 if (UNIV_UNLIKELY(last_offset < 8 || last_offset >= srv_page_size))
1988 goto record_corrupted;
1989 l+= olen;
1990 rlen-= olen;
1991 if ((b & 0x70) == WRITE)
1992 {
1993 if (UNIV_UNLIKELY(rlen + last_offset > srv_page_size))
1994 goto record_corrupted;
1995 if (UNIV_UNLIKELY(!page_no) && apply)
1996 {
1997 const bool has_size= last_offset <= FSP_HEADER_OFFSET + FSP_SIZE &&
1998 last_offset + rlen >= FSP_HEADER_OFFSET + FSP_SIZE + 4;
1999 const bool has_flags= last_offset <=
2000 FSP_HEADER_OFFSET + FSP_SPACE_FLAGS &&
2001 last_offset + rlen >= FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + 4;
2002 if (has_size || has_flags)
2003 {
2004 recv_spaces_t::iterator it= recv_spaces.find(space_id);
2005 const uint32_t size= has_size
2006 ? mach_read_from_4(FSP_HEADER_OFFSET + FSP_SIZE + l -
2007 last_offset)
2008 : 0;
2009 const uint32_t flags= has_flags
2010 ? mach_read_from_4(FSP_HEADER_OFFSET + FSP_SPACE_FLAGS + l -
2011 last_offset)
2012 : file_name_t::initial_flags;
2013 if (it == recv_spaces.end())
2014 ut_ad(!mlog_checkpoint_lsn || space_id == TRX_SYS_SPACE ||
2015 srv_is_undo_tablespace(space_id));
2016 else if (!it->second.space)
2017 {
2018 if (has_size)
2019 it->second.size= size;
2020 if (has_flags)
2021 it->second.flags= flags;
2022 }
2023 fil_space_set_recv_size_and_flags(space_id, size, flags);
2024 }
2025 }
2026 last_offset+= rlen;
2027 break;
2028 }
2029 uint32_t llen= mlog_decode_varint_length(*l);
2030 if (UNIV_UNLIKELY(llen > rlen || llen > 3))
2031 goto record_corrupted;
2032 const uint32_t len= mlog_decode_varint(l);
2033 ut_ad(len != MLOG_DECODE_ERROR);
2034 if (UNIV_UNLIKELY(last_offset + len > srv_page_size))
2035 goto record_corrupted;
2036 l+= llen;
2037 rlen-= llen;
2038 llen= len;
2039 if ((b & 0x70) == MEMSET)
2040 {
2041 if (UNIV_UNLIKELY(rlen > llen))
2042 goto record_corrupted;
2043 last_offset+= llen;
2044 break;
2045 }
2046 const uint32_t slen= mlog_decode_varint_length(*l);
2047 if (UNIV_UNLIKELY(slen != rlen || slen > 3))
2048 goto record_corrupted;
2049 uint32_t s= mlog_decode_varint(l);
2050 ut_ad(slen != MLOG_DECODE_ERROR);
2051 if (s & 1)
2052 s= last_offset - (s >> 1) - 1;
2053 else
2054 s= last_offset + (s >> 1) + 1;
2055 if (UNIV_UNLIKELY(s < 8 || s + llen > srv_page_size))
2056 goto record_corrupted;
2057 last_offset+= llen;
2058 break;
2059 }
2060 #if 0 && defined UNIV_DEBUG
2061 switch (b & 0x70) {
2062 case RESERVED:
2063 case OPTION:
2064 ut_ad(0); /* we did "continue" earlier */
2065 break;
2066 case FREE_PAGE:
2067 break;
2068 default:
2069 ut_ad(modified.emplace(id).second || (b & 0x70) != INIT_PAGE);
2070 }
2071 #endif
2072 const bool is_init= (b & 0x70) <= INIT_PAGE;
2073 switch (*store) {
2074 case STORE_IF_EXISTS:
2075 if (fil_space_t *space= fil_space_t::get(space_id))
2076 {
2077 const auto size= space->get_size();
2078 space->release();
2079 if (!size)
2080 continue;
2081 }
2082 else
2083 continue;
2084 /* fall through */
2085 case STORE_YES:
2086 if (!mlog_init.will_avoid_read(id, start_lsn))
2087 {
2088 if (cached_pages_it == pages.end() || cached_pages_it->first != id)
2089 cached_pages_it= pages.emplace(id, page_recv_t()).first;
2090 add(cached_pages_it, start_lsn, end_lsn, recs,
2091 static_cast<size_t>(l + rlen - recs));
2092 }
2093 continue;
2094 case STORE_NO:
2095 if (!is_init)
2096 continue;
2097 mlog_init.add(id, start_lsn);
2098 map::iterator i= pages.find(id);
2099 if (i == pages.end())
2100 continue;
2101 i->second.log.clear();
2102 pages.erase(i);
2103 }
2104 }
2105 else if (rlen)
2106 {
2107 switch (b & 0xf0) {
2108 case FILE_CHECKPOINT:
2109 if (space_id == 0 && page_no == 0 && rlen == 8)
2110 {
2111 const lsn_t lsn= mach_read_from_8(l);
2112
2113 if (UNIV_UNLIKELY(srv_print_verbose_log == 2))
2114 fprintf(stderr, "FILE_CHECKPOINT(" LSN_PF ") %s at " LSN_PF "\n",
2115 lsn, lsn != checkpoint_lsn
2116 ? "ignored"
2117 : mlog_checkpoint_lsn ? "reread" : "read",
2118 recovered_lsn);
2119
2120 DBUG_PRINT("ib_log", ("FILE_CHECKPOINT(" LSN_PF ") %s at " LSN_PF,
2121 lsn, lsn != checkpoint_lsn
2122 ? "ignored"
2123 : mlog_checkpoint_lsn ? "reread" : "read",
2124 recovered_lsn));
2125
2126 if (lsn == checkpoint_lsn)
2127 {
2128 /* There can be multiple FILE_CHECKPOINT for the same LSN. */
2129 if (mlog_checkpoint_lsn)
2130 continue;
2131 mlog_checkpoint_lsn= recovered_lsn;
2132 l+= 8;
2133 recovered_offset= l - buf;
2134 return true;
2135 }
2136 continue;
2137 }
2138 /* fall through */
2139 default:
2140 if (!srv_force_recovery)
2141 goto malformed;
2142 ib::warn() << "Ignoring malformed log record at LSN " << recovered_lsn;
2143 continue;
2144 case FILE_DELETE:
2145 case FILE_MODIFY:
2146 case FILE_RENAME:
2147 if (UNIV_UNLIKELY(page_no != 0))
2148 {
2149 file_rec_error:
2150 if (!srv_force_recovery)
2151 {
2152 ib::error() << "Corrupted file-level record;"
2153 " set innodb_force_recovery=1 to ignore.";
2154 goto corrupted;
2155 }
2156
2157 ib::warn() << "Ignoring corrupted file-level record at LSN "
2158 << recovered_lsn;
2159 continue;
2160 }
2161 /* fall through */
2162 case FILE_CREATE:
2163 if (UNIV_UNLIKELY(!space_id || page_no))
2164 goto file_rec_error;
2165 /* There is no terminating NUL character. Names must end in .ibd.
2166 For FILE_RENAME, there is a NUL between the two file names. */
2167 const char * const fn= reinterpret_cast<const char*>(l);
2168 const char *fn2= static_cast<const char*>(memchr(fn, 0, rlen));
2169
2170 if (UNIV_UNLIKELY((fn2 == nullptr) == ((b & 0xf0) == FILE_RENAME)))
2171 goto file_rec_error;
2172
2173 const char * const fnend= fn2 ? fn2 : fn + rlen;
2174 const char * const fn2end= fn2 ? fn + rlen : nullptr;
2175
2176 if (fn2)
2177 {
2178 fn2++;
2179 if (memchr(fn2, 0, fn2end - fn2))
2180 goto file_rec_error;
2181 if (fn2end - fn2 < 4 || memcmp(fn2end - 4, DOT_IBD, 4))
2182 goto file_rec_error;
2183 }
2184
2185 if (is_predefined_tablespace(space_id))
2186 goto file_rec_error;
2187 if (fnend - fn < 4 || memcmp(fnend - 4, DOT_IBD, 4))
2188 goto file_rec_error;
2189
2190 const char saved_end= fn[rlen];
2191 const_cast<char&>(fn[rlen])= '\0';
2192 fil_name_process(const_cast<char*>(fn), fnend - fn, space_id,
2193 (b & 0xf0) == FILE_DELETE);
2194 if (fn2)
2195 fil_name_process(const_cast<char*>(fn2), fn2end - fn2, space_id,
2196 false);
2197 if ((b & 0xf0) < FILE_MODIFY && log_file_op)
2198 log_file_op(space_id, (b & 0xf0) == FILE_CREATE,
2199 l, static_cast<ulint>(fnend - fn),
2200 reinterpret_cast<const byte*>(fn2),
2201 fn2 ? static_cast<ulint>(fn2end - fn2) : 0);
2202 const_cast<char&>(fn[rlen])= saved_end;
2203
2204 if (fn2 && apply)
2205 {
2206 const size_t len= fn2end - fn2;
2207 auto r= renamed_spaces.emplace(space_id, std::string{fn2, len});
2208 if (!r.second)
2209 r.first->second= std::string{fn2, len};
2210 }
2211 if (UNIV_UNLIKELY(found_corrupt_fs))
2212 return true;
2213 }
2214 }
2215 else
2216 goto malformed;
2217 }
2218
2219 ut_ad(l == el);
2220 recovered_offset= l - buf;
2221 recovered_lsn= end_lsn;
2222 if (is_memory_exhausted(store) && last_phase)
2223 return false;
2224 goto loop;
2225 }
2226
/** Apply the hashed log records to the page, if the page lsn is less than the
lsn of a log record.
@param[in,out]	block	buffer pool page
@param[in,out]	mtr	mini-transaction
@param[in,out]	p	recovery address (iterator into recv_sys.pages)
@param[in,out]	space	tablespace, or NULL if not looked up yet
@param[in,out]	init	page initialization operation, or NULL */
static void recv_recover_page(buf_block_t* block, mtr_t& mtr,
			      const recv_sys_t::map::iterator& p,
			      fil_space_t* space = NULL,
			      mlog_init_t::init* init = NULL)
{
	ut_ad(mutex_own(&recv_sys.mutex));
	ut_ad(recv_sys.apply_log_recs);
	ut_ad(recv_needed_recovery);
	ut_ad(!init || init->created);
	ut_ad(!init || init->lsn);
	ut_ad(block->page.id() == p->first);
	ut_ad(!p->second.is_being_processed());
	ut_ad(!space || space->id == block->page.id().space());
	ut_ad(log_sys.is_physical());

	if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
		ib::info() << "Applying log to page " << block->page.id();
	}

	DBUG_PRINT("ib_log", ("Applying log to page %u:%u",
			      block->page.id().space(),
			      block->page.id().page_no()));

	/* Mark the page busy so that other threads that find this entry
	will leave it alone while we work without holding the mutex. */
	p->second.state = page_recv_t::RECV_BEING_PROCESSED;

	mutex_exit(&recv_sys.mutex);

	/* For ROW_FORMAT=COMPRESSED pages, the log is applied to the
	compressed page image. */
	byte *frame = UNIV_LIKELY_NULL(block->page.zip.data)
		? block->page.zip.data
		: block->frame;
	/* When the page is being initialized from the log (init != NULL),
	its current contents are irrelevant: treat the page LSN as 0 so
	that all records from init->lsn onwards will be applied. */
	const lsn_t page_lsn = init
		? 0
		: mach_read_from_8(frame + FIL_PAGE_LSN);
	bool free_page = false;
	/* LSN range of the records actually applied in this call;
	start_lsn == 0 means nothing has been applied yet. */
	lsn_t start_lsn = 0, end_lsn = 0;
	ut_d(lsn_t recv_start_lsn = 0);
	const lsn_t init_lsn = init ? init->lsn : 0;

	/* whether the most recently visited record was skipped because
	it had already been applied (l->start_lsn < page_lsn) */
	bool skipped_after_init = false;

	/* The records are stored in ascending order of LSN. */
	for (const log_rec_t* recv : p->second.log) {
		const log_phys_t* l = static_cast<const log_phys_t*>(recv);
		ut_ad(l->lsn);
		ut_ad(end_lsn <= l->lsn);
		ut_ad(l->lsn <= log_sys.log.scanned_lsn);

		ut_ad(l->start_lsn);
		ut_ad(recv_start_lsn <= l->start_lsn);
		ut_d(recv_start_lsn = l->start_lsn);

		if (l->start_lsn < page_lsn) {
			/* This record has already been applied. */
			DBUG_PRINT("ib_log", ("apply skip %u:%u LSN " LSN_PF
					      " < " LSN_PF,
					      block->page.id().space(),
					      block->page.id().page_no(),
					      l->start_lsn, page_lsn));
			skipped_after_init = true;
			end_lsn = l->lsn;
			continue;
		}

		if (l->start_lsn < init_lsn) {
			/* The page will be initialized (INIT_PAGE) at
			init_lsn; records older than that are moot. */
			DBUG_PRINT("ib_log", ("init skip %u:%u LSN " LSN_PF
					      " < " LSN_PF,
					      block->page.id().space(),
					      block->page.id().page_no(),
					      l->start_lsn, init_lsn));
			skipped_after_init = false;
			end_lsn = l->lsn;
			continue;
		}

		/* There is no need to check LSN for just initialized pages. */
		if (skipped_after_init) {
			skipped_after_init = false;
			/* Sanity check: the last skipped record must have
			ended exactly at the page LSN, or the log and the
			page are out of sync. */
			ut_ad(end_lsn == page_lsn);
			if (end_lsn != page_lsn)
				ib::warn()
					<< "The last skipped log record LSN "
					<< end_lsn
					<< " is not equal to page LSN "
					<< page_lsn;
		}

		end_lsn = l->lsn;

		if (UNIV_UNLIKELY(srv_print_verbose_log == 2)) {
			ib::info() << "apply " << l->start_lsn
				   << ": " << block->page.id();
		}

		DBUG_PRINT("ib_log", ("apply " LSN_PF ": %u:%u",
				      l->start_lsn,
				      block->page.id().space(),
				      block->page.id().page_no()));

		log_phys_t::apply_status a= l->apply(*block,
						     p->second.last_offset);

		switch (a) {
		case log_phys_t::APPLIED_NO:
			/* Nothing was applied; the page may be marked
			freed below (if it was initialized by this call). */
			ut_ad(!mtr.has_modifications());
			free_page = true;
			start_lsn = 0;
			continue;
		case log_phys_t::APPLIED_YES:
			goto set_start_lsn;
		case log_phys_t::APPLIED_TO_FSP_HEADER:
		case log_phys_t::APPLIED_TO_ENCRYPTION:
			/* Propagate the change to the in-memory
			tablespace object below. */
			break;
		}

		if (fil_space_t* s = space
		    ? space
		    : fil_space_t::get(block->page.id().space())) {
			switch (a) {
			case log_phys_t::APPLIED_TO_FSP_HEADER:
				/* Refresh the cached FSP header fields
				from the just-modified page frame. */
				s->flags = mach_read_from_4(
					FSP_HEADER_OFFSET
					+ FSP_SPACE_FLAGS + frame);
				s->size_in_header = mach_read_from_4(
					FSP_HEADER_OFFSET + FSP_SIZE
					+ frame);
				s->free_limit = mach_read_from_4(
					FSP_HEADER_OFFSET
					+ FSP_FREE_LIMIT + frame);
				s->free_len = mach_read_from_4(
					FSP_HEADER_OFFSET + FSP_FREE
					+ FLST_LEN + frame);
				break;
			default:
				/* APPLIED_TO_ENCRYPTION: validate the
				encryption metadata on the page before
				parsing it into the tablespace object. */
				byte* b= frame
					+ fsp_header_get_encryption_offset(
						block->zip_size())
					+ FSP_HEADER_OFFSET;
				if (memcmp(b, CRYPT_MAGIC, MAGIC_SZ)) {
					break;
				}
				b += MAGIC_SZ;
				if (*b != CRYPT_SCHEME_UNENCRYPTED
				    && *b != CRYPT_SCHEME_1) {
					break;
				}
				if (b[1] != MY_AES_BLOCK_SIZE) {
					break;
				}
				if (b[2 + MY_AES_BLOCK_SIZE + 4 + 4]
				    > FIL_ENCRYPTION_OFF) {
					break;
				}
				fil_crypt_parse(s, b);
			}

			if (!space) {
				/* We acquired the reference above;
				release it. */
				s->release();
			}
		}

set_start_lsn:
		if (recv_sys.found_corrupt_log && !srv_force_recovery) {
			break;
		}

		if (!start_lsn) {
			start_lsn = l->start_lsn;
		}
	}

	if (start_lsn) {
		/* Something was applied: stamp the new page LSN and add
		the block to the flush list. */
		ut_ad(end_lsn >= start_lsn);
		mach_write_to_8(FIL_PAGE_LSN + frame, end_lsn);
		if (UNIV_LIKELY(frame == block->frame)) {
			mach_write_to_8(srv_page_size
					- FIL_PAGE_END_LSN_OLD_CHKSUM
					+ frame, end_lsn);
		} else {
			/* Changes were applied to the compressed page
			image; refresh the uncompressed copy. */
			buf_zip_decompress(block, false);
		}

		buf_block_modify_clock_inc(block);
		mysql_mutex_lock(&log_sys.flush_order_mutex);
		buf_flush_note_modification(block, start_lsn, end_lsn);
		mysql_mutex_unlock(&log_sys.flush_order_mutex);
	} else if (free_page && init) {
		/* There have been no operations that modify the page.
		Any buffered changes must not be merged. A subsequent
		buf_page_create() from a user thread should discard
		any buffered changes. */
		init->created = false;
		ut_ad(!mtr.has_modifications());
		block->page.status = buf_page_t::FREED;
	}

	/* Make sure that committing mtr does not change the modification
	lsn values of page */

	mtr.discard_modifications();
	mtr.commit();

	time_t now = time(NULL);

	mutex_enter(&recv_sys.mutex);

	if (recv_max_page_lsn < page_lsn) {
		recv_max_page_lsn = page_lsn;
	}

	ut_ad(p->second.is_being_processed());
	ut_ad(!recv_sys.pages.empty());

	/* Throttled progress reporting. */
	if (recv_sys.report(now)) {
		const ulint n = recv_sys.pages.size();
		ib::info() << "To recover: " << n << " pages from log";
		service_manager_extend_timeout(
			INNODB_EXTEND_TIMEOUT_INTERVAL, "To recover: " ULINTPF " pages from log", n);
	}
}
2452
2453 /** Remove records for a corrupted page.
2454 This function should only be called when innodb_force_recovery is set.
2455 @param page_id corrupted page identifier */
free_corrupted_page(page_id_t page_id)2456 ATTRIBUTE_COLD void recv_sys_t::free_corrupted_page(page_id_t page_id)
2457 {
2458 mutex_enter(&mutex);
2459 map::iterator p= pages.find(page_id);
2460 if (p != pages.end())
2461 {
2462 p->second.log.clear();
2463 pages.erase(p);
2464 }
2465 mutex_exit(&mutex);
2466 }
2467
/** Apply any buffered redo log to a page that was just read from a data file.
@param[in,out]	space	tablespace
@param[in,out]	bpage	buffer pool page */
void recv_recover_page(fil_space_t* space, buf_page_t* bpage)
{
	/* Use a dummy mini-transaction: the redo log is being applied,
	so no new redo must be generated. */
	mtr_t mtr;
	mtr.start();
	mtr.set_log_mode(MTR_LOG_NO_REDO);

	ut_ad(bpage->state() == BUF_BLOCK_FILE_PAGE);
	buf_block_t* block = reinterpret_cast<buf_block_t*>(bpage);

	/* Move the ownership of the x-latch on the page to
	this OS thread, so that we can acquire a second
	x-latch on it. This is needed for the operations to
	the page to pass the debug checks. */
	rw_lock_x_lock_move_ownership(&block->lock);
	buf_block_buf_fix_inc(block, __FILE__, __LINE__);
	rw_lock_x_lock(&block->lock);
	mtr.memo_push(block, MTR_MEMO_PAGE_X_FIX);

	mutex_enter(&recv_sys.mutex);
	if (recv_sys.apply_log_recs) {
		recv_sys_t::map::iterator p = recv_sys.pages.find(bpage->id());
		if (p != recv_sys.pages.end()
		    && !p->second.is_being_processed()) {
			/* recv_recover_page() commits the mtr and
			releases the latches. */
			recv_recover_page(block, mtr, p, space);
			p->second.log.clear();
			recv_sys.pages.erase(p);
			goto func_exit;
		}
	}

	/* No buffered records for this page: just release the latches. */
	mtr.commit();
func_exit:
	mutex_exit(&recv_sys.mutex);
	ut_ad(mtr.has_committed());
}
2506
2507 /** Read pages for which log needs to be applied.
2508 @param page_id first page identifier to read
2509 @param i iterator to recv_sys.pages */
recv_read_in_area(page_id_t page_id,recv_sys_t::map::iterator i)2510 static void recv_read_in_area(page_id_t page_id, recv_sys_t::map::iterator i)
2511 {
2512 uint32_t page_nos[32];
2513 ut_ad(page_id == i->first);
2514 page_id.set_page_no(ut_2pow_round(page_id.page_no(), 32U));
2515 const page_id_t up_limit{page_id + 31};
2516 uint32_t* p= page_nos;
2517
2518 for (; i != recv_sys.pages.end() && i->first <= up_limit; i++)
2519 {
2520 if (i->second.state == page_recv_t::RECV_NOT_PROCESSED)
2521 {
2522 i->second.state= page_recv_t::RECV_BEING_READ;
2523 *p++= i->first.page_no();
2524 }
2525 }
2526
2527 if (p != page_nos)
2528 {
2529 mutex_exit(&recv_sys.mutex);
2530 buf_read_recv_pages(page_id.space(), page_nos, ulint(p - page_nos));
2531 mutex_enter(&recv_sys.mutex);
2532 }
2533 }
2534
/** Attempt to initialize a page based on redo log records.
@param page_id page identifier
@param p iterator pointing to page_id; advanced past the entry on success
@param mtr mini-transaction
@param b pre-allocated buffer pool block
@return the recovered block (equal to b), or nullptr if the page
was not initialized here */
inline buf_block_t *recv_sys_t::recover_low(const page_id_t page_id,
                                            map::iterator &p, mtr_t &mtr,
                                            buf_block_t *b)
{
  ut_ad(mutex_own(&mutex));
  ut_ad(p->first == page_id);
  page_recv_t &recs= p->second;
  ut_ad(recs.state == page_recv_t::RECV_WILL_NOT_READ);
  buf_block_t* block= nullptr;
  mlog_init_t::init &i= mlog_init.last(page_id);
  const lsn_t end_lsn = recs.log.last()->lsn;
  if (end_lsn < i.lsn)
    /* All buffered records predate the latest page initialization;
    nothing to apply. */
    DBUG_LOG("ib_log", "skip log for page " << page_id
             << " LSN " << end_lsn << " < " << i.lsn);
  else if (fil_space_t *space= fil_space_t::get(page_id.space()))
  {
    /* Create the page in the buffer pool without reading it from
    the data file; no redo must be generated while applying redo. */
    mtr.start();
    mtr.set_log_mode(MTR_LOG_NO_REDO);
    block= buf_page_create(space, page_id.page_no(), space->zip_size(), &mtr,
                           b);
    if (UNIV_UNLIKELY(block != b))
    {
      /* The page happened to exist in the buffer pool, or it was just
      being read in. Before buf_page_get_with_no_latch() returned to
      buf_page_create(), all changes must have been applied to the
      page already. */
      ut_ad(recv_sys.pages.find(page_id) == recv_sys.pages.end());
      mtr.commit();
      block= nullptr;
    }
    else
    {
      ut_ad(&recs == &recv_sys.pages.find(page_id)->second);
      i.created= true;
      buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
      /* recv_recover_page() commits the mtr. */
      recv_recover_page(block, mtr, p, space, &i);
      ut_ad(mtr.has_committed());
      recs.log.clear();
      /* Advance p before erasing, so that the caller's iterator
      remains valid. */
      map::iterator r= p++;
      recv_sys.pages.erase(r);
    }
    space->release();
  }

  return block;
}
2587
2588 /** Attempt to initialize a page based on redo log records.
2589 @param page_id page identifier
2590 @return whether the page was successfully initialized */
recover_low(const page_id_t page_id)2591 buf_block_t *recv_sys_t::recover_low(const page_id_t page_id)
2592 {
2593 buf_block_t *free_block= buf_LRU_get_free_block(false);
2594 buf_block_t *block= nullptr;
2595
2596 mutex_enter(&mutex);
2597 map::iterator p= pages.find(page_id);
2598
2599 if (p != pages.end() && p->second.state == page_recv_t::RECV_WILL_NOT_READ)
2600 {
2601 mtr_t mtr;
2602 block= recover_low(page_id, p, mtr, free_block);
2603 ut_ad(!block || block == free_block);
2604 }
2605
2606 mutex_exit(&mutex);
2607 if (UNIV_UNLIKELY(!block))
2608 buf_pool.free_block(free_block);
2609 return block;
2610 }
2611
2612 /** Thread-safe function which sorts flush_list by oldest_modification */
log_sort_flush_list()2613 static void log_sort_flush_list()
2614 {
2615 mysql_mutex_lock(&buf_pool.flush_list_mutex);
2616
2617 const size_t size= UT_LIST_GET_LEN(buf_pool.flush_list);
2618 std::unique_ptr<buf_page_t *[]> list(new buf_page_t *[size]);
2619
2620 size_t idx= 0;
2621 for (buf_page_t *p= UT_LIST_GET_FIRST(buf_pool.flush_list); p;
2622 p= UT_LIST_GET_NEXT(list, p))
2623 list.get()[idx++]= p;
2624
2625 std::sort(list.get(), list.get() + size,
2626 [](const buf_page_t *lhs, const buf_page_t *rhs) {
2627 return rhs->oldest_modification() < lhs->oldest_modification();
2628 });
2629
2630 UT_LIST_INIT(buf_pool.flush_list, &buf_page_t::list);
2631
2632 for (size_t i= 0; i < size; i++)
2633 UT_LIST_ADD_LAST(buf_pool.flush_list, list[i]);
2634
2635 mysql_mutex_unlock(&buf_pool.flush_list_mutex);
2636 }
2637
/** Apply buffered log to persistent data pages.
@param last_batch whether it is possible to write more redo log */
void recv_sys_t::apply(bool last_batch)
{
  ut_ad(srv_operation == SRV_OPERATION_NORMAL ||
        srv_operation == SRV_OPERATION_RESTORE ||
        srv_operation == SRV_OPERATION_RESTORE_EXPORT);

  mutex_enter(&mutex);

  /* Wait for any concurrent batch to finish (or bail out if the log
  was found to be corrupted in the meantime). */
  while (apply_batch_on)
  {
    bool abort= found_corrupt_log;
    mutex_exit(&mutex);

    if (abort)
      return;

    os_thread_sleep(500000);
    mutex_enter(&mutex);
  }

#ifdef SAFE_MUTEX
  /* In all batches before the last one, the caller holds log_sys.mutex. */
  DBUG_ASSERT(!last_batch == mysql_mutex_is_owner(&log_sys.mutex));
#endif /* SAFE_MUTEX */

  /* Change buffer merges are only allowed in the final batch of a
  normal startup; never during backup restore. */
  recv_no_ibuf_operations = !last_batch ||
    srv_operation == SRV_OPERATION_RESTORE ||
    srv_operation == SRV_OPERATION_RESTORE_EXPORT;

  mtr_t mtr;

  if (!pages.empty())
  {
    const char *msg= last_batch
      ? "Starting final batch to recover "
      : "Starting a batch to recover ";
    const ulint n= pages.size();
    ib::info() << msg << n << " pages from redo log.";
    sd_notifyf(0, "STATUS=%s" ULINTPF " pages from redo log", msg, n);

    apply_log_recs= true;
    apply_batch_on= true;

    /* Replay any buffered undo tablespace truncation (TRIM_PAGES)
    before applying page-level records. */
    for (auto id= srv_undo_tablespaces_open; id--;)
    {
      const trunc& t= truncated_undo_spaces[id];
      if (t.lsn)
        trim(page_id_t(id + srv_undo_space_id_start, t.pages), t.lsn);
    }

    fil_system.extend_to_recv_size();

    /* Pre-allocate a block for recover_low(); it may have to wait,
    which must not happen while holding recv_sys.mutex. */
    buf_block_t *free_block= buf_LRU_get_free_block(false);

    for (map::iterator p= pages.begin(); p != pages.end(); )
    {
      const page_id_t page_id= p->first;
      ut_ad(!p->second.log.empty());

      switch (p->second.state) {
      case page_recv_t::RECV_BEING_READ:
      case page_recv_t::RECV_BEING_PROCESSED:
        /* Another thread is handling this page. */
        p++;
        continue;
      case page_recv_t::RECV_WILL_NOT_READ:
        /* Initialize the page from the log without reading it. */
        if (UNIV_LIKELY(!!recover_low(page_id, p, mtr, free_block)))
        {
          /* The block was consumed; replenish it with the
          mutex released, then restart at lower_bound(). */
          mutex_exit(&mutex);
          free_block= buf_LRU_get_free_block(false);
          mutex_enter(&mutex);
          break;
        }
        /* recover_low() advanced p past the erased entry. */
        ut_ad(p == pages.end() || p->first > page_id);
        continue;
      case page_recv_t::RECV_NOT_PROCESSED:
        /* Schedule reads; recv_read_in_area() may release the mutex. */
        recv_read_in_area(page_id, p);
      }
      /* The map may have been modified while the mutex was released;
      re-acquire a valid iterator. */
      p= pages.lower_bound(page_id);
      /* Ensure that progress will be made. */
      ut_ad(p == pages.end() || p->first > page_id ||
            p->second.state >= page_recv_t::RECV_BEING_READ);
    }

    buf_pool.free_block(free_block);

    /* Wait until all the pages have been processed */
    while (!pages.empty() || buf_pool.n_pend_reads)
    {
      const bool abort= found_corrupt_log || found_corrupt_fs;

      if (found_corrupt_fs && !srv_force_recovery)
        ib::info() << "Set innodb_force_recovery=1 to ignore corrupted pages.";

      mutex_exit(&mutex);

      if (abort)
        return;
      os_thread_sleep(500000);
      mutex_enter(&mutex);
    }
  }

  if (last_batch)
    /* We skipped this in buf_page_create(). */
    mlog_init.mark_ibuf_exist(mtr);
  else
  {
    mlog_init.reset();
    mysql_mutex_unlock(&log_sys.mutex);
  }

  mysql_mutex_assert_not_owner(&log_sys.mutex);
  mutex_exit(&mutex);

  if (last_batch && srv_operation != SRV_OPERATION_RESTORE &&
      srv_operation != SRV_OPERATION_RESTORE_EXPORT)
    log_sort_flush_list();
  else
  {
    /* Instead of flushing, last_batch could sort the buf_pool.flush_list
    in ascending order of buf_page_t::oldest_modification. */
    buf_flush_sync_batch(recovered_lsn);
  }

  if (!last_batch)
  {
    buf_pool_invalidate();
    mysql_mutex_lock(&log_sys.mutex);
  }
#if 1 /* Mariabackup FIXME: Remove or adjust rename_table_in_prepare() */
  else if (srv_operation != SRV_OPERATION_NORMAL);
#endif
  else
  {
    /* In the last batch, we will apply any rename operations. */
    for (auto r : renamed_spaces)
    {
      const uint32_t id= r.first;
      fil_space_t *space= fil_space_t::get(id);
      if (!space)
        continue;
      ut_ad(UT_LIST_GET_LEN(space->chain) == 1);
      const char *old= space->chain.start->name;
      if (r.second != old)
      {
        bool exists;
        os_file_type_t ftype;
        const char *new_name= r.second.c_str();
        /* Refuse to rename onto an existing file. */
        if (!os_file_status(new_name, &exists, &ftype) || exists)
        {
          ib::error() << "Cannot replay rename of tablespace " << id
            << " from '" << old << "' to '" << r.second <<
            (exists ? "' because the target file exists" : "'");
          found_corrupt_fs= true;
        }
        else
        {
          /* Derive the logical space name from the file path:
          the last two path components, minus the ".ibd" suffix. */
          size_t base= r.second.rfind(OS_PATH_SEPARATOR);
          ut_ad(base != std::string::npos);
          size_t start= r.second.rfind(OS_PATH_SEPARATOR, base - 1);
          if (start == std::string::npos)
            start= 0;
          else
            ++start;
          /* Keep only databasename/tablename without .ibd suffix */
          std::string space_name(r.second, start, r.second.size() - start - 4);
          ut_ad(space_name[base - start] == OS_PATH_SEPARATOR);
#if OS_PATH_SEPARATOR != '/'
          space_name[base - start]= '/';
#endif
          /* space->rename() writes redo log; it requires log_sys.mutex. */
          mysql_mutex_lock(&log_sys.mutex);
          if (dberr_t err= space->rename(space_name.c_str(), r.second.c_str(),
                                         false))
          {
            ib::error() << "Cannot replay rename of tablespace " << id
              << " to '" << r.second << "': " << err;
            found_corrupt_fs= true;
          }
          mysql_mutex_unlock(&log_sys.mutex);
        }
      }
      space->release();
    }
    renamed_spaces.clear();
  }

  mutex_enter(&mutex);

  ut_d(after_apply= true);
  /* Release all buffered records and reset the state. */
  clear();
  mutex_exit(&mutex);
}
2831
2832 /** Check whether the number of read redo log blocks exceeds the maximum.
2833 Store last_stored_lsn if the recovery is not in the last phase.
2834 @param[in,out] store whether to store page operations
2835 @return whether the memory is exhausted */
is_memory_exhausted(store_t * store)2836 inline bool recv_sys_t::is_memory_exhausted(store_t *store)
2837 {
2838 if (*store == STORE_NO ||
2839 UT_LIST_GET_LEN(blocks) * 3 < buf_pool.get_n_pages())
2840 return false;
2841 if (*store == STORE_YES)
2842 last_stored_lsn= recovered_lsn;
2843 *store= STORE_NO;
2844 DBUG_PRINT("ib_log",("Ran out of memory and last stored lsn " LSN_PF
2845 " last stored offset " ULINTPF "\n",
2846 recovered_lsn, recovered_offset));
2847 return true;
2848 }
2849
2850 /** Adds data from a new log block to the parsing buffer of recv_sys if
2851 recv_sys.parse_start_lsn is non-zero.
2852 @param[in] log_block log block to add
2853 @param[in] scanned_lsn lsn of how far we were able to find
2854 data in this log block
2855 @return true if more data added */
recv_sys_add_to_parsing_buf(const byte * log_block,lsn_t scanned_lsn)2856 bool recv_sys_add_to_parsing_buf(const byte* log_block, lsn_t scanned_lsn)
2857 {
2858 ulint more_len;
2859 ulint data_len;
2860 ulint start_offset;
2861 ulint end_offset;
2862
2863 ut_ad(scanned_lsn >= recv_sys.scanned_lsn);
2864
2865 if (!recv_sys.parse_start_lsn) {
2866 /* Cannot start parsing yet because no start point for
2867 it found */
2868 return(false);
2869 }
2870
2871 data_len = log_block_get_data_len(log_block);
2872
2873 if (recv_sys.parse_start_lsn >= scanned_lsn) {
2874
2875 return(false);
2876
2877 } else if (recv_sys.scanned_lsn >= scanned_lsn) {
2878
2879 return(false);
2880
2881 } else if (recv_sys.parse_start_lsn > recv_sys.scanned_lsn) {
2882 more_len = (ulint) (scanned_lsn - recv_sys.parse_start_lsn);
2883 } else {
2884 more_len = (ulint) (scanned_lsn - recv_sys.scanned_lsn);
2885 }
2886
2887 if (more_len == 0) {
2888 return(false);
2889 }
2890
2891 ut_ad(data_len >= more_len);
2892
2893 start_offset = data_len - more_len;
2894
2895 if (start_offset < LOG_BLOCK_HDR_SIZE) {
2896 start_offset = LOG_BLOCK_HDR_SIZE;
2897 }
2898
2899 end_offset = std::min<ulint>(data_len, log_sys.trailer_offset());
2900
2901 ut_ad(start_offset <= end_offset);
2902
2903 if (start_offset < end_offset) {
2904 memcpy(recv_sys.buf + recv_sys.len,
2905 log_block + start_offset, end_offset - start_offset);
2906
2907 recv_sys.len += end_offset - start_offset;
2908
2909 ut_a(recv_sys.len <= RECV_PARSING_BUF_SIZE);
2910 }
2911
2912 return(true);
2913 }
2914
2915 /** Moves the parsing buffer data left to the buffer start. */
recv_sys_justify_left_parsing_buf()2916 void recv_sys_justify_left_parsing_buf()
2917 {
2918 memmove(recv_sys.buf, recv_sys.buf + recv_sys.recovered_offset,
2919 recv_sys.len - recv_sys.recovered_offset);
2920
2921 recv_sys.len -= recv_sys.recovered_offset;
2922
2923 recv_sys.recovered_offset = 0;
2924 }
2925
2926 /** Scan redo log from a buffer and stores new log data to the parsing buffer.
2927 Parse and hash the log records if new data found.
2928 Apply log records automatically when the hash table becomes full.
2929 @param[in,out] store whether the records should be
2930 stored into recv_sys.pages; this is
2931 reset if just debug checking is
2932 needed, or when the num_max_blocks in
2933 recv_sys runs out
2934 @param[in] log_block log segment
2935 @param[in] checkpoint_lsn latest checkpoint LSN
2936 @param[in] start_lsn buffer start LSN
2937 @param[in] end_lsn buffer end LSN
2938 @param[in,out] contiguous_lsn it is known that all groups contain
2939 contiguous log data upto this lsn
2940 @param[out] group_scanned_lsn scanning succeeded upto this lsn
2941 @return true if not able to scan any more in this log group */
recv_scan_log_recs(store_t * store,const byte * log_block,lsn_t checkpoint_lsn,lsn_t start_lsn,lsn_t end_lsn,lsn_t * contiguous_lsn,lsn_t * group_scanned_lsn)2942 static bool recv_scan_log_recs(
2943 store_t* store,
2944 const byte* log_block,
2945 lsn_t checkpoint_lsn,
2946 lsn_t start_lsn,
2947 lsn_t end_lsn,
2948 lsn_t* contiguous_lsn,
2949 lsn_t* group_scanned_lsn)
2950 {
2951 lsn_t scanned_lsn = start_lsn;
2952 bool finished = false;
2953 ulint data_len;
2954 bool more_data = false;
2955 bool apply = recv_sys.mlog_checkpoint_lsn != 0;
2956 ulint recv_parsing_buf_size = RECV_PARSING_BUF_SIZE;
2957 const bool last_phase = (*store == STORE_IF_EXISTS);
2958 ut_ad(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
2959 ut_ad(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
2960 ut_ad(end_lsn >= start_lsn + OS_FILE_LOG_BLOCK_SIZE);
2961 ut_ad(log_sys.is_physical());
2962
2963 const byte* const log_end = log_block
2964 + ulint(end_lsn - start_lsn);
2965 constexpr ulint sizeof_checkpoint= SIZE_OF_FILE_CHECKPOINT;
2966
2967 do {
2968 ut_ad(!finished);
2969
2970 if (log_block_get_flush_bit(log_block)) {
2971 /* This block was a start of a log flush operation:
2972 we know that the previous flush operation must have
2973 been completed for all log groups before this block
2974 can have been flushed to any of the groups. Therefore,
2975 we know that log data is contiguous up to scanned_lsn
2976 in all non-corrupt log groups. */
2977
2978 if (scanned_lsn > *contiguous_lsn) {
2979 *contiguous_lsn = scanned_lsn;
2980 }
2981 }
2982
2983 data_len = log_block_get_data_len(log_block);
2984
2985 if (scanned_lsn + data_len > recv_sys.scanned_lsn
2986 && log_block_get_checkpoint_no(log_block)
2987 < recv_sys.scanned_checkpoint_no
2988 && (recv_sys.scanned_checkpoint_no
2989 - log_block_get_checkpoint_no(log_block)
2990 > 0x80000000UL)) {
2991
2992 /* Garbage from a log buffer flush which was made
2993 before the most recent database recovery */
2994 finished = true;
2995 break;
2996 }
2997
2998 if (!recv_sys.parse_start_lsn
2999 && (log_block_get_first_rec_group(log_block) > 0)) {
3000
3001 /* We found a point from which to start the parsing
3002 of log records */
3003
3004 recv_sys.parse_start_lsn = scanned_lsn
3005 + log_block_get_first_rec_group(log_block);
3006 recv_sys.scanned_lsn = recv_sys.parse_start_lsn;
3007 recv_sys.recovered_lsn = recv_sys.parse_start_lsn;
3008 }
3009
3010 scanned_lsn += data_len;
3011
3012 if (data_len == LOG_BLOCK_HDR_SIZE + sizeof_checkpoint
3013 && scanned_lsn == checkpoint_lsn + sizeof_checkpoint
3014 && log_block[LOG_BLOCK_HDR_SIZE]
3015 == (FILE_CHECKPOINT | (SIZE_OF_FILE_CHECKPOINT - 2))
3016 && checkpoint_lsn == mach_read_from_8(
3017 (LOG_BLOCK_HDR_SIZE + 1 + 2)
3018 + log_block)) {
3019 /* The redo log is logically empty. */
3020 ut_ad(recv_sys.mlog_checkpoint_lsn == 0
3021 || recv_sys.mlog_checkpoint_lsn
3022 == checkpoint_lsn);
3023 recv_sys.mlog_checkpoint_lsn = checkpoint_lsn;
3024 DBUG_PRINT("ib_log", ("found empty log; LSN=" LSN_PF,
3025 scanned_lsn));
3026 finished = true;
3027 break;
3028 }
3029
3030 if (scanned_lsn > recv_sys.scanned_lsn) {
3031 ut_ad(!srv_log_file_created);
3032 if (!recv_needed_recovery) {
3033 recv_needed_recovery = true;
3034
3035 if (srv_read_only_mode) {
3036 ib::warn() << "innodb_read_only"
3037 " prevents crash recovery";
3038 return(true);
3039 }
3040
3041 ib::info() << "Starting crash recovery from"
3042 " checkpoint LSN=" << checkpoint_lsn
3043 << "," << recv_sys.scanned_lsn;
3044 }
3045
3046 /* We were able to find more log data: add it to the
3047 parsing buffer if parse_start_lsn is already
3048 non-zero */
3049
3050 DBUG_EXECUTE_IF(
3051 "reduce_recv_parsing_buf",
3052 recv_parsing_buf_size = RECV_SCAN_SIZE * 2;
3053 );
3054
3055 if (recv_sys.len + 4 * OS_FILE_LOG_BLOCK_SIZE
3056 >= recv_parsing_buf_size) {
3057 ib::error() << "Log parsing buffer overflow."
3058 " Recovery may have failed!";
3059
3060 recv_sys.found_corrupt_log = true;
3061
3062 if (!srv_force_recovery) {
3063 ib::error()
3064 << "Set innodb_force_recovery"
3065 " to ignore this error.";
3066 return(true);
3067 }
3068 } else if (!recv_sys.found_corrupt_log) {
3069 more_data = recv_sys_add_to_parsing_buf(
3070 log_block, scanned_lsn);
3071 }
3072
3073 recv_sys.scanned_lsn = scanned_lsn;
3074 recv_sys.scanned_checkpoint_no
3075 = log_block_get_checkpoint_no(log_block);
3076 }
3077
3078 /* During last phase of scanning, there can be redo logs
3079 left in recv_sys.buf to parse & store it in recv_sys.heap */
3080 if (last_phase
3081 && recv_sys.recovered_lsn < recv_sys.scanned_lsn) {
3082 more_data = true;
3083 }
3084
3085 if (data_len < OS_FILE_LOG_BLOCK_SIZE) {
3086 /* Log data for this group ends here */
3087 finished = true;
3088 break;
3089 } else {
3090 log_block += OS_FILE_LOG_BLOCK_SIZE;
3091 }
3092 } while (log_block < log_end);
3093
3094 *group_scanned_lsn = scanned_lsn;
3095
3096 mutex_enter(&recv_sys.mutex);
3097
3098 if (more_data && !recv_sys.found_corrupt_log) {
3099 /* Try to parse more log records */
3100 if (recv_sys.parse(checkpoint_lsn, store, apply)) {
3101 ut_ad(recv_sys.found_corrupt_log
3102 || recv_sys.found_corrupt_fs
3103 || recv_sys.mlog_checkpoint_lsn
3104 == recv_sys.recovered_lsn);
3105 finished = true;
3106 goto func_exit;
3107 }
3108
3109 recv_sys.is_memory_exhausted(store);
3110
3111 if (recv_sys.recovered_offset > recv_parsing_buf_size / 4
3112 || (recv_sys.recovered_offset
3113 && recv_sys.len
3114 >= recv_parsing_buf_size - RECV_SCAN_SIZE)) {
3115 /* Move parsing buffer data to the buffer start */
3116 recv_sys_justify_left_parsing_buf();
3117 }
3118
3119 /* Need to re-parse the redo log which're stored
3120 in recv_sys.buf */
3121 if (last_phase && *store == STORE_NO) {
3122 finished = false;
3123 }
3124 }
3125
3126 func_exit:
3127 mutex_exit(&recv_sys.mutex);
3128 return(finished);
3129 }
3130
/** Scans log from a buffer and stores new log data to the parsing buffer.
Parses and hashes the log records if new data found.
@param[in]	checkpoint_lsn		latest checkpoint log sequence number
@param[in,out]	contiguous_lsn		log sequence number
until which all redo log has been scanned
@param[in]	last_phase		whether changes
can be applied to the tablespaces
@return whether rescan is needed (not everything was stored) */
static
bool
recv_group_scan_log_recs(
	lsn_t		checkpoint_lsn,
	lsn_t*		contiguous_lsn,
	bool		last_phase)
{
	DBUG_ENTER("recv_group_scan_log_recs");
	DBUG_ASSERT(!last_phase || recv_sys.mlog_checkpoint_lsn > 0);

	/* Reset the parsing buffer and position every recv_sys LSN
	bookkeeping field at *contiguous_lsn before (re)starting the scan. */
	mutex_enter(&recv_sys.mutex);
	recv_sys.len = 0;
	recv_sys.recovered_offset = 0;
	recv_sys.clear();
	recv_sys.parse_start_lsn = *contiguous_lsn;
	recv_sys.scanned_lsn = *contiguous_lsn;
	recv_sys.recovered_lsn = *contiguous_lsn;
	recv_sys.scanned_checkpoint_no = 0;
	ut_ad(recv_max_page_lsn == 0);
	mutex_exit(&recv_sys.mutex);

	lsn_t	start_lsn;
	lsn_t	end_lsn;
	/* Until the FILE_CHECKPOINT record has been located, only scan
	without storing (STORE_NO). Afterwards, store the records; in the
	last phase, apply whatever can be applied (STORE_IF_EXISTS). */
	store_t	store	= recv_sys.mlog_checkpoint_lsn == 0
		? STORE_NO : (last_phase ? STORE_IF_EXISTS : STORE_YES);

	/* Align the scan start down to a full log block boundary. */
	log_sys.log.scanned_lsn = end_lsn = *contiguous_lsn =
		ut_uint64_align_down(*contiguous_lsn, OS_FILE_LOG_BLOCK_SIZE);
	ut_d(recv_sys.after_apply = last_phase);

	do {
		if (last_phase && store == STORE_NO) {
			/* The store ran out of memory during the last
			phase: apply what has been buffered so far, then
			resume storing from the last recovered LSN. */
			store = STORE_IF_EXISTS;
			recv_sys.apply(false);
			/* Rescan the redo logs from last stored lsn */
			end_lsn = recv_sys.recovered_lsn;
		}

		start_lsn = ut_uint64_align_down(end_lsn,
						 OS_FILE_LOG_BLOCK_SIZE);
		end_lsn = start_lsn;
		/* Read up to RECV_SCAN_SIZE bytes of redo log into
		log_sys.buf; recv_scan_log_recs() will parse/store it. */
		log_sys.log.read_log_seg(&end_lsn, start_lsn + RECV_SCAN_SIZE);
	} while (end_lsn != start_lsn
		 && !recv_scan_log_recs(&store, log_sys.buf, checkpoint_lsn,
					start_lsn, end_lsn, contiguous_lsn,
					&log_sys.log.scanned_lsn));

	if (recv_sys.found_corrupt_log || recv_sys.found_corrupt_fs) {
		DBUG_RETURN(false);
	}

	DBUG_PRINT("ib_log", ("%s " LSN_PF " completed",
			      last_phase ? "rescan" : "scan",
			      log_sys.log.scanned_lsn));

	/* store was demoted to STORE_NO when the record store ran out
	of memory; in that case not everything was stored and the caller
	must rescan. */
	DBUG_RETURN(store == STORE_NO);
}
3196
3197 /** Report a missing tablespace for which page-redo log exists.
3198 @param[in] err previous error code
3199 @param[in] i tablespace descriptor
3200 @return new error code */
3201 static
3202 dberr_t
recv_init_missing_space(dberr_t err,const recv_spaces_t::const_iterator & i)3203 recv_init_missing_space(dberr_t err, const recv_spaces_t::const_iterator& i)
3204 {
3205 if (srv_operation == SRV_OPERATION_RESTORE
3206 || srv_operation == SRV_OPERATION_RESTORE_EXPORT) {
3207 if (i->second.name.find(TEMP_TABLE_PATH_PREFIX)
3208 != std::string::npos) {
3209 ib::warn() << "Tablespace " << i->first << " was not"
3210 " found at " << i->second.name << " when"
3211 " restoring a (partial?) backup. All redo log"
3212 " for this file will be ignored!";
3213 }
3214 return(err);
3215 }
3216
3217 if (srv_force_recovery == 0) {
3218 ib::error() << "Tablespace " << i->first << " was not"
3219 " found at " << i->second.name << ".";
3220
3221 if (err == DB_SUCCESS) {
3222 ib::error() << "Set innodb_force_recovery=1 to"
3223 " ignore this and to permanently lose"
3224 " all changes to the tablespace.";
3225 err = DB_TABLESPACE_NOT_FOUND;
3226 }
3227 } else {
3228 ib::warn() << "Tablespace " << i->first << " was not"
3229 " found at " << i->second.name << ", and"
3230 " innodb_force_recovery was set. All redo log"
3231 " for this tablespace will be ignored!";
3232 }
3233
3234 return(err);
3235 }
3236
/** Report the missing tablespace and discard the redo logs for the deleted
tablespace.
@param[in]	rescan			rescan of redo logs is needed
					if hash table ran out of memory
@param[out]	missing_tablespace	missing tablespace exists or not
@return error code or DB_SUCCESS. */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
recv_validate_tablespace(bool rescan, bool& missing_tablespace)
{
	dberr_t err = DB_SUCCESS;

	mutex_enter(&recv_sys.mutex);

	/* Pass 1: drop all buffered page-redo records that belong to
	tablespaces which are missing or were deleted; report missing
	ones via recv_init_missing_space(). */
	for (recv_sys_t::map::iterator p = recv_sys.pages.begin();
	     p != recv_sys.pages.end();) {
		ut_ad(!p->second.log.empty());
		const ulint space = p->first.space();
		if (is_predefined_tablespace(space)) {
next:
			p++;
			continue;
		}

		recv_spaces_t::iterator i = recv_spaces.find(space);
		ut_ad(i != recv_spaces.end());

		switch (i->second.status) {
		case file_name_t::NORMAL:
			goto next;
		case file_name_t::MISSING:
			err = recv_init_missing_space(err, i);
			/* Downgrade to DELETED so any further records
			for this tablespace are discarded silently. */
			i->second.status = file_name_t::DELETED;
			/* fall through */
		case file_name_t::DELETED:
			/* Erase while iterating: save the current
			iterator, advance p first, then erase the saved
			one so that p remains valid. */
			recv_sys_t::map::iterator r = p++;
			r->second.log.clear();
			recv_sys.pages.erase(r);
			continue;
		}
		ut_ad(0);
	}

	if (err != DB_SUCCESS) {
func_exit:
		mutex_exit(&recv_sys.mutex);
		return(err);
	}

	/* When rescan is not needed, recv_sys.pages will contain the
	entire redo log. If rescan is needed or innodb_force_recovery
	is set, we can ignore missing tablespaces. */
	for (const recv_spaces_t::value_type& rs : recv_spaces) {
		if (UNIV_LIKELY(rs.second.status != file_name_t::MISSING)) {
			continue;
		}

		missing_tablespace = true;

		if (srv_force_recovery > 0) {
			ib::warn() << "Tablespace " << rs.first
				<<" was not found at " << rs.second.name
				<<", and innodb_force_recovery was set."
				<<" All redo log for this tablespace"
				<<" will be ignored!";
			continue;
		}

		if (!rescan) {
			ib::info() << "Tablespace " << rs.first
				<< " was not found at '"
				<< rs.second.name << "', but there"
				<<" were no modifications either.";
		}
	}

	/* Without a rescan (or with innodb_force_recovery set) a
	missing tablespace cannot receive any further redo records, so
	it does not block recovery. */
	if (!rescan || srv_force_recovery > 0) {
		missing_tablespace = false;
	}

	err = DB_SUCCESS;
	goto func_exit;
}
3320
3321 /** Check if all tablespaces were found for crash recovery.
3322 @param[in] rescan rescan of redo logs is needed
3323 @param[out] missing_tablespace missing table exists
3324 @return error code or DB_SUCCESS */
3325 static MY_ATTRIBUTE((warn_unused_result))
3326 dberr_t
recv_init_crash_recovery_spaces(bool rescan,bool & missing_tablespace)3327 recv_init_crash_recovery_spaces(bool rescan, bool& missing_tablespace)
3328 {
3329 bool flag_deleted = false;
3330
3331 ut_ad(!srv_read_only_mode);
3332 ut_ad(recv_needed_recovery);
3333
3334 for (recv_spaces_t::value_type& rs : recv_spaces) {
3335 ut_ad(!is_predefined_tablespace(rs.first));
3336 ut_ad(rs.second.status != file_name_t::DELETED
3337 || !rs.second.space);
3338
3339 if (rs.second.status == file_name_t::DELETED) {
3340 /* The tablespace was deleted,
3341 so we can ignore any redo log for it. */
3342 flag_deleted = true;
3343 } else if (rs.second.space != NULL) {
3344 /* The tablespace was found, and there
3345 are some redo log records for it. */
3346 fil_names_dirty(rs.second.space);
3347
3348 /* Add the freed page ranges in the respective
3349 tablespace */
3350 if (!rs.second.freed_ranges.empty()
3351 && (srv_immediate_scrub_data_uncompressed
3352 || rs.second.space->is_compressed())) {
3353
3354 rs.second.space->add_free_ranges(
3355 std::move(rs.second.freed_ranges));
3356 }
3357 } else if (rs.second.name == "") {
3358 ib::error() << "Missing FILE_CREATE, FILE_DELETE"
3359 " or FILE_MODIFY before FILE_CHECKPOINT"
3360 " for tablespace " << rs.first;
3361 recv_sys.found_corrupt_log = true;
3362 return(DB_CORRUPTION);
3363 } else {
3364 rs.second.status = file_name_t::MISSING;
3365 flag_deleted = true;
3366 }
3367
3368 ut_ad(rs.second.status == file_name_t::DELETED
3369 || rs.second.name != "");
3370 }
3371
3372 if (flag_deleted) {
3373 return recv_validate_tablespace(rescan, missing_tablespace);
3374 }
3375
3376 return DB_SUCCESS;
3377 }
3378
/** Start recovering from a redo log checkpoint.
@param[in]	flush_lsn	FIL_PAGE_FILE_FLUSH_LSN
of first system tablespace page
@return error code or DB_SUCCESS */
dberr_t
recv_recovery_from_checkpoint_start(lsn_t flush_lsn)
{
	ulint		max_cp_field;
	lsn_t		checkpoint_lsn;
	bool		rescan = false;
	ib_uint64_t	checkpoint_no;
	lsn_t		contiguous_lsn;
	byte*		buf;
	dberr_t		err = DB_SUCCESS;

	ut_ad(srv_operation == SRV_OPERATION_NORMAL
	      || srv_operation == SRV_OPERATION_RESTORE
	      || srv_operation == SRV_OPERATION_RESTORE_EXPORT);
	/* The buffer pool must still be empty: nothing may have been
	read in before recovery determines the recovered LSN. */
	ut_d(mysql_mutex_lock(&buf_pool.flush_list_mutex));
	ut_ad(UT_LIST_GET_LEN(buf_pool.LRU) == 0);
	ut_ad(UT_LIST_GET_LEN(buf_pool.unzip_LRU) == 0);
	ut_d(mysql_mutex_unlock(&buf_pool.flush_list_mutex));

	if (srv_force_recovery >= SRV_FORCE_NO_LOG_REDO) {

		ib::info() << "innodb_force_recovery=6 skips redo log apply";

		return(DB_SUCCESS);
	}

	recv_sys.recovery_on = true;

	mysql_mutex_lock(&log_sys.mutex);

	/* Locate the most recent checkpoint in the log header. */
	err = recv_find_max_checkpoint(&max_cp_field);

	if (err != DB_SUCCESS) {

		recv_sys.recovered_lsn = log_sys.get_lsn();
		mysql_mutex_unlock(&log_sys.mutex);
		return(err);
	}

	buf = log_sys.checkpoint_buf;
	log_sys.log.read(max_cp_field, {buf, OS_FILE_LOG_BLOCK_SIZE});

	checkpoint_lsn = mach_read_from_8(buf + LOG_CHECKPOINT_LSN);
	checkpoint_no = mach_read_from_8(buf + LOG_CHECKPOINT_NO);

	/* Start reading the log from the checkpoint lsn. The variable
	contiguous_lsn contains an lsn up to which the log is known to
	be contiguously written. */
	recv_sys.mlog_checkpoint_lsn = 0;

	ut_ad(RECV_SCAN_SIZE <= srv_log_buffer_size);

	const lsn_t	end_lsn = mach_read_from_8(
		buf + LOG_CHECKPOINT_END_LSN);

	ut_ad(recv_sys.pages.empty());
	contiguous_lsn = checkpoint_lsn;
	switch (log_sys.log.format) {
	case 0:
		/* Pre-10.2 log format: nothing to apply here. */
		mysql_mutex_unlock(&log_sys.mutex);
		return DB_SUCCESS;
	default:
		if (end_lsn == 0) {
			break;
		}
		if (end_lsn >= checkpoint_lsn) {
			contiguous_lsn = end_lsn;
			break;
		}
		/* An end LSN before the checkpoint LSN is impossible
		in a consistent log. */
		recv_sys.found_corrupt_log = true;
		mysql_mutex_unlock(&log_sys.mutex);
		return(DB_ERROR);
	}

	size_t sizeof_checkpoint;

	if (!log_sys.is_physical()) {
		sizeof_checkpoint = 9/* size of MLOG_CHECKPOINT */;
		goto completed;
	}

	/* Look for FILE_CHECKPOINT. */
	recv_group_scan_log_recs(checkpoint_lsn, &contiguous_lsn, false);
	/* The first scan should not have stored or applied any records. */
	ut_ad(recv_sys.pages.empty());
	ut_ad(!recv_sys.found_corrupt_fs);

	if (srv_read_only_mode && recv_needed_recovery) {
		mysql_mutex_unlock(&log_sys.mutex);
		return(DB_READ_ONLY);
	}

	if (recv_sys.found_corrupt_log && !srv_force_recovery) {
		mysql_mutex_unlock(&log_sys.mutex);
		ib::warn() << "Log scan aborted at LSN " << contiguous_lsn;
		return(DB_ERROR);
	}

	if (recv_sys.mlog_checkpoint_lsn == 0) {
		/* No FILE_CHECKPOINT was found. That is only valid if
		the log ends exactly at the checkpoint LSN (i.e. the
		log is logically empty). */
		lsn_t scan_lsn = log_sys.log.scanned_lsn;
		if (!srv_read_only_mode && scan_lsn != checkpoint_lsn) {
			mysql_mutex_unlock(&log_sys.mutex);
			ib::error err;
			err << "Missing FILE_CHECKPOINT";
			if (end_lsn) {
				err << " at " << end_lsn;
			}
			err << " between the checkpoint " << checkpoint_lsn
			    << " and the end " << scan_lsn << ".";
			return(DB_ERROR);
		}

		log_sys.log.scanned_lsn = checkpoint_lsn;
	} else {
		/* Second scan: now that FILE_CHECKPOINT has been seen,
		store the records. rescan = true means the store ran
		out of memory and a third scan will be needed. */
		contiguous_lsn = checkpoint_lsn;
		rescan = recv_group_scan_log_recs(
			checkpoint_lsn, &contiguous_lsn, false);

		if ((recv_sys.found_corrupt_log && !srv_force_recovery)
		    || recv_sys.found_corrupt_fs) {
			mysql_mutex_unlock(&log_sys.mutex);
			return(DB_ERROR);
		}
	}

	/* NOTE: we always do a 'recovery' at startup, but only if
	there is something wrong we will print a message to the
	user about recovery: */
	sizeof_checkpoint= SIZE_OF_FILE_CHECKPOINT;

completed:
	if (flush_lsn == checkpoint_lsn + sizeof_checkpoint
	    && recv_sys.mlog_checkpoint_lsn == checkpoint_lsn) {
		/* The redo log is logically empty. */
	} else if (checkpoint_lsn != flush_lsn) {
		ut_ad(!srv_log_file_created);

		if (checkpoint_lsn + sizeof_checkpoint < flush_lsn) {
			ib::warn()
				<< "Are you sure you are using the right "
				<< LOG_FILE_NAME
				<< " to start up the database? Log sequence "
				"number in the "
				<< LOG_FILE_NAME << " is " << checkpoint_lsn
				<< ", less than the log sequence number in "
				"the first system tablespace file header, "
				<< flush_lsn << ".";
		}

		if (!recv_needed_recovery) {

			ib::info()
				<< "The log sequence number " << flush_lsn
				<< " in the system tablespace does not match"
				" the log sequence number "
				<< checkpoint_lsn << " in the "
				<< LOG_FILE_NAME << "!";

			if (srv_read_only_mode) {
				ib::error() << "innodb_read_only"
					" prevents crash recovery";
				mysql_mutex_unlock(&log_sys.mutex);
				return(DB_READ_ONLY);
			}

			recv_needed_recovery = true;
		}
	}

	log_sys.set_lsn(recv_sys.recovered_lsn);
	if (UNIV_LIKELY(log_sys.get_flushed_lsn() < recv_sys.recovered_lsn)) {
		/* This may already have been set by create_log_file()
		if no logs existed when the server started up. */
		log_sys.set_flushed_lsn(recv_sys.recovered_lsn);
	}

	if (recv_needed_recovery) {
		bool missing_tablespace = false;

		err = recv_init_crash_recovery_spaces(
			rescan, missing_tablespace);

		if (err != DB_SUCCESS) {
			mysql_mutex_unlock(&log_sys.mutex);
			return(err);
		}

		/* If there is any missing tablespace and rescan is needed
		then there is a possiblity that hash table will not contain
		all space ids redo logs. Rescan the remaining unstored
		redo logs for the validation of missing tablespace. */
		ut_ad(rescan || !missing_tablespace);

		while (missing_tablespace) {
			DBUG_PRINT("ib_log", ("Rescan of redo log to validate "
					      "the missing tablespace. Scan "
					      "from last stored LSN " LSN_PF,
					      recv_sys.last_stored_lsn));

			lsn_t recent_stored_lsn = recv_sys.last_stored_lsn;
			rescan = recv_group_scan_log_recs(
				checkpoint_lsn, &recent_stored_lsn, false);

			ut_ad(!recv_sys.found_corrupt_fs);

			missing_tablespace = false;

			err = recv_sys.found_corrupt_log
				? DB_ERROR
				: recv_validate_tablespace(
					rescan, missing_tablespace);

			if (err != DB_SUCCESS) {
				mysql_mutex_unlock(&log_sys.mutex);
				return err;
			}

			rescan = true;
		}

		recv_sys.parse_start_lsn = checkpoint_lsn;

		if (srv_operation == SRV_OPERATION_NORMAL) {
			buf_dblwr.recover();
		}

		ut_ad(srv_force_recovery <= SRV_FORCE_NO_UNDO_LOG_SCAN);

		if (rescan) {
			/* Final scan: store and, where possible, apply
			everything that did not fit in memory earlier. */
			contiguous_lsn = checkpoint_lsn;

			recv_group_scan_log_recs(
				checkpoint_lsn, &contiguous_lsn, true);

			if ((recv_sys.found_corrupt_log
			     && !srv_force_recovery)
			    || recv_sys.found_corrupt_fs) {
				mysql_mutex_unlock(&log_sys.mutex);
				return(DB_ERROR);
			}
		}
	} else {
		ut_ad(!rescan || recv_sys.pages.empty());
	}

	if (log_sys.is_physical()
	    && (log_sys.log.scanned_lsn < checkpoint_lsn
		|| log_sys.log.scanned_lsn < recv_max_page_lsn)) {

		ib::error() << "We scanned the log up to "
			<< log_sys.log.scanned_lsn
			<< ". A checkpoint was at " << checkpoint_lsn << " and"
			" the maximum LSN on a database page was "
			<< recv_max_page_lsn << ". It is possible that the"
			" database is now corrupt!";
	}

	if (recv_sys.recovered_lsn < checkpoint_lsn) {
		mysql_mutex_unlock(&log_sys.mutex);

		ib::error() << "Recovered only to lsn:"
			    << recv_sys.recovered_lsn
			    << " checkpoint_lsn: " << checkpoint_lsn;

		return(DB_ERROR);
	}

	log_sys.next_checkpoint_lsn = checkpoint_lsn;
	log_sys.next_checkpoint_no = checkpoint_no + 1;

	recv_synchronize_groups();

	ut_ad(recv_needed_recovery
	      || checkpoint_lsn == recv_sys.recovered_lsn);

	/* Position the log writer exactly where recovery ended. */
	log_sys.write_lsn = log_sys.get_lsn();
	log_sys.buf_free = log_sys.write_lsn % OS_FILE_LOG_BLOCK_SIZE;
	log_sys.buf_next_to_write = log_sys.buf_free;

	log_sys.last_checkpoint_lsn = checkpoint_lsn;

	if (!srv_read_only_mode && srv_operation == SRV_OPERATION_NORMAL
	    && (~log_t::FORMAT_ENCRYPTED & log_sys.log.format)
	    == log_t::FORMAT_10_5) {
		/* Write a FILE_CHECKPOINT marker as the first thing,
		before generating any other redo log. This ensures
		that subsequent crash recovery will be possible even
		if the server were killed soon after this. */
		fil_names_clear(log_sys.last_checkpoint_lsn, true);
	}

	/* NOTE(review): this recomputes the same value that was
	assigned above via checkpoint_no + 1; kept as-is. */
	log_sys.next_checkpoint_no = ++checkpoint_no;

	mutex_enter(&recv_sys.mutex);

	recv_sys.apply_log_recs = true;
	recv_no_ibuf_operations = false;
	ut_d(recv_no_log_write = srv_operation == SRV_OPERATION_RESTORE
	     || srv_operation == SRV_OPERATION_RESTORE_EXPORT);

	mutex_exit(&recv_sys.mutex);

	mysql_mutex_unlock(&log_sys.mutex);

	recv_lsn_checks_on = true;

	/* The database is now ready to start almost normal processing of user
	transactions: transaction rollbacks and the application of the log
	records in the hash table can be run in background. */

	return(DB_SUCCESS);
}
3695
validate_page(const page_id_t page_id,const byte * page,const fil_space_t * space,byte * tmp_buf)3696 bool recv_dblwr_t::validate_page(const page_id_t page_id,
3697 const byte *page,
3698 const fil_space_t *space,
3699 byte *tmp_buf)
3700 {
3701 if (page_id.page_no() == 0)
3702 {
3703 ulint flags= fsp_header_get_flags(page);
3704 if (!fil_space_t::is_valid_flags(flags, page_id.space()))
3705 {
3706 ulint cflags= fsp_flags_convert_from_101(flags);
3707 if (cflags == ULINT_UNDEFINED)
3708 {
3709 ib::warn() << "Ignoring a doublewrite copy of page " << page_id
3710 << "due to invalid flags " << ib::hex(flags);
3711 return false;
3712 }
3713
3714 flags= cflags;
3715 }
3716
3717 /* Page 0 is never page_compressed or encrypted. */
3718 return !buf_page_is_corrupted(true, page, flags);
3719 }
3720
3721 ut_ad(tmp_buf);
3722 byte *tmp_frame= tmp_buf;
3723 byte *tmp_page= tmp_buf + srv_page_size;
3724 const uint16_t page_type= mach_read_from_2(page + FIL_PAGE_TYPE);
3725 const bool expect_encrypted= space->crypt_data &&
3726 space->crypt_data->type != CRYPT_SCHEME_UNENCRYPTED;
3727
3728 if (space->full_crc32())
3729 return !buf_page_is_corrupted(true, page, space->flags);
3730
3731 if (expect_encrypted &&
3732 mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))
3733 {
3734 if (!fil_space_verify_crypt_checksum(page, space->zip_size()))
3735 return false;
3736 if (page_type != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)
3737 return true;
3738 if (space->zip_size())
3739 return false;
3740 memcpy(tmp_page, page, space->physical_size());
3741 if (!fil_space_decrypt(space, tmp_frame, tmp_page))
3742 return false;
3743 }
3744
3745 switch (page_type) {
3746 case FIL_PAGE_PAGE_COMPRESSED:
3747 memcpy(tmp_page, page, space->physical_size());
3748 /* fall through */
3749 case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
3750 if (space->zip_size())
3751 return false; /* ROW_FORMAT=COMPRESSED cannot be page_compressed */
3752 ulint decomp= fil_page_decompress(tmp_frame, tmp_page, space->flags);
3753 if (!decomp)
3754 return false; /* decompression failed */
3755 if (decomp == srv_page_size)
3756 return false; /* the page was not compressed (invalid page type) */
3757 return !buf_page_is_corrupted(true, tmp_page, space->flags);
3758 }
3759
3760 return !buf_page_is_corrupted(true, page, space->flags);
3761 }
3762
find_page(const page_id_t page_id,const fil_space_t * space,byte * tmp_buf)3763 byte *recv_dblwr_t::find_page(const page_id_t page_id,
3764 const fil_space_t *space, byte *tmp_buf)
3765 {
3766 byte *result= NULL;
3767 lsn_t max_lsn= 0;
3768
3769 for (byte *page : pages)
3770 {
3771 if (page_get_page_no(page) != page_id.page_no() ||
3772 page_get_space_id(page) != page_id.space())
3773 continue;
3774 const lsn_t lsn= mach_read_from_8(page + FIL_PAGE_LSN);
3775 if (lsn <= max_lsn ||
3776 !validate_page(page_id, page, space, tmp_buf))
3777 {
3778 /* Mark processed for subsequent iterations in buf_dblwr_t::recover() */
3779 memset(page + FIL_PAGE_LSN, 0, 8);
3780 continue;
3781 }
3782 max_lsn= lsn;
3783 result= page;
3784 }
3785
3786 return result;
3787 }
3788