1 /*****************************************************************************
2
3 Copyright (c) 1995, 2021, Oracle and/or its affiliates.
4 Copyright (c) 2016, Percona Inc. All Rights Reserved.
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License, version 2.0,
8 as published by the Free Software Foundation.
9
10 This program is also distributed with certain software (including
11 but not limited to OpenSSL) that is licensed under separate terms,
12 as designated in a particular file or component or in included license
13 documentation. The authors of MySQL hereby grant you an additional
14 permission to link the program and your derivative works with the
15 separately licensed software that they have included with MySQL.
16
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License, version 2.0, for more details.
21
22 You should have received a copy of the GNU General Public License along with
23 this program; if not, write to the Free Software Foundation, Inc.,
24 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
25
26 *****************************************************************************/
27
28 /**************************************************//**
29 @file buf/buf0dblwr.cc
30 Doublwrite buffer module
31
32 Created 2011/12/19
33 *******************************************************/
34
35 #include "ha_prototypes.h"
36 #include "buf0dblwr.h"
37
38 #ifdef UNIV_NONINL
39 #include "buf0buf.ic"
40 #include "buf0dblrw.ic"
41 #endif
42
43 #include "buf0buf.h"
44 #include "buf0checksum.h"
45 #include "srv0start.h"
46 #include "srv0srv.h"
47 #include "page0zip.h"
48 #include "trx0sys.h"
49 #include "os0file.h"
50
51 #ifndef UNIV_HOTBACKUP
52
53 /** The doublewrite buffer */
54 buf_dblwr_t* buf_dblwr = NULL;
55
56 /** Set to TRUE when the doublewrite buffer is being created */
57 ibool buf_dblwr_being_created = FALSE;
58
59 /****************************************************************//**
60 Determines if a page number is located inside the doublewrite buffer.
61 @return TRUE if the location is inside the two blocks of the
62 doublewrite buffer */
63 ibool
buf_dblwr_page_inside(ulint page_no)64 buf_dblwr_page_inside(
65 /*==================*/
66 ulint page_no) /*!< in: page number */
67 {
68 if (buf_dblwr == NULL) {
69
70 return(FALSE);
71 }
72
73 if (page_no >= buf_dblwr->block1
74 && page_no < buf_dblwr->block1
75 + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
76 return(TRUE);
77 }
78
79 if (page_no >= buf_dblwr->block2
80 && page_no < buf_dblwr->block2
81 + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
82 return(TRUE);
83 }
84
85 return(FALSE);
86 }
87
88 /****************************************************************//**
89 Calls buf_page_get() on the TRX_SYS_PAGE and returns a pointer to the
90 doublewrite buffer within it.
91 @return pointer to the doublewrite buffer within the filespace header
92 page. */
93 UNIV_INLINE
94 byte*
buf_dblwr_get(mtr_t * mtr)95 buf_dblwr_get(
96 /*==========*/
97 mtr_t* mtr) /*!< in/out: MTR to hold the page latch */
98 {
99 buf_block_t* block;
100
101 block = buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
102 univ_page_size, RW_X_LATCH, mtr);
103
104 buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
105
106 return(buf_block_get_frame(block) + TRX_SYS_DOUBLEWRITE);
107 }
108
109 /********************************************************************//**
110 Flush a batch of writes to the datafiles that have already been
111 written to the dblwr buffer on disk. */
112 void
buf_dblwr_sync_datafiles()113 buf_dblwr_sync_datafiles()
114 /*======================*/
115 {
116 /* Wake possible simulated aio thread to actually post the
117 writes to the operating system */
118 os_aio_simulated_wake_handler_threads();
119
120 /* Wait that all async writes to tablespaces have been posted to
121 the OS */
122 os_aio_wait_until_no_pending_writes();
123
124 /* Now we flush the data to disk (for example, with fsync) */
125 fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
126 }
127
128 /****************************************************************//**
129 Creates or initialializes the doublewrite buffer at a database start. */
130 static
131 void
buf_dblwr_init(byte * doublewrite)132 buf_dblwr_init(
133 /*===========*/
134 byte* doublewrite) /*!< in: pointer to the doublewrite buf
135 header on trx sys page */
136 {
137 ulint buf_size;
138
139 buf_dblwr = static_cast<buf_dblwr_t*>(
140 ut_zalloc_nokey(sizeof(buf_dblwr_t)));
141
142 /* There are two blocks of same size in the doublewrite
143 buffer. */
144 buf_size = 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
145
146 /* There must be atleast one buffer for single page writes
147 and one buffer for batch writes. */
148 ut_a(srv_doublewrite_batch_size > 0
149 && srv_doublewrite_batch_size < buf_size);
150
151 mutex_create(LATCH_ID_BUF_DBLWR, &buf_dblwr->mutex);
152
153 buf_dblwr->s_event = os_event_create("dblwr_single_event");
154 buf_dblwr->s_reserved = 0;
155
156 buf_dblwr->block1 = mach_read_from_4(
157 doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK1);
158 buf_dblwr->block2 = mach_read_from_4(
159 doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK2);
160
161 buf_dblwr->in_use = static_cast<bool*>(
162 ut_zalloc_nokey(buf_size * sizeof(bool)));
163
164 buf_dblwr->write_buf_unaligned = static_cast<byte*>(
165 ut_malloc_nokey((1 + buf_size) * UNIV_PAGE_SIZE));
166
167 buf_dblwr->write_buf = static_cast<byte*>(
168 ut_align(buf_dblwr->write_buf_unaligned,
169 UNIV_PAGE_SIZE));
170
171 buf_dblwr->buf_block_arr = static_cast<buf_page_t**>(
172 ut_zalloc_nokey(buf_size * sizeof(void*)));
173 }
174
175 /****************************************************************//**
176 Creates the doublewrite buffer to a new InnoDB installation. The header of the
177 doublewrite buffer is placed on the trx system header page.
178 @return true if successful, false if not. */
179 MY_ATTRIBUTE((warn_unused_result))
180 bool
buf_dblwr_create(void)181 buf_dblwr_create(void)
182 /*==================*/
183 {
184 buf_block_t* block2;
185 buf_block_t* new_block;
186 byte* doublewrite;
187 byte* fseg_header;
188 ulint page_no;
189 ulint prev_page_no;
190 ulint i;
191 mtr_t mtr;
192
193 if (buf_dblwr) {
194 /* Already inited */
195
196 return(true);
197 }
198
199 start_again:
200 mtr_start(&mtr);
201 buf_dblwr_being_created = TRUE;
202
203 doublewrite = buf_dblwr_get(&mtr);
204
205 if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
206 == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
207 /* The doublewrite buffer has already been created:
208 just read in some numbers */
209
210 buf_dblwr_init(doublewrite);
211
212 mtr_commit(&mtr);
213 buf_dblwr_being_created = FALSE;
214 return(true);
215 }
216
217 ib::info() << "Doublewrite buffer not found: creating new";
218
219 ulint min_doublewrite_size =
220 ( ( 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE
221 + FSP_EXTENT_SIZE / 2
222 + 100)
223 * UNIV_PAGE_SIZE);
224 if (buf_pool_get_curr_size() < min_doublewrite_size) {
225 ib::error() << "Cannot create doublewrite buffer: you must"
226 " increase your buffer pool size. Cannot continue"
227 " operation.";
228
229 return(false);
230 }
231
232 block2 = fseg_create(TRX_SYS_SPACE, TRX_SYS_PAGE_NO,
233 TRX_SYS_DOUBLEWRITE
234 + TRX_SYS_DOUBLEWRITE_FSEG, &mtr);
235
236 /* fseg_create acquires a second latch on the page,
237 therefore we must declare it: */
238
239 buf_block_dbg_add_level(block2, SYNC_NO_ORDER_CHECK);
240
241 if (block2 == NULL) {
242 ib::error() << "Cannot create doublewrite buffer: you must"
243 " increase your tablespace size."
244 " Cannot continue operation.";
245
246 /* We exit without committing the mtr to prevent
247 its modifications to the database getting to disk */
248
249 return(false);
250 }
251
252 fseg_header = doublewrite + TRX_SYS_DOUBLEWRITE_FSEG;
253 prev_page_no = 0;
254
255 for (i = 0; i < 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE
256 + FSP_EXTENT_SIZE / 2; i++) {
257 new_block = fseg_alloc_free_page(
258 fseg_header, prev_page_no + 1, FSP_UP, &mtr);
259 if (new_block == NULL) {
260 ib::error() << "Cannot create doublewrite buffer: "
261 " you must increase your tablespace size."
262 " Cannot continue operation.";
263
264 return(false);
265 }
266
267 /* We read the allocated pages to the buffer pool;
268 when they are written to disk in a flush, the space
269 id and page number fields are also written to the
270 pages. When we at database startup read pages
271 from the doublewrite buffer, we know that if the
272 space id and page number in them are the same as
273 the page position in the tablespace, then the page
274 has not been written to in doublewrite. */
275
276 ut_ad(rw_lock_get_x_lock_count(&new_block->lock) == 1);
277 page_no = new_block->page.id.page_no();
278
279 if (i == FSP_EXTENT_SIZE / 2) {
280 ut_a(page_no == FSP_EXTENT_SIZE);
281 mlog_write_ulint(doublewrite
282 + TRX_SYS_DOUBLEWRITE_BLOCK1,
283 page_no, MLOG_4BYTES, &mtr);
284 mlog_write_ulint(doublewrite
285 + TRX_SYS_DOUBLEWRITE_REPEAT
286 + TRX_SYS_DOUBLEWRITE_BLOCK1,
287 page_no, MLOG_4BYTES, &mtr);
288
289 } else if (i == FSP_EXTENT_SIZE / 2
290 + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
291 ut_a(page_no == 2 * FSP_EXTENT_SIZE);
292 mlog_write_ulint(doublewrite
293 + TRX_SYS_DOUBLEWRITE_BLOCK2,
294 page_no, MLOG_4BYTES, &mtr);
295 mlog_write_ulint(doublewrite
296 + TRX_SYS_DOUBLEWRITE_REPEAT
297 + TRX_SYS_DOUBLEWRITE_BLOCK2,
298 page_no, MLOG_4BYTES, &mtr);
299
300 } else if (i > FSP_EXTENT_SIZE / 2) {
301 ut_a(page_no == prev_page_no + 1);
302 }
303
304 if (((i + 1) & 15) == 0) {
305 /* rw_locks can only be recursively x-locked
306 2048 times. (on 32 bit platforms,
307 (lint) 0 - (X_LOCK_DECR * 2049)
308 is no longer a negative number, and thus
309 lock_word becomes like a shared lock).
310 For 4k page size this loop will
311 lock the fseg header too many times. Since
312 this code is not done while any other threads
313 are active, restart the MTR occasionally. */
314 mtr_commit(&mtr);
315 mtr_start(&mtr);
316 doublewrite = buf_dblwr_get(&mtr);
317 fseg_header = doublewrite
318 + TRX_SYS_DOUBLEWRITE_FSEG;
319 }
320
321 prev_page_no = page_no;
322 }
323
324 mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC,
325 TRX_SYS_DOUBLEWRITE_MAGIC_N,
326 MLOG_4BYTES, &mtr);
327 mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC
328 + TRX_SYS_DOUBLEWRITE_REPEAT,
329 TRX_SYS_DOUBLEWRITE_MAGIC_N,
330 MLOG_4BYTES, &mtr);
331
332 mlog_write_ulint(doublewrite
333 + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED,
334 TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N,
335 MLOG_4BYTES, &mtr);
336 mtr_commit(&mtr);
337
338 /* Flush the modified pages to disk and make a checkpoint */
339 log_make_checkpoint_at(LSN_MAX, TRUE);
340
341 /* Remove doublewrite pages from LRU */
342 buf_pool_invalidate();
343
344 ib::info() << "Doublewrite buffer created";
345
346 goto start_again;
347 }
348
/** Compute the path to the parallel doublewrite buffer, if not already done.
The resolved, absolute path is cached in parallel_dblwr_buf.path.
@return DB_SUCCESS, DB_ERROR on realpath failure, DB_WRONG_FILE_NAME if the
path points to a non-regular file, or DB_OUT_OF_MEMORY */
MY_ATTRIBUTE((warn_unused_result))
static
dberr_t
buf_parallel_dblwr_make_path(void)
{
	if (parallel_dblwr_buf.path)
		return(DB_SUCCESS);

	char path[FN_REFLEN + 1 /* OS_PATH_SEPARATOR */];
	const char *dir = NULL;

	ut_ad(srv_parallel_doublewrite_path);

	if (is_absolute_path(srv_parallel_doublewrite_path)) {

		my_strncpy_trunc(path, srv_parallel_doublewrite_path, sizeof(path));
	} else {

		/* A relative path to the parallel doublewrite file is based
		either on srv_data_home, either mysql data directory if the
		former is empty. */
		dir = srv_data_home[0] ? srv_data_home
			: fil_path_to_mysql_datadir;
		/* Avoid a doubled separator if the directory already ends
		with one. */
		if (dir[strlen(dir) - 1] == OS_PATH_SEPARATOR) {

			ut_snprintf(path, sizeof(path), "%s%s",
				    dir,
				    srv_parallel_doublewrite_path);
		} else {

			ut_snprintf(path, sizeof(path), "%s%c%s",
				    dir,
				    OS_PATH_SEPARATOR,
				    srv_parallel_doublewrite_path);
		}
	}

	os_file_type_t type;
	bool exists = false;
	bool ret;

	ret = os_file_status(path, &exists, &type);

	/* For realpath() to succeed the file must exist. */

	if (ret && exists) {
		/* Canonicalize in place; 'path' is both input and output. */
		if (my_realpath(path, path, MY_WME) != 0) {

			return(DB_ERROR);
		}
		if (type != OS_FILE_TYPE_FILE) {
			ib::error() << "Parallel doublewrite path "
				    << path << " must point to a regular "
				    "file";
			return(DB_WRONG_FILE_NAME);
		}
	} else if (!is_absolute_path(srv_parallel_doublewrite_path)) {
		/* If it does not exist, and is not an absolute path, then
		resolve only the directory part and append
		srv_parallel_doublewrite_path to it. Note that 'dir' was
		assigned in the relative-path branch above. */
		char dir_full[FN_REFLEN];

		if (my_realpath(dir_full, dir, MY_WME) != 0) {

			return(DB_ERROR);
		}

		if (dir_full[strlen(dir_full) - 1] == OS_PATH_SEPARATOR) {

			ut_snprintf(path, sizeof(path), "%s%s",
				    dir_full,
				    srv_parallel_doublewrite_path);
		} else {

			ut_snprintf(path, sizeof(path), "%s%c%s",
				    dir_full,
				    OS_PATH_SEPARATOR,
				    srv_parallel_doublewrite_path);
		}
	}

	/* Cache a heap copy; the caller owns nothing, the copy is freed
	elsewhere with the parallel doublewrite state. */
	parallel_dblwr_buf.path = mem_strdup(path);

	return(parallel_dblwr_buf.path ? DB_SUCCESS : DB_OUT_OF_MEMORY);
}
435
436 /** Close the parallel doublewrite buffer file */
437 static
438 void
buf_parallel_dblwr_close(void)439 buf_parallel_dblwr_close(void)
440 {
441 if (!parallel_dblwr_buf.file.is_closed()) {
442 os_file_close(parallel_dblwr_buf.file);
443 parallel_dblwr_buf.file.set_closed();
444 }
445 }
446
/** Maximum possible parallel doublewrite buffer file size in bytes:
one page for every batch slot in every shard. */
#define MAX_DOUBLEWRITE_FILE_SIZE \
	((MAX_DOUBLEWRITE_BATCH_SIZE) * (MAX_DBLWR_SHARDS) * (UNIV_PAGE_SIZE))
450
451 /**
452 At database startup initializes the doublewrite buffer memory structure if
453 we already have a doublewrite buffer created in the data files. If we are
454 upgrading to an InnoDB version which supports multiple tablespaces, then this
455 function performs the necessary update operations. If we are in a crash
456 recovery, this function loads the pages from double write buffer into memory.
457 @param[in] file File handle
458 @param[in] path Path name of file
459 @return DB_SUCCESS or error code */
460 dberr_t
buf_dblwr_init_or_load_pages(pfs_os_file_t file,const char * path)461 buf_dblwr_init_or_load_pages(
462 pfs_os_file_t file,
463 const char* path)
464 {
465 byte* buf;
466 byte* page;
467 ulint block1;
468 ulint block2;
469 ulint space_id;
470 byte* read_buf;
471 byte* doublewrite;
472 byte* unaligned_read_buf;
473 ibool reset_space_ids = FALSE;
474 recv_dblwr_t& recv_dblwr = recv_sys->dblwr;
475
476 if (srv_read_only_mode) {
477
478 ib::info() << "Skipping doublewrite buffer processing due to "
479 "InnoDB running in read only mode";
480 return(DB_SUCCESS);
481 }
482
483 /* We do the file i/o past the buffer pool */
484
485 unaligned_read_buf = static_cast<byte*>(
486 ut_malloc_nokey(2 * UNIV_PAGE_SIZE));
487
488 read_buf = static_cast<byte*>(
489 ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));
490
491 /* Read the trx sys header to check if we are using the doublewrite
492 buffer */
493 dberr_t err;
494
495 IORequest read_request(IORequest::READ);
496
497 read_request.disable_compression();
498
499 err = os_file_read(
500 read_request,
501 file, read_buf, TRX_SYS_PAGE_NO * UNIV_PAGE_SIZE,
502 UNIV_PAGE_SIZE);
503
504 if (err != DB_SUCCESS) {
505
506 ib::error()
507 << "Failed to read the system tablespace header page";
508
509 ut_free(unaligned_read_buf);
510
511 return(err);
512 }
513
514 doublewrite = read_buf + TRX_SYS_DOUBLEWRITE;
515
516 if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
517 == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
518 /* The doublewrite buffer has been created */
519
520 buf_dblwr_init(doublewrite);
521
522 block1 = buf_dblwr->block1;
523 block2 = buf_dblwr->block2;
524
525 buf = buf_dblwr->write_buf;
526 } else {
527 ut_free(unaligned_read_buf);
528 return(DB_SUCCESS);
529 }
530
531 if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED)
532 != TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N) {
533
534 /* We are upgrading from a version < 4.1.x to a version where
535 multiple tablespaces are supported. We must reset the space id
536 field in the pages in the doublewrite buffer because starting
537 from this version the space id is stored to
538 FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID. */
539
540 reset_space_ids = TRUE;
541
542 ib::info() << "Resetting space id's in the doublewrite buffer";
543 }
544
545 /* Read the pages from the doublewrite buffer to memory */
546 err = os_file_read(
547 read_request,
548 file, buf, block1 * UNIV_PAGE_SIZE,
549 TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE);
550
551 if (err != DB_SUCCESS) {
552
553 ib::error()
554 << "Failed to read the first double write buffer "
555 "extent";
556
557 ut_free(unaligned_read_buf);
558
559 return(err);
560 }
561
562 err = os_file_read(
563 read_request,
564 file,
565 buf + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE,
566 block2 * UNIV_PAGE_SIZE,
567 TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE);
568
569 if (err != DB_SUCCESS) {
570
571 ib::error()
572 << "Failed to read the second double write buffer "
573 "extent";
574
575 ut_free(unaligned_read_buf);
576
577 return(err);
578 }
579
580 /* Check if any of these pages is half-written in data files, in the
581 intended position */
582
583 page = buf;
584
585 for (ulint i = 0; i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * 2; i++) {
586 if (reset_space_ids) {
587 ulint source_page_no;
588
589 space_id = 0;
590 mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
591 space_id);
592 /* We do not need to calculate new checksums for the
593 pages because the field .._SPACE_ID does not affect
594 them. Write the page back to where we read it from. */
595
596 if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
597 source_page_no = block1 + i;
598 } else {
599 source_page_no = block2
600 + i - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
601 }
602
603 IORequest write_request(IORequest::WRITE);
604
605 /* Recovered data file pages are written out
606 as uncompressed. */
607
608 write_request.disable_compression();
609
610 err = os_file_write(
611 write_request, path, file, page,
612 source_page_no * UNIV_PAGE_SIZE,
613 UNIV_PAGE_SIZE);
614
615 if (err != DB_SUCCESS) {
616
617 ib::error()
618 << "Failed to write to the double write"
619 " buffer";
620
621 ut_free(unaligned_read_buf);
622
623 return(err);
624 }
625
626 } else {
627 recv_dblwr.add_to_sys(page);
628 }
629
630 page += univ_page_size.physical();
631 }
632
633 err = buf_parallel_dblwr_make_path();
634 if (err != DB_SUCCESS) {
635
636 ut_free(unaligned_read_buf);
637 return(err);
638 }
639
640 ut_ad(parallel_dblwr_buf.file.is_closed());
641 bool success;
642 parallel_dblwr_buf.file
643 = os_file_create_simple_no_error_handling(
644 innodb_parallel_dblwrite_file_key,
645 parallel_dblwr_buf.path,
646 OS_FILE_OPEN, OS_FILE_READ_ONLY, true, &success);
647 if (!success) {
648 /* We are not supposed to check errno != ENOENT directly, but
649 os_file_get_last_error will spam error log if it's handled
650 there. */
651 if (errno != ENOENT) {
652 os_file_get_last_error(true);
653 ib::error()
654 << "Failed to open the parallel doublewrite "
655 "buffer at " << parallel_dblwr_buf.path;
656 ut_free(unaligned_read_buf);
657 return(DB_CANNOT_OPEN_FILE);
658 }
659 /* Failed to open because the file did not exist: OK */
660 ib::info() << "Crash recovery did not find the parallel "
661 "doublewrite buffer at "
662 << parallel_dblwr_buf.path;
663 } else {
664 /* Cannot possibly be upgrading from 4.1 */
665 ut_ad(!reset_space_ids);
666
667 os_file_set_nocache(parallel_dblwr_buf.file,
668 parallel_dblwr_buf.path,
669 "open", false);
670
671 os_offset_t size = os_file_get_size(parallel_dblwr_buf.file);
672
673 if (size > MAX_DOUBLEWRITE_FILE_SIZE) {
674 ib::error() << "Parallel doublewrite buffer size "
675 << size
676 << " bytes is larger than the maximum "
677 "size " << MAX_DOUBLEWRITE_FILE_SIZE
678 << " bytes supported by this server "
679 "version";
680 buf_parallel_dblwr_close();
681 ut_free(unaligned_read_buf);
682 return(DB_CORRUPTION);
683 }
684
685 if (size % UNIV_PAGE_SIZE) {
686 ib::error() << "Parallel doublewrite buffer size "
687 << size << " bytes is not a multiple of "
688 "a page size "
689 << UNIV_PAGE_SIZE << " bytes";
690 buf_parallel_dblwr_close();
691 ut_free(unaligned_read_buf);
692 return(DB_CORRUPTION);
693 }
694
695 if (size == 0) {
696 ib::info()
697 << "Parallel doublewrite buffer is zero-sized";
698 buf_parallel_dblwr_close();
699 ut_free(unaligned_read_buf);
700 return(DB_SUCCESS);
701 }
702
703 ib::info() << "Recovering partial pages from the parallel "
704 "doublewrite buffer at " << parallel_dblwr_buf.path;
705
706 parallel_dblwr_buf.recovery_buf_unaligned
707 = static_cast<byte *>(
708 ut_malloc(size + UNIV_PAGE_SIZE,
709 mem_key_parallel_doublewrite));
710 if (!parallel_dblwr_buf.recovery_buf_unaligned) {
711 buf_parallel_dblwr_close();
712 ut_free(unaligned_read_buf);
713 return(DB_OUT_OF_MEMORY);
714 }
715 byte* recovery_buf = static_cast<byte *>
716 (ut_align(parallel_dblwr_buf.recovery_buf_unaligned,
717 UNIV_PAGE_SIZE));
718
719 err = os_file_read(read_request, parallel_dblwr_buf.file,
720 recovery_buf, 0, size);
721 if (err != DB_SUCCESS) {
722 ib::error() << "Failed to read the parallel "
723 "doublewrite buffer";
724 buf_parallel_dblwr_close();
725 ut_free(unaligned_read_buf);
726 ut_free(parallel_dblwr_buf.recovery_buf_unaligned);
727 return(DB_ERROR);
728 }
729
730 byte zero_page[UNIV_PAGE_SIZE_MAX] = {0};
731 for (page = recovery_buf; page < recovery_buf + size;
732 page += UNIV_PAGE_SIZE) {
733
734 /* Skip all zero pages */
735 const ulint checksum = mach_read_from_4(
736 page + FIL_PAGE_SPACE_OR_CHKSUM);
737
738 if (checksum != 0
739 || memcmp(page, zero_page, UNIV_PAGE_SIZE) != 0) {
740 recv_dblwr.add(page);
741 }
742 }
743 buf_parallel_dblwr_close();
744 }
745
746 if (reset_space_ids) {
747 os_file_flush(file);
748 }
749
750 ut_free(unaligned_read_buf);
751
752 return(DB_SUCCESS);
753 }
754
755 /** Delete the parallel doublewrite file, if its path already has been
756 computed. It is up to the caller to ensure that this called at safe point */
757 void
buf_parallel_dblwr_delete(void)758 buf_parallel_dblwr_delete(void)
759 {
760 if (parallel_dblwr_buf.path) {
761
762 os_file_delete_if_exists(innodb_parallel_dblwrite_file_key,
763 parallel_dblwr_buf.path, NULL);
764 }
765 }
766
/** Release any unused parallel doublewrite pages and free their underlying
buffer at the end of crash recovery. The recv_dblwr page list holds raw
pointers into recovery_buf_unaligned, so it must be cleared before the
buffer is freed. */
void
buf_parallel_dblwr_finish_recovery(void)
{
	recv_sys->dblwr.pages.clear();
	/* ut_free(NULL) is a no-op, so this is safe even if the parallel
	doublewrite file was never read. */
	ut_free(parallel_dblwr_buf.recovery_buf_unaligned);
	parallel_dblwr_buf.recovery_buf_unaligned = NULL;
}
776
/** Process and remove the double write buffer pages for all tablespaces.
For each recovered doublewrite page, read the corresponding data file page
and, if it is corrupt or all-zero while the doublewrite copy is valid,
restore it from the doublewrite copy. Finally re-creates the parallel
doublewrite file. */
void
buf_dblwr_process(void)
{
	ulint		page_no_dblwr	= 0;
	byte*		read_buf;
	byte*		unaligned_read_buf;
	recv_dblwr_t&	recv_dblwr	= recv_sys->dblwr;

	ut_ad(!srv_read_only_mode);

	/* One page of slack for alignment, as elsewhere in this file. */
	unaligned_read_buf = static_cast<byte*>(
		ut_malloc_nokey(2 * UNIV_PAGE_SIZE));

	read_buf = static_cast<byte*>(
		ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));

	for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
	     i != recv_dblwr.pages.end();
	     ++i, ++page_no_dblwr) {

		byte*	page		= *i;
		ulint	page_no		= page_get_page_no(page);
		ulint	space_id	= page_get_space_id(page);

		fil_space_t*	space = fil_space_get(space_id);

		if (space == NULL) {
			/* Maybe we have dropped the tablespace
			and this page once belonged to it: do nothing */
			continue;
		}

		fil_space_open_if_needed(space);

		if (page_no >= space->size) {

			/* Do not report the warning if the tablespace is
			schedule for truncate or was truncated and we have live
			MLOG_TRUNCATE record in redo. */
			bool	skip_warning =
				srv_is_tablespace_truncated(space_id)
				|| srv_was_tablespace_truncated(space);

			if (!skip_warning) {
				ib::warn() << "Page " << page_no_dblwr
					<< " in the doublewrite buffer is"
					" not within space bounds: page "
					<< page_id_t(space_id, page_no);
			}
		} else {
			const page_size_t	page_size(space->flags);
			const page_id_t		page_id(space_id, page_no);

			/* We want to ensure that for partial reads the
			unread portion of the page is NUL. */
			memset(read_buf, 0x0, page_size.physical());

			IORequest	request;

			request.dblwr_recover();

			/* Read in the actual page from the file */
			dberr_t	err = fil_io(
				request, true,
				page_id, page_size,
				0, page_size.physical(), read_buf, NULL);

			if (err != DB_SUCCESS) {

				ib::warn()
					<< "Double write buffer recovery: "
					<< page_id << " read failed with "
					<< "error: " << ut_strerr(err);
			}

			/* Check if the page is corrupt */
			if (buf_page_is_corrupted(
				true, read_buf, page_size,
				fsp_is_checksum_disabled(space_id))) {

				ib::info() << "Database page corruption or"
					<< " a failed file read of page "
					<< page_id
					<< ". Trying to recover it from the"
					<< " doublewrite buffer.";

				dberr_t	err = DB_SUCCESS;

				if (space->crypt_data == NULL) // if it was crypt_data encrypted it was already decrypted
					err = os_dblwr_decrypt_page(
						space, page);

				/* If decryption failed or the doublewrite
				copy is itself corrupt, we cannot recover. */
				if (err != DB_SUCCESS || buf_page_is_corrupted(
					true, page, page_size,
					fsp_is_checksum_disabled(space_id))) {

					ib::error() << "Dump of the page:";
					buf_page_print(
						read_buf, page_size,
						BUF_PAGE_PRINT_NO_CRASH);
					ib::error() << "Dump of corresponding"
						" page in doublewrite buffer:";

					buf_page_print(
						page, page_size,
						BUF_PAGE_PRINT_NO_CRASH);

					ib::fatal() << "The page in the"
						" doublewrite buffer is"
						" corrupt. Cannot continue"
						" operation. You can try to"
						" recover the database with"
						" innodb_force_recovery=6";
				}
			} else if (buf_page_is_zeroes(read_buf, page_size)
				   && !buf_page_is_zeroes(page, page_size)
				   && !buf_page_is_corrupted(
					true, page, page_size,
					fsp_is_checksum_disabled(space_id))) {

				/* Database page contained only zeroes, while
				a valid copy is available in dblwr buffer. */

			} else {

				bool t1 = buf_page_is_zeroes(
					read_buf, page_size);

				bool t2 = buf_page_is_zeroes(page, page_size);

				bool t3 = buf_page_is_corrupted(
					true, page, page_size,
					fsp_is_checksum_disabled(space_id));

				/* NOTE(review): t1 && !(t2 || t3) is the
				same condition as the preceding `else if`,
				which was already false on this path, so
				this branch always falls through to
				`continue`. The duplication appears to be
				redundant; confirm before simplifying. */
				if (t1 && !(t2 || t3)) {

					/* Database page contained only
					zeroes, while a valid copy is
					available in dblwr buffer. */

				} else {
					continue;
				}
			}

			/* Recovered data file pages are written out
			as uncompressed. */

			IORequest	write_request(IORequest::WRITE);

			write_request.disable_compression();

			/* Write the good page from the doublewrite
			buffer to the intended position. */

			fil_io(write_request, true,
			       page_id, page_size,
			       0, page_size.physical(),
			       const_cast<byte*>(page), NULL);

			ib::info()
				<< "Recovered page "
				<< page_id
				<< " from the doublewrite buffer.";
		}
	}

	fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
	ut_free(unaligned_read_buf);

	buf_parallel_dblwr_finish_recovery();

	/* If parallel doublewrite buffer was used, now it's safe to
	delete and re-create it. */
	buf_parallel_dblwr_delete();
	if (buf_parallel_dblwr_create() != DB_SUCCESS) {
		ib::fatal()
			<< "Creating the parallel doublewrite buffer failed";
	}
}
958
959 /****************************************************************//**
960 Frees doublewrite buffer. */
961 void
buf_dblwr_free(void)962 buf_dblwr_free(void)
963 /*================*/
964 {
965 /* Free the double write data structures. */
966 ut_a(buf_dblwr != NULL);
967 ut_ad(buf_dblwr->s_reserved == 0);
968
969 os_event_destroy(buf_dblwr->s_event);
970 ut_free(buf_dblwr->write_buf_unaligned);
971 buf_dblwr->write_buf_unaligned = NULL;
972
973 ut_free(buf_dblwr->buf_block_arr);
974 buf_dblwr->buf_block_arr = NULL;
975
976 ut_free(buf_dblwr->in_use);
977 buf_dblwr->in_use = NULL;
978
979 mutex_free(&buf_dblwr->mutex);
980 ut_free(buf_dblwr);
981 buf_dblwr = NULL;
982 }
983
984 /********************************************************************//**
985 Updates the doublewrite buffer when an IO request is completed. */
986 void
buf_dblwr_update(const buf_page_t * bpage,buf_flush_t flush_type)987 buf_dblwr_update(
988 /*=============*/
989 const buf_page_t* bpage, /*!< in: buffer block descriptor */
990 buf_flush_t flush_type)/*!< in: flush type */
991 {
992 if (!srv_use_doublewrite_buf
993 || buf_dblwr == NULL
994 || fsp_is_system_temporary(bpage->id.space())) {
995 return;
996 }
997
998 ut_ad(!srv_read_only_mode);
999
1000 switch (flush_type) {
1001 case BUF_FLUSH_LIST:
1002 case BUF_FLUSH_LRU:
1003 {
1004 ulint i = buf_parallel_dblwr_partition(bpage,
1005 flush_type);
1006 struct parallel_dblwr_shard_t* dblwr_shard
1007 = ¶llel_dblwr_buf.shard[i];
1008
1009 ut_ad(!os_event_is_set(dblwr_shard->batch_completed));
1010
1011 if (os_atomic_decrement_ulint(&dblwr_shard->batch_size,
1012 1)
1013 == 0) {
1014
1015 /* The last page from the doublewrite batch. */
1016 os_event_set(dblwr_shard->batch_completed);
1017 }
1018
1019 break;
1020 }
1021 case BUF_FLUSH_SINGLE_PAGE:
1022 {
1023 const ulint size = 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1024 ulint i;
1025 mutex_enter(&buf_dblwr->mutex);
1026 for (i = 0; i < size; ++i) {
1027 if (buf_dblwr->buf_block_arr[i] == bpage) {
1028 buf_dblwr->s_reserved--;
1029 buf_dblwr->buf_block_arr[i] = NULL;
1030 buf_dblwr->in_use[i] = false;
1031 break;
1032 }
1033 }
1034
1035 /* The block we are looking for must exist as a
1036 reserved block. */
1037 ut_a(i < size);
1038 }
1039 os_event_set(buf_dblwr->s_event);
1040 mutex_exit(&buf_dblwr->mutex);
1041 break;
1042 case BUF_FLUSH_N_TYPES:
1043 ut_error;
1044 }
1045 }
1046
1047 /********************************************************************//**
1048 Check the LSN values on the page. */
1049 static
1050 void
buf_dblwr_check_page_lsn(const page_t * page)1051 buf_dblwr_check_page_lsn(
1052 /*=====================*/
1053 const page_t* page) /*!< in: page to check */
1054 {
1055 if (memcmp(page + (FIL_PAGE_LSN + 4),
1056 page + (UNIV_PAGE_SIZE
1057 - FIL_PAGE_END_LSN_OLD_CHKSUM + 4),
1058 4)) {
1059
1060 const ulint lsn1 = mach_read_from_4(
1061 page + FIL_PAGE_LSN + 4);
1062 const ulint lsn2 = mach_read_from_4(
1063 page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM
1064 + 4);
1065
1066 ib::error() << "The page to be written seems corrupt!"
1067 " The low 4 bytes of LSN fields do not match"
1068 " (" << lsn1 << " != " << lsn2 << ")!"
1069 " Noticed in the buffer pool.";
1070 }
1071 }
1072
1073 /********************************************************************//**
1074 Asserts when a corrupt block is find during writing out data to the
1075 disk. */
1076 static
1077 void
buf_dblwr_assert_on_corrupt_block(const buf_block_t * block)1078 buf_dblwr_assert_on_corrupt_block(
1079 /*==============================*/
1080 const buf_block_t* block) /*!< in: block to check */
1081 {
1082 buf_page_print(block->frame, univ_page_size, BUF_PAGE_PRINT_NO_CRASH);
1083
1084 ib::fatal() << "Apparent corruption of an index page "
1085 << block->page.id
1086 << " to be written to data file. We intentionally crash"
1087 " the server to prevent corrupt data from ending up in"
1088 " data files.";
1089 }
1090
1091 /********************************************************************//**
1092 Check the LSN values on the page with which this block is associated.
1093 Also validate the page if the option is set. */
1094 static
1095 void
buf_dblwr_check_block(const buf_block_t * block)1096 buf_dblwr_check_block(
1097 /*==================*/
1098 const buf_block_t* block) /*!< in: block to check */
1099 {
1100 ut_ad(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
1101
1102 if (block->skip_flush_check) {
1103 return;
1104 }
1105
1106 switch (fil_page_get_type(block->frame)) {
1107 case FIL_PAGE_INDEX:
1108 case FIL_PAGE_RTREE:
1109 if (page_is_comp(block->frame)) {
1110 if (page_simple_validate_new(block->frame)) {
1111 return;
1112 }
1113 } else if (page_simple_validate_old(block->frame)) {
1114 return;
1115 }
1116 /* While it is possible that this is not an index page
1117 but just happens to have wrongly set FIL_PAGE_TYPE,
1118 such pages should never be modified to without also
1119 adjusting the page type during page allocation or
1120 buf_flush_init_for_writing() or fil_page_reset_type(). */
1121 break;
1122 case FIL_PAGE_TYPE_FSP_HDR:
1123 case FIL_PAGE_IBUF_BITMAP:
1124 case FIL_PAGE_TYPE_UNKNOWN:
1125 /* Do not complain again, we already reset this field. */
1126 case FIL_PAGE_UNDO_LOG:
1127 case FIL_PAGE_INODE:
1128 case FIL_PAGE_IBUF_FREE_LIST:
1129 case FIL_PAGE_TYPE_SYS:
1130 case FIL_PAGE_TYPE_TRX_SYS:
1131 case FIL_PAGE_TYPE_XDES:
1132 case FIL_PAGE_TYPE_BLOB:
1133 case FIL_PAGE_TYPE_ZBLOB:
1134 case FIL_PAGE_TYPE_ZBLOB2:
1135 /* TODO: validate also non-index pages */
1136 return;
1137 case FIL_PAGE_TYPE_ALLOCATED:
1138 /* empty pages could be flushed by encryption threads */
1139 return;
1140 }
1141
1142 buf_dblwr_assert_on_corrupt_block(block);
1143 }
1144
1145 /********************************************************************//**
1146 Writes a page that has already been written to the doublewrite buffer
1147 to the datafile. It is the job of the caller to sync the datafile. */
1148 static
1149 void
buf_dblwr_write_block_to_datafile(const buf_page_t * bpage,bool sync)1150 buf_dblwr_write_block_to_datafile(
1151 /*==============================*/
1152 const buf_page_t* bpage, /*!< in: page to write */
1153 bool sync) /*!< in: true if sync IO
1154 is requested */
1155 {
1156 ut_a(buf_page_in_file(bpage));
1157
1158 ulint type = IORequest::WRITE;
1159
1160 if (sync) {
1161 type |= IORequest::DO_NOT_WAKE;
1162 }
1163
1164 IORequest request(type);
1165
1166 if (bpage->zip.data != NULL) {
1167 ut_ad(bpage->size.is_compressed());
1168
1169 fil_io(request, sync, bpage->id, bpage->size, 0,
1170 bpage->size.physical(),
1171 (void*) bpage->zip.data,
1172 (void*) bpage);
1173 } else {
1174 ut_ad(!bpage->size.is_compressed());
1175
1176 /* Our IO API is common for both reads and writes and is
1177 therefore geared towards a non-const parameter. */
1178
1179 buf_block_t* block = reinterpret_cast<buf_block_t*>(
1180 const_cast<buf_page_t*>(bpage));
1181
1182 ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
1183 buf_dblwr_check_page_lsn(block->frame);
1184
1185 fil_io(request,
1186 sync, bpage->id, bpage->size, 0, bpage->size.physical(),
1187 block->frame, block);
1188 }
1189 }
1190
/** Encrypt a page in a doublewrite buffer shard. The page is
encrypted in place using its tablespace key. If the tablespace has
been dropped, or encryption fails/does not apply, the page is left
unchanged.
@param[in]	block		the buffer pool block for the page
@param[in,out]	dblwr_page	in: unencrypted page
				out: encrypted page (if the tablespace is
				encrypted) */
static
void
buf_dblwr_encrypt_page(
	const buf_block_t*	block,
	page_t*			dblwr_page)
{
	const ulint	space_id = block->page.id.space();
	/* Pin the tablespace so it cannot be dropped while we use its
	encryption settings; NULL means it is already gone. */
	fil_space_t*	space = fil_space_acquire_silent(space_id);

	if (space == NULL) {
		/* Tablespace dropped */
		return;
	}

	/* Scratch buffer for the encrypted image; the page is copied
	back only if encryption actually happened. */
	byte*	encrypted_buf = static_cast<byte*>(
		ut_zalloc_nokey(UNIV_PAGE_SIZE));
	ut_a(encrypted_buf != NULL);

	const page_size_t	page_size(space->flags);
	const bool		success = os_dblwr_encrypt_page(
		space, dblwr_page, encrypted_buf, UNIV_PAGE_SIZE);

	if (success) {
		/* Only the physical page size needs to be copied back. */
		memcpy(dblwr_page, encrypted_buf, page_size.physical());
	}

	ut_free(encrypted_buf);

	fil_space_release(space);
}
1227
1228 /* Disable encryption of Page 0 of any tablespace or if it is system
1229 tablespace, do not encrypt pages upto TRX_SYS_PAGE_NO (including).
1230 TRX_SYS_PAGE should be not encrypted because dblwr buffer is found
1231 from this page
1232 @param[in] block buffer block
1233 @return true if encryption should be disabled for the block, else flase */
1234 static
1235 bool
buf_dblwr_disable_encryption(const buf_block_t * block)1236 buf_dblwr_disable_encryption(
1237 const buf_block_t* block)
1238 {
1239 return(block->page.id.page_no() == 0
1240 || (block->page.id.space() == TRX_SYS_SPACE
1241 && block->page.id.page_no() <= TRX_SYS_PAGE_NO));
1242 }
1243
/********************************************************************//**
Flushes possible buffered writes from the specified partition of the
doublewrite memory buffer to disk, and also wakes up the aio thread if
simulated aio is used. It is very important to call this function after a batch
of writes has been posted, and also when we may have to wait for a page latch!
Otherwise a deadlock of threads can occur. */
void
buf_dblwr_flush_buffered_writes(
/*============================*/
	ulint	dblwr_partition)	/*!< in: doublewrite partition */
{
	byte*	write_buf;
	ulint	len;

	/* The recovery buffer is used only at startup; it must already
	have been released by the time regular flushing runs. */
	ut_ad(parallel_dblwr_buf.recovery_buf_unaligned == NULL);

	if (!srv_use_doublewrite_buf || buf_dblwr == NULL) {
		/* Sync the writes to the disk. */
		buf_dblwr_sync_datafiles();
		return;
	}

	ut_ad(!srv_read_only_mode);

	struct parallel_dblwr_shard_t*	dblwr_shard
		= &parallel_dblwr_buf.shard[dblwr_partition];

	/* Write first to doublewrite buffer blocks. We use synchronous
	aio and thus know that file write has been completed when the
	control returns. */

	if (dblwr_shard->first_free == 0) {

		/* Wake possible simulated aio thread as there could be
		system temporary tablespace pages active for flushing.
		Note: system temporary tablespace pages are not scheduled
		for doublewrite. */
		os_aio_simulated_wake_handler_threads();

		return;
	}

	write_buf = dblwr_shard->write_buf;

	const bool encrypt_parallel_dblwr = srv_parallel_dblwr_encrypt;

	/* Validate each buffered page and, if requested, encrypt its
	copy in the doublewrite buffer before it is written out. */
	for (ulint len2 = 0, i = 0;
	     i < dblwr_shard->first_free;
	     len2 += UNIV_PAGE_SIZE, i++) {

		const buf_block_t*	block;

		block = (buf_block_t*)dblwr_shard->buf_block_arr[i];

		page_t*	dblwr_page = write_buf + len2;

		if (buf_block_get_state(block) != BUF_BLOCK_FILE_PAGE
		    || block->page.zip.data) {
			/* No simple validate for compressed
			pages exists. */
			continue;
		}

		/* Check that the actual page in the buffer pool is
		not corrupt and the LSN values are sane. */
		buf_dblwr_check_block(block);

		/* Check that the page as written to the doublewrite
		buffer has sane LSN values. */
		buf_dblwr_check_page_lsn(dblwr_page);

		// it can be already encrypted by encryption threads
		/* NOTE(review): the system tablespace handle is
		acquired and released on every iteration although
		TRX_SYS_SPACE does not change inside the loop; looks
		hoistable — confirm FilSpace lifetime semantics. */
		FilSpace space (TRX_SYS_SPACE);
		if (encrypt_parallel_dblwr && space()->crypt_data == NULL
		    && !buf_dblwr_disable_encryption(block)) {
			buf_dblwr_encrypt_page(block, dblwr_page);
		}
	}

	len = dblwr_shard->first_free * UNIV_PAGE_SIZE;

	/* Find our part of the doublewrite buffer */
	os_offset_t	file_pos = dblwr_partition
		* srv_doublewrite_batch_size * UNIV_PAGE_SIZE;
	IORequest	io_req(IORequest::WRITE | IORequest::NO_COMPRESSION);

#ifdef UNIV_DEBUG
	/* The file size must not increase */
	os_offset_t	desired_size = srv_doublewrite_batch_size * UNIV_PAGE_SIZE
		* buf_parallel_dblwr_shard_num();
	os_offset_t	actual_size = os_file_get_size(parallel_dblwr_buf.file);
	ut_ad(desired_size == actual_size);
	ut_ad(file_pos + len <= actual_size);
	/* We must not touch neighboring buffers */
	ut_ad(file_pos + len <= (dblwr_partition + 1)
	      * srv_doublewrite_batch_size * UNIV_PAGE_SIZE);
#endif

	dberr_t	err = os_file_write(io_req, parallel_dblwr_buf.path,
				    parallel_dblwr_buf.file, write_buf,
				    file_pos, len);
	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
		/* Without a durable doublewrite copy, a torn page could
		reach the datafile undetected: crash instead. */
		ib::fatal() << "Parallel doublewrite buffer write failed, "
			"crashing the server to avoid data loss";
	}

	ut_ad(dblwr_shard->first_free <= srv_doublewrite_batch_size);

	/* increment the doublewrite flushed pages counter */
	srv_stats.dblwr_pages_written.add(dblwr_shard->first_free);
	srv_stats.dblwr_writes.inc();

	if (parallel_dblwr_buf.needs_flush)
		os_file_flush(parallel_dblwr_buf.file);

	/* We know that the writes have been flushed to disk now
	and in recovery we will find them in the doublewrite buffer
	blocks. Next do the writes to the intended positions. */

	/* Publish the batch size before posting the datafile writes;
	the IO completion code counts writes against it. */
	dblwr_shard->batch_size = dblwr_shard->first_free;
	os_wmb;

	for (ulint i = 0; i < dblwr_shard->first_free; i++) {
		buf_dblwr_write_block_to_datafile(
			dblwr_shard->buf_block_arr[i], false);
	}

	/* Wake possible simulated aio thread to actually post the
	writes to the operating system. We don't flush the files
	at this point. We leave it to the IO helper thread to flush
	datafiles when the whole batch has been processed. */
	os_aio_simulated_wake_handler_threads();

	/* Block until the IO completion routine signals that every
	write of this batch has finished. */
	os_event_wait(dblwr_shard->batch_completed);
	os_event_reset(dblwr_shard->batch_completed);

#ifdef UNIV_DEBUG
	os_rmb;
	ut_ad(dblwr_shard->batch_size == 0);
#endif
	dblwr_shard->first_free = 0;

	/* This will finish the batch. Sync data files
	to the disk. */
	fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
}
1390
1391 /********************************************************************//**
1392 Posts a buffer page for writing. If the doublewrite memory buffer is
1393 full, calls buf_dblwr_flush_buffered_writes and waits for for free
1394 space to appear. */
1395 void
buf_dblwr_add_to_batch(buf_page_t * bpage,buf_flush_t flush_type)1396 buf_dblwr_add_to_batch(
1397 /*====================*/
1398 buf_page_t* bpage, /*!< in: buffer block to write */
1399 buf_flush_t flush_type)/*!< in: BUF_FLUSH_LRU or BUF_FLUSH_LIST */
1400 {
1401 ut_ad(flush_type == BUF_FLUSH_LRU || flush_type == BUF_FLUSH_LIST);
1402 ut_a(buf_page_in_file(bpage));
1403 ut_ad(!mutex_own(&buf_pool_from_bpage(bpage)->LRU_list_mutex));
1404
1405 ulint dblwr_partition = buf_parallel_dblwr_partition(bpage,
1406 flush_type);
1407 struct parallel_dblwr_shard_t* dblwr_shard
1408 = ¶llel_dblwr_buf.shard[dblwr_partition];
1409
1410 try_again:
1411 ut_a(dblwr_shard->first_free <= srv_doublewrite_batch_size);
1412 ut_ad(!os_event_is_set(dblwr_shard->batch_completed));
1413
1414 if (dblwr_shard->first_free == srv_doublewrite_batch_size) {
1415
1416 buf_dblwr_flush_buffered_writes(dblwr_partition);
1417
1418 goto try_again;
1419 }
1420
1421 byte* p = dblwr_shard->write_buf
1422 + univ_page_size.physical() * dblwr_shard->first_free;
1423
1424 if (bpage->size.is_compressed()) {
1425 UNIV_MEM_ASSERT_RW(bpage->zip.data, bpage->size.physical());
1426 /* Copy the compressed page and clear the rest. */
1427
1428 memcpy(p, bpage->zip.data, bpage->size.physical());
1429
1430 memset(p + bpage->size.physical(), 0x0,
1431 univ_page_size.physical() - bpage->size.physical());
1432 } else {
1433 ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
1434
1435 UNIV_MEM_ASSERT_RW(((buf_block_t*) bpage)->frame,
1436 bpage->size.logical());
1437
1438 memcpy(p, ((buf_block_t*) bpage)->frame, bpage->size.logical());
1439 }
1440
1441 dblwr_shard->buf_block_arr[dblwr_shard->first_free++] = bpage;
1442
1443 ut_ad(!os_event_is_set(dblwr_shard->batch_completed));
1444 ut_ad(dblwr_shard->first_free <= srv_doublewrite_batch_size);
1445 }
1446
1447 /********************************************************************//**
1448 Writes a page to the doublewrite buffer on disk, sync it, then write
1449 the page to the datafile and sync the datafile. This function is used
1450 for single page flushes. If all the buffers allocated for single page
1451 flushes in the doublewrite buffer are in use we wait here for one to
1452 become free. We are guaranteed that a slot will become free because any
1453 thread that is using a slot must also release the slot before leaving
1454 this function. */
1455 void
buf_dblwr_write_single_page(buf_page_t * bpage,bool sync)1456 buf_dblwr_write_single_page(
1457 /*========================*/
1458 buf_page_t* bpage, /*!< in: buffer block to write */
1459 bool sync) /*!< in: true if sync IO requested */
1460 {
1461 ulint size;
1462 ulint offset;
1463 ulint i;
1464
1465 ut_a(buf_page_in_file(bpage));
1466 ut_a(srv_use_doublewrite_buf);
1467 ut_a(buf_dblwr != NULL);
1468
1469 size = 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1470
1471 if (buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
1472
1473 /* Check that the actual page in the buffer pool is
1474 not corrupt and the LSN values are sane. */
1475 buf_dblwr_check_block((buf_block_t*) bpage);
1476
1477 /* Check that the page as written to the doublewrite
1478 buffer has sane LSN values. */
1479 if (!bpage->zip.data) {
1480 buf_dblwr_check_page_lsn(
1481 ((buf_block_t*) bpage)->frame);
1482 }
1483 }
1484
1485 retry:
1486 mutex_enter(&buf_dblwr->mutex);
1487 if (buf_dblwr->s_reserved == size) {
1488
1489 /* All slots are reserved. */
1490 int64_t sig_count = os_event_reset(buf_dblwr->s_event);
1491 mutex_exit(&buf_dblwr->mutex);
1492 os_event_wait_low(buf_dblwr->s_event, sig_count);
1493
1494 goto retry;
1495 }
1496
1497 for (i = 0; i < size; ++i) {
1498
1499 if (!buf_dblwr->in_use[i]) {
1500 break;
1501 }
1502 }
1503
1504 /* We are guaranteed to find a slot. */
1505 ut_a(i < size);
1506 buf_dblwr->in_use[i] = true;
1507 buf_dblwr->s_reserved++;
1508 buf_dblwr->buf_block_arr[i] = bpage;
1509
1510 /* increment the doublewrite flushed pages counter */
1511 srv_stats.dblwr_pages_written.inc();
1512 srv_stats.dblwr_writes.inc();
1513
1514 mutex_exit(&buf_dblwr->mutex);
1515
1516 /* Lets see if we are going to write in the first or second
1517 block of the doublewrite buffer. */
1518 if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1519 offset = buf_dblwr->block1 + i;
1520 } else {
1521 offset = buf_dblwr->block2 + i
1522 - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1523 }
1524
1525 /* We deal with compressed and uncompressed pages a little
1526 differently here. In case of uncompressed pages we can
1527 directly write the block to the allocated slot in the
1528 doublewrite buffer in the system tablespace and then after
1529 syncing the system table space we can proceed to write the page
1530 in the datafile.
1531 In case of compressed page we first do a memcpy of the block
1532 to the in-memory buffer of doublewrite before proceeding to
1533 write it. This is so because we want to pad the remaining
1534 bytes in the doublewrite page with zeros. */
1535
1536 IORequest write_request(IORequest::WRITE);
1537
1538 if (buf_dblwr_disable_encryption((buf_block_t*)bpage)) {
1539 write_request.disable_encryption();
1540 }
1541
1542 if (bpage->size.is_compressed()) {
1543 memcpy(buf_dblwr->write_buf + univ_page_size.physical() * i,
1544 bpage->zip.data, bpage->size.physical());
1545
1546 memset(buf_dblwr->write_buf + univ_page_size.physical() * i
1547 + bpage->size.physical(), 0x0,
1548 univ_page_size.physical() - bpage->size.physical());
1549
1550 fil_io(write_request, true,
1551 page_id_t(TRX_SYS_SPACE, offset), univ_page_size, 0,
1552 univ_page_size.physical(),
1553 (void*) (buf_dblwr->write_buf
1554 + univ_page_size.physical() * i),
1555 NULL);
1556 } else {
1557 /* It is a regular page. Write it directly to the
1558 doublewrite buffer */
1559 fil_io(IORequestWrite, true,
1560 page_id_t(TRX_SYS_SPACE, offset), univ_page_size, 0,
1561 univ_page_size.physical(),
1562 (void*) ((buf_block_t*) bpage)->frame,
1563 NULL);
1564 }
1565
1566 /* Now flush the doublewrite buffer data to disk */
1567 fil_flush(TRX_SYS_SPACE);
1568
1569 /* We know that the write has been flushed to disk now
1570 and during recovery we will find it in the doublewrite buffer
1571 blocks. Next do the write to the intended position. */
1572 buf_dblwr_write_block_to_datafile(bpage, sync);
1573 }
1574
/** Compute the size and path of the parallel doublewrite buffer, create it,
and disable OS caching for it
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
buf_parallel_dblwr_file_create(void)
{
	ut_ad(!srv_read_only_mode);
	/* The buffer size is two doublewrite batches (one for LRU, one for
	flush list flusher) per buffer pool instance. */
	os_offset_t	size = srv_doublewrite_batch_size * UNIV_PAGE_SIZE
		* buf_parallel_dblwr_shard_num();
	ut_a(size <= MAX_DOUBLEWRITE_FILE_SIZE);
	ut_a(size > 0);
	ut_a(size % UNIV_PAGE_SIZE == 0);

	dberr_t	err = buf_parallel_dblwr_make_path();
	if (err != DB_SUCCESS)
		return(err);

	/* The file must not be open yet, and the recovery buffer must
	already have been released. */
	ut_ad(parallel_dblwr_buf.file.is_closed());
	ut_ad(parallel_dblwr_buf.recovery_buf_unaligned == NULL);

	/* Set O_SYNC if innodb_flush_method == O_DSYNC. */
	ulint	o_sync = (srv_unix_file_flush_method == SRV_UNIX_O_DSYNC)
		? OS_FILE_O_SYNC : 0;

	bool	success;
	parallel_dblwr_buf.file
		= os_file_create_simple(innodb_parallel_dblwrite_file_key,
					parallel_dblwr_buf.path,
					OS_FILE_CREATE | o_sync,
					OS_FILE_READ_WRITE, false, &success);
	if (!success) {
		/* OS_FILE_CREATE fails if the file already exists: a
		leftover buffer file from a previous server run. */
		if (os_file_get_last_error(false) == OS_FILE_ALREADY_EXISTS) {
			ib::error() << "A parallel doublewrite file "
				<< parallel_dblwr_buf.path
				<< " found on startup.";
		}
		return(DB_ERROR);
	}

	const bool	o_direct_set
		= os_file_set_nocache(parallel_dblwr_buf.file,
				      parallel_dblwr_buf.path,
				      "create", false);
	/* Decide whether explicit flushes of the buffer file are
	needed; they can only be skipped when O_DIRECT was actually
	set and the flush method does not require an extra fsync. */
	switch (srv_unix_file_flush_method) {
	case SRV_UNIX_NOSYNC:
	case SRV_UNIX_O_DSYNC:
	case SRV_UNIX_O_DIRECT_NO_FSYNC:
	case SRV_UNIX_ALL_O_DIRECT:
		parallel_dblwr_buf.needs_flush = !o_direct_set;
		break;
	case SRV_UNIX_FSYNC:
	case SRV_UNIX_LITTLESYNC:
	case SRV_UNIX_O_DIRECT:
		parallel_dblwr_buf.needs_flush = true;
		break;
	}

	/* Preallocate the full file size up front. */
	success = os_file_set_size(parallel_dblwr_buf.path,
				   parallel_dblwr_buf.file, size, false);
	if (!success) {
		buf_parallel_dblwr_free(true);
		return(DB_ERROR);
	}
	ut_ad(os_file_get_size(parallel_dblwr_buf.file) == size);

	ib::info() << "Created parallel doublewrite buffer at "
		<< parallel_dblwr_buf.path << ", size "
		<< os_file_get_size(parallel_dblwr_buf.file) << " bytes";

	return(DB_SUCCESS);
}
1649
1650 /** Initialize parallel doublewrite subsystem: create its data structure and
1651 the disk file.
1652 @return DB_SUCCESS or error code */
1653 dberr_t
buf_parallel_dblwr_create(void)1654 buf_parallel_dblwr_create(void)
1655 {
1656 if (!srv_use_doublewrite_buf) {
1657 return(DB_SUCCESS);
1658 }
1659
1660 if (!parallel_dblwr_buf.file.is_closed() || srv_read_only_mode) {
1661
1662 ut_ad(parallel_dblwr_buf.recovery_buf_unaligned == NULL);
1663 return(DB_SUCCESS);
1664 }
1665
1666 memset(parallel_dblwr_buf.shard, 0, sizeof(parallel_dblwr_buf.shard));
1667
1668 dberr_t err = buf_parallel_dblwr_file_create();
1669 if (err != DB_SUCCESS) {
1670 return(err);
1671 }
1672
1673 for (ulint i = 0; i < buf_parallel_dblwr_shard_num(); i++) {
1674
1675 struct parallel_dblwr_shard_t* dblwr_shard
1676 = ¶llel_dblwr_buf.shard[i];
1677
1678 dblwr_shard->write_buf_unaligned
1679 = static_cast<byte*>(ut_malloc((1
1680 + srv_doublewrite_batch_size)
1681 * UNIV_PAGE_SIZE,
1682 mem_key_parallel_doublewrite));
1683 if (!dblwr_shard->write_buf_unaligned) {
1684 buf_parallel_dblwr_free(true);
1685 return(DB_OUT_OF_MEMORY);
1686 }
1687 dblwr_shard->write_buf = static_cast<byte*>(
1688 ut_align(dblwr_shard->write_buf_unaligned,
1689 UNIV_PAGE_SIZE));
1690 dblwr_shard->buf_block_arr
1691 = static_cast<buf_page_t**>(
1692 ut_zalloc(srv_doublewrite_batch_size
1693 * sizeof(void*),
1694 mem_key_parallel_doublewrite));
1695 if (!dblwr_shard->buf_block_arr) {
1696 buf_parallel_dblwr_free(true);
1697 return(DB_OUT_OF_MEMORY);
1698 }
1699
1700 dblwr_shard->batch_completed
1701 = os_event_create("parallel_dblwr_batch_completed");
1702 os_event_reset(dblwr_shard->batch_completed);
1703 }
1704
1705 return(DB_SUCCESS);
1706 }
1707
1708 /** Cleanup parallel doublewrite memory structures and optionally close and
1709 delete the doublewrite buffer file too.
1710 @param delete_file whether to close and delete the buffer file too */
1711 void
buf_parallel_dblwr_free(bool delete_file)1712 buf_parallel_dblwr_free(bool delete_file)
1713 {
1714 for (ulint i = 0; i < buf_parallel_dblwr_shard_num(); i++) {
1715
1716 struct parallel_dblwr_shard_t* dblwr_shard
1717 = ¶llel_dblwr_buf.shard[i];
1718
1719 if (dblwr_shard->write_buf_unaligned
1720 && dblwr_shard->buf_block_arr) {
1721 os_event_destroy(dblwr_shard->batch_completed);
1722 }
1723
1724 ut_free(dblwr_shard->write_buf_unaligned);
1725 ut_free(dblwr_shard->buf_block_arr);
1726 }
1727
1728 if (delete_file) {
1729 buf_parallel_dblwr_close();
1730 buf_parallel_dblwr_delete();
1731 }
1732
1733 ut_free(parallel_dblwr_buf.path);
1734 parallel_dblwr_buf.path = NULL;
1735 }
1736
/** The parallel doublewrite buffer: global instance holding the buffer
file handle, its path, and the per-partition shards used above. */
parallel_dblwr_t	parallel_dblwr_buf;
1739
1740 #endif /* !UNIV_HOTBACKUP */
1741