1 /*****************************************************************************
2 
3 Copyright (c) 1995, 2021, Oracle and/or its affiliates.
4 Copyright (c) 2016, Percona Inc. All Rights Reserved.
5 
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License, version 2.0,
8 as published by the Free Software Foundation.
9 
10 This program is also distributed with certain software (including
11 but not limited to OpenSSL) that is licensed under separate terms,
12 as designated in a particular file or component or in included license
13 documentation.  The authors of MySQL hereby grant you an additional
14 permission to link the program and your derivative works with the
15 separately licensed software that they have included with MySQL.
16 
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 GNU General Public License, version 2.0, for more details.
21 
22 You should have received a copy of the GNU General Public License along with
23 this program; if not, write to the Free Software Foundation, Inc.,
24 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
25 
26 *****************************************************************************/
27 
28 /**************************************************//**
29 @file buf/buf0dblwr.cc
Doublewrite buffer module
31 
32 Created 2011/12/19
33 *******************************************************/
34 
35 #include "ha_prototypes.h"
36 #include "buf0dblwr.h"
37 
38 #ifdef UNIV_NONINL
39 #include "buf0buf.ic"
40 #include "buf0dblrw.ic"
41 #endif
42 
43 #include "buf0buf.h"
44 #include "buf0checksum.h"
45 #include "srv0start.h"
46 #include "srv0srv.h"
47 #include "page0zip.h"
48 #include "trx0sys.h"
49 #include "os0file.h"
50 
51 #ifndef UNIV_HOTBACKUP
52 
53 /** The doublewrite buffer */
54 buf_dblwr_t*	buf_dblwr = NULL;
55 
56 /** Set to TRUE when the doublewrite buffer is being created */
57 ibool	buf_dblwr_being_created = FALSE;
58 
59 /****************************************************************//**
60 Determines if a page number is located inside the doublewrite buffer.
61 @return TRUE if the location is inside the two blocks of the
62 doublewrite buffer */
63 ibool
buf_dblwr_page_inside(ulint page_no)64 buf_dblwr_page_inside(
65 /*==================*/
66 	ulint	page_no)	/*!< in: page number */
67 {
68 	if (buf_dblwr == NULL) {
69 
70 		return(FALSE);
71 	}
72 
73 	if (page_no >= buf_dblwr->block1
74 	    && page_no < buf_dblwr->block1
75 	    + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
76 		return(TRUE);
77 	}
78 
79 	if (page_no >= buf_dblwr->block2
80 	    && page_no < buf_dblwr->block2
81 	    + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
82 		return(TRUE);
83 	}
84 
85 	return(FALSE);
86 }
87 
88 /****************************************************************//**
89 Calls buf_page_get() on the TRX_SYS_PAGE and returns a pointer to the
90 doublewrite buffer within it.
91 @return pointer to the doublewrite buffer within the filespace header
92 page. */
93 UNIV_INLINE
94 byte*
buf_dblwr_get(mtr_t * mtr)95 buf_dblwr_get(
96 /*==========*/
97 	mtr_t*	mtr)	/*!< in/out: MTR to hold the page latch */
98 {
99 	buf_block_t*	block;
100 
101 	block = buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
102 			     univ_page_size, RW_X_LATCH, mtr);
103 
104 	buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
105 
106 	return(buf_block_get_frame(block) + TRX_SYS_DOUBLEWRITE);
107 }
108 
/********************************************************************//**
Flush a batch of writes to the datafiles that have already been
written to the dblwr buffer on disk.
The three steps below must happen in this order: first make sure the
writes are handed to the OS, then wait for them to be posted, and only
then fsync the tablespace files. */
void
buf_dblwr_sync_datafiles()
/*======================*/
{
	/* Wake possible simulated aio thread to actually post the
	writes to the operating system */
	os_aio_simulated_wake_handler_threads();

	/* Wait that all async writes to tablespaces have been posted to
	the OS */
	os_aio_wait_until_no_pending_writes();

	/* Now we flush the data to disk (for example, with fsync) */
	fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
}
127 
/****************************************************************//**
Creates or initializes the doublewrite buffer at a database start.
Allocates the in-memory buf_dblwr structure and reads the on-disk
block start page numbers from the doublewrite header. */
static
void
buf_dblwr_init(
/*===========*/
	byte*	doublewrite)	/*!< in: pointer to the doublewrite buf
				header on trx sys page */
{
	ulint	buf_size;

	buf_dblwr = static_cast<buf_dblwr_t*>(
		ut_zalloc_nokey(sizeof(buf_dblwr_t)));

	/* There are two blocks of same size in the doublewrite
	buffer. */
	buf_size = 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;

	/* There must be at least one buffer for single page writes
	and one buffer for batch writes. */
	ut_a(srv_doublewrite_batch_size > 0
	     && srv_doublewrite_batch_size < buf_size);

	mutex_create(LATCH_ID_BUF_DBLWR, &buf_dblwr->mutex);

	/* Event set when a single-page-flush slot is released
	(see buf_dblwr_update()). */
	buf_dblwr->s_event = os_event_create("dblwr_single_event");
	buf_dblwr->s_reserved = 0;

	/* Read the start page numbers of the two on-disk doublewrite
	blocks from the header on the trx sys page. */
	buf_dblwr->block1 = mach_read_from_4(
		doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK1);
	buf_dblwr->block2 = mach_read_from_4(
		doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK2);

	/* Per-slot reservation flags; zero-initialized, i.e. all free. */
	buf_dblwr->in_use = static_cast<bool*>(
		ut_zalloc_nokey(buf_size * sizeof(bool)));

	/* One extra page is allocated so that write_buf can be aligned
	to UNIV_PAGE_SIZE for raw file I/O. */
	buf_dblwr->write_buf_unaligned = static_cast<byte*>(
		ut_malloc_nokey((1 + buf_size) * UNIV_PAGE_SIZE));

	buf_dblwr->write_buf = static_cast<byte*>(
		ut_align(buf_dblwr->write_buf_unaligned,
			 UNIV_PAGE_SIZE));

	/* Back-pointers to the buffer pool pages queued in each slot. */
	buf_dblwr->buf_block_arr = static_cast<buf_page_t**>(
		ut_zalloc_nokey(buf_size * sizeof(void*)));
}
174 
/****************************************************************//**
Creates the doublewrite buffer to a new InnoDB installation. The header of the
doublewrite buffer is placed on the trx system header page.
@return true if successful, false if not. */
MY_ATTRIBUTE((warn_unused_result))
bool
buf_dblwr_create(void)
/*==================*/
{
	buf_block_t*	block2;
	buf_block_t*	new_block;
	byte*	doublewrite;
	byte*	fseg_header;
	ulint	page_no;
	ulint	prev_page_no;
	ulint	i;
	mtr_t	mtr;

	if (buf_dblwr) {
		/* Already inited */

		return(true);
	}

start_again:
	mtr_start(&mtr);
	buf_dblwr_being_created = TRUE;

	doublewrite = buf_dblwr_get(&mtr);

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
	    == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
		/* The doublewrite buffer has already been created:
		just read in some numbers.  This is also the normal
		termination of the start_again loop: after the buffer
		has been created on disk below, control jumps back to
		start_again and takes this branch. */

		buf_dblwr_init(doublewrite);

		mtr_commit(&mtr);
		buf_dblwr_being_created = FALSE;
		return(true);
	}

	ib::info() << "Doublewrite buffer not found: creating new";

	/* The pages allocated below are read into the buffer pool, so
	the pool must be able to hold both doublewrite blocks plus the
	half-extent of pages preceding them, with some slack. */
	ulint min_doublewrite_size =
		( ( 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE
		  + FSP_EXTENT_SIZE / 2
		  + 100)
		* UNIV_PAGE_SIZE);
	if (buf_pool_get_curr_size() <  min_doublewrite_size) {
		ib::error() << "Cannot create doublewrite buffer: you must"
			" increase your buffer pool size. Cannot continue"
			" operation.";

		return(false);
	}

	block2 = fseg_create(TRX_SYS_SPACE, TRX_SYS_PAGE_NO,
			     TRX_SYS_DOUBLEWRITE
			     + TRX_SYS_DOUBLEWRITE_FSEG, &mtr);

	/* fseg_create acquires a second latch on the page,
	therefore we must declare it: */

	buf_block_dbg_add_level(block2, SYNC_NO_ORDER_CHECK);

	if (block2 == NULL) {
		ib::error() << "Cannot create doublewrite buffer: you must"
			" increase your tablespace size."
			" Cannot continue operation.";

		/* We exit without committing the mtr to prevent
		its modifications to the database getting to disk */

		return(false);
	}

	fseg_header = doublewrite + TRX_SYS_DOUBLEWRITE_FSEG;
	prev_page_no = 0;

	/* Allocate half an extent of leading pages plus the two
	doublewrite blocks proper; the blocks must start on extent
	boundaries (asserted below). */
	for (i = 0; i < 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE
		     + FSP_EXTENT_SIZE / 2; i++) {
		new_block = fseg_alloc_free_page(
			fseg_header, prev_page_no + 1, FSP_UP, &mtr);
		if (new_block == NULL) {
			ib::error() << "Cannot create doublewrite buffer: "
				" you must increase your tablespace size."
				" Cannot continue operation.";

			return(false);
		}

		/* We read the allocated pages to the buffer pool;
		when they are written to disk in a flush, the space
		id and page number fields are also written to the
		pages. When we at database startup read pages
		from the doublewrite buffer, we know that if the
		space id and page number in them are the same as
		the page position in the tablespace, then the page
		has not been written to in doublewrite. */

		ut_ad(rw_lock_get_x_lock_count(&new_block->lock) == 1);
		page_no = new_block->page.id.page_no();

		if (i == FSP_EXTENT_SIZE / 2) {
			/* First page of block 1: record its page number
			in the header (and in the repeat copy). */
			ut_a(page_no == FSP_EXTENT_SIZE);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_BLOCK1,
					 page_no, MLOG_4BYTES, &mtr);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_REPEAT
					 + TRX_SYS_DOUBLEWRITE_BLOCK1,
					 page_no, MLOG_4BYTES, &mtr);

		} else if (i == FSP_EXTENT_SIZE / 2
			   + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
			/* First page of block 2. */
			ut_a(page_no == 2 * FSP_EXTENT_SIZE);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_BLOCK2,
					 page_no, MLOG_4BYTES, &mtr);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_REPEAT
					 + TRX_SYS_DOUBLEWRITE_BLOCK2,
					 page_no, MLOG_4BYTES, &mtr);

		} else if (i > FSP_EXTENT_SIZE / 2) {
			/* Inside a block: pages must be consecutive. */
			ut_a(page_no == prev_page_no + 1);
		}

		if (((i + 1) & 15) == 0) {
			/* rw_locks can only be recursively x-locked
			2048 times. (on 32 bit platforms,
			(lint) 0 - (X_LOCK_DECR * 2049)
			is no longer a negative number, and thus
			lock_word becomes like a shared lock).
			For 4k page size this loop will
			lock the fseg header too many times. Since
			this code is not done while any other threads
			are active, restart the MTR occasionally. */
			mtr_commit(&mtr);
			mtr_start(&mtr);
			doublewrite = buf_dblwr_get(&mtr);
			fseg_header = doublewrite
				      + TRX_SYS_DOUBLEWRITE_FSEG;
		}

		prev_page_no = page_no;
	}

	/* Write the magic number last: it marks the doublewrite buffer
	as fully created. */
	mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC,
			 TRX_SYS_DOUBLEWRITE_MAGIC_N,
			 MLOG_4BYTES, &mtr);
	mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC
			 + TRX_SYS_DOUBLEWRITE_REPEAT,
			 TRX_SYS_DOUBLEWRITE_MAGIC_N,
			 MLOG_4BYTES, &mtr);

	mlog_write_ulint(doublewrite
			 + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED,
			 TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N,
			 MLOG_4BYTES, &mtr);
	mtr_commit(&mtr);

	/* Flush the modified pages to disk and make a checkpoint */
	log_make_checkpoint_at(LSN_MAX, TRUE);

	/* Remove doublewrite pages from LRU */
	buf_pool_invalidate();

	ib::info() <<  "Doublewrite buffer created";

	/* Jump back: the magic number is now on disk, so the check at
	the top initializes the in-memory structure and returns true. */
	goto start_again;
}
348 
/** Compute the path to the parallel doublewrite buffer, if not already done.
A relative srv_parallel_doublewrite_path is resolved against srv_data_home
(or the mysql data directory when srv_data_home is empty); the result is
canonicalized with realpath() when the file already exists, and stored in
parallel_dblwr_buf.path.
@return DB_SUCCESS, DB_WRONG_FILE_NAME if the path names a non-regular file,
DB_ERROR if realpath() fails, or DB_OUT_OF_MEMORY */
MY_ATTRIBUTE((warn_unused_result))
static
dberr_t
buf_parallel_dblwr_make_path(void)
{
	/* Idempotent: keep a previously computed path. */
	if (parallel_dblwr_buf.path)
		return(DB_SUCCESS);

	char path[FN_REFLEN + 1 /* OS_PATH_SEPARATOR */];
	const char *dir = NULL;

	ut_ad(srv_parallel_doublewrite_path);

	if (is_absolute_path(srv_parallel_doublewrite_path)) {

		my_strncpy_trunc(path, srv_parallel_doublewrite_path, sizeof(path));
	} else {

		/* A relative path to the parallel doublewrite file is based
		either on srv_data_home, either mysql data directory if the
		former is empty. */
		dir = srv_data_home[0] ? srv_data_home
			: fil_path_to_mysql_datadir;
		if (dir[strlen(dir) - 1] == OS_PATH_SEPARATOR) {

			ut_snprintf(path, sizeof(path), "%s%s",
				    dir,
				    srv_parallel_doublewrite_path);
		} else {

			ut_snprintf(path, sizeof(path), "%s%c%s",
				    dir,
				    OS_PATH_SEPARATOR,
				    srv_parallel_doublewrite_path);
		}
	}

	os_file_type_t	type;
	bool		exists = false;
	bool		ret;

	ret = os_file_status(path, &exists, &type);

	/* For realpath() to succeed the file must exist. */

	if (ret && exists) {
		if (my_realpath(path, path, MY_WME) != 0) {

			return(DB_ERROR);
		}
		if (type != OS_FILE_TYPE_FILE) {
			ib::error() << "Parallel doublewrite path "
				    << path << " must point to a regular "
				"file";
			return(DB_WRONG_FILE_NAME);
		}
	} else if (!is_absolute_path(srv_parallel_doublewrite_path)) {
		/* If it does not exist, and is not an absolute path, then
		resolve only the directory part and append
		srv_parallel_doublewrite_path to it.
		Note: dir was set in the relative-path branch above, so it
		is non-NULL here. */
		char	dir_full[FN_REFLEN];

		if (my_realpath(dir_full, dir, MY_WME) != 0) {

			return(DB_ERROR);
		}

		if (dir_full[strlen(dir_full) - 1] == OS_PATH_SEPARATOR) {

			ut_snprintf(path, sizeof(path), "%s%s",
				    dir_full,
				    srv_parallel_doublewrite_path);
		} else {

			ut_snprintf(path, sizeof(path), "%s%c%s",
				    dir_full,
				    OS_PATH_SEPARATOR,
				    srv_parallel_doublewrite_path);
		}
	}

	/* An absolute path to a non-existing file is kept as-is. */
	parallel_dblwr_buf.path = mem_strdup(path);

	return(parallel_dblwr_buf.path ? DB_SUCCESS : DB_OUT_OF_MEMORY);
}
435 
436 /** Close the parallel doublewrite buffer file */
437 static
438 void
buf_parallel_dblwr_close(void)439 buf_parallel_dblwr_close(void)
440 {
441 	if (!parallel_dblwr_buf.file.is_closed()) {
442 		os_file_close(parallel_dblwr_buf.file);
443 		parallel_dblwr_buf.file.set_closed();
444 	}
445 }
446 
/** Maximum possible parallel doublewrite buffer file size in bytes:
the product of the maximum batch size, the maximum number of shards,
and the page size. */
#define MAX_DOUBLEWRITE_FILE_SIZE \
	((MAX_DOUBLEWRITE_BATCH_SIZE) * (MAX_DBLWR_SHARDS) * (UNIV_PAGE_SIZE))
450 
/**
At database startup initializes the doublewrite buffer memory structure if
we already have a doublewrite buffer created in the data files. If we are
upgrading to an InnoDB version which supports multiple tablespaces, then this
function performs the necessary update operations. If we are in a crash
recovery, this function loads the pages from double write buffer into memory.
Also opens and loads the parallel doublewrite buffer file, if one exists.
@param[in]	file		File handle
@param[in]	path		Path name of file
@return DB_SUCCESS or error code */
dberr_t
buf_dblwr_init_or_load_pages(
	pfs_os_file_t	file,
	const char*	path)
{
	byte*		buf;
	byte*		page;
	ulint		block1;
	ulint		block2;
	ulint		space_id;
	byte*		read_buf;
	byte*		doublewrite;
	byte*		unaligned_read_buf;
	ibool		reset_space_ids = FALSE;
	recv_dblwr_t&	recv_dblwr = recv_sys->dblwr;

	if (srv_read_only_mode) {

		ib::info() << "Skipping doublewrite buffer processing due to "
			"InnoDB running in read only mode";
		return(DB_SUCCESS);
	}

	/* We do the file i/o past the buffer pool */

	unaligned_read_buf = static_cast<byte*>(
		ut_malloc_nokey(2 * UNIV_PAGE_SIZE));

	read_buf = static_cast<byte*>(
		ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));

	/* Read the trx sys header to check if we are using the doublewrite
	buffer */
	dberr_t		err;

	IORequest	read_request(IORequest::READ);

	read_request.disable_compression();

	err = os_file_read(
		read_request,
		file, read_buf, TRX_SYS_PAGE_NO * UNIV_PAGE_SIZE,
		UNIV_PAGE_SIZE);

	if (err != DB_SUCCESS) {

		ib::error()
			<< "Failed to read the system tablespace header page";

		ut_free(unaligned_read_buf);

		return(err);
	}

	doublewrite = read_buf + TRX_SYS_DOUBLEWRITE;

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
	    == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
		/* The doublewrite buffer has been created */

		buf_dblwr_init(doublewrite);

		block1 = buf_dblwr->block1;
		block2 = buf_dblwr->block2;

		buf = buf_dblwr->write_buf;
	} else {
		/* No doublewrite buffer on disk: nothing to load. */
		ut_free(unaligned_read_buf);
		return(DB_SUCCESS);
	}

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED)
	    != TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N) {

		/* We are upgrading from a version < 4.1.x to a version where
		multiple tablespaces are supported. We must reset the space id
		field in the pages in the doublewrite buffer because starting
		from this version the space id is stored to
		FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID. */

		reset_space_ids = TRUE;

		ib::info() << "Resetting space id's in the doublewrite buffer";
	}

	/* Read the pages from the doublewrite buffer to memory */
	err = os_file_read(
		read_request,
		file, buf, block1 * UNIV_PAGE_SIZE,
		TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE);

	if (err != DB_SUCCESS) {

		ib::error()
			<< "Failed to read the first double write buffer "
			"extent";

		ut_free(unaligned_read_buf);

		return(err);
	}

	err = os_file_read(
		read_request,
		file,
		buf + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE,
		block2 * UNIV_PAGE_SIZE,
		TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE);

	if (err != DB_SUCCESS) {

		ib::error()
			<< "Failed to read the second double write buffer "
			"extent";

		ut_free(unaligned_read_buf);

		return(err);
	}

	/* Check if any of these pages is half-written in data files, in the
	intended position */

	page = buf;

	for (ulint i = 0; i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * 2; i++) {
		if (reset_space_ids) {
			/* Legacy upgrade path: clear the space id field
			in each page and write it back in place. */
			ulint source_page_no;

			space_id = 0;
			mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
					space_id);
			/* We do not need to calculate new checksums for the
			pages because the field .._SPACE_ID does not affect
			them. Write the page back to where we read it from. */

			if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
				source_page_no = block1 + i;
			} else {
				source_page_no = block2
					+ i - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
			}

			IORequest	write_request(IORequest::WRITE);

			/* Recovered data file pages are written out
			as uncompressed. */

			write_request.disable_compression();

			err = os_file_write(
				write_request, path, file, page,
				source_page_no * UNIV_PAGE_SIZE,
				UNIV_PAGE_SIZE);

			if (err != DB_SUCCESS) {

				ib::error()
					<< "Failed to write to the double write"
					" buffer";

				ut_free(unaligned_read_buf);

				return(err);
			}

		} else {
			/* Normal case: queue the page for possible
			restoration during recovery. */
			recv_dblwr.add_to_sys(page);
		}

		page += univ_page_size.physical();
	}

	/* Now look for the parallel doublewrite buffer file, if any. */
	err = buf_parallel_dblwr_make_path();
	if (err != DB_SUCCESS) {

		ut_free(unaligned_read_buf);
		return(err);
	}

	ut_ad(parallel_dblwr_buf.file.is_closed());
	bool success;
	parallel_dblwr_buf.file
		= os_file_create_simple_no_error_handling(
			innodb_parallel_dblwrite_file_key,
			parallel_dblwr_buf.path,
			OS_FILE_OPEN, OS_FILE_READ_ONLY, true, &success);
	if (!success) {
		/* We are not supposed to check errno != ENOENT directly, but
		os_file_get_last_error will spam error log if it's handled
		there. */
		if (errno != ENOENT) {
			os_file_get_last_error(true);
			ib::error()
				<< "Failed to open the parallel doublewrite "
				"buffer at " << parallel_dblwr_buf.path;
			ut_free(unaligned_read_buf);
			return(DB_CANNOT_OPEN_FILE);
		}
		/* Failed to open because the file did not exist: OK */
		ib::info() << "Crash recovery did not find the parallel "
			"doublewrite buffer at "
			   << parallel_dblwr_buf.path;
	} else {
		/* Cannot possibly be upgrading from 4.1 */
		ut_ad(!reset_space_ids);

		os_file_set_nocache(parallel_dblwr_buf.file,
				    parallel_dblwr_buf.path,
				    "open", false);

		os_offset_t size = os_file_get_size(parallel_dblwr_buf.file);

		/* Sanity-check the file size before reading it whole. */
		if (size > MAX_DOUBLEWRITE_FILE_SIZE) {
			ib::error() << "Parallel doublewrite buffer size "
				    << size
				    << " bytes is larger than the maximum "
				"size " << MAX_DOUBLEWRITE_FILE_SIZE
				    << " bytes supported by this server "
				"version";
			buf_parallel_dblwr_close();
			ut_free(unaligned_read_buf);
			return(DB_CORRUPTION);
		}

		if (size % UNIV_PAGE_SIZE) {
			ib::error() << "Parallel doublewrite buffer size "
				    << size << " bytes is not a multiple of "
				"a page size "
				    << UNIV_PAGE_SIZE << " bytes";
			buf_parallel_dblwr_close();
			ut_free(unaligned_read_buf);
			return(DB_CORRUPTION);
		}

		if (size == 0) {
			ib::info()
				<< "Parallel doublewrite buffer is zero-sized";
			buf_parallel_dblwr_close();
			ut_free(unaligned_read_buf);
			return(DB_SUCCESS);
		}

		ib::info() << "Recovering partial pages from the parallel "
			"doublewrite buffer at " << parallel_dblwr_buf.path;

		/* One extra page is allocated so the buffer can be
		aligned to the page size.  Released later in
		buf_parallel_dblwr_finish_recovery(). */
		parallel_dblwr_buf.recovery_buf_unaligned
			= static_cast<byte *>(
				ut_malloc(size + UNIV_PAGE_SIZE,
					  mem_key_parallel_doublewrite));
		if (!parallel_dblwr_buf.recovery_buf_unaligned) {
			buf_parallel_dblwr_close();
			ut_free(unaligned_read_buf);
			return(DB_OUT_OF_MEMORY);
		}
		byte* recovery_buf = static_cast<byte *>
			(ut_align(parallel_dblwr_buf.recovery_buf_unaligned,
				  UNIV_PAGE_SIZE));

		err = os_file_read(read_request, parallel_dblwr_buf.file,
				   recovery_buf, 0, size);
		if (err != DB_SUCCESS) {
			ib::error() << "Failed to read the parallel "
				"doublewrite buffer";
			buf_parallel_dblwr_close();
			ut_free(unaligned_read_buf);
			ut_free(parallel_dblwr_buf.recovery_buf_unaligned);
			return(DB_ERROR);
		}

		/* Queue every non-zero page for recovery; all-zero
		pages are unused slots in the file. */
		byte zero_page[UNIV_PAGE_SIZE_MAX] = {0};
		for (page = recovery_buf; page < recovery_buf + size;
		     page += UNIV_PAGE_SIZE) {

			/* Skip all zero pages */
			const ulint	checksum = mach_read_from_4(
				page + FIL_PAGE_SPACE_OR_CHKSUM);

			if (checksum != 0
			    || memcmp(page, zero_page, UNIV_PAGE_SIZE) != 0) {
				recv_dblwr.add(page);
			}
		}
		buf_parallel_dblwr_close();
	}

	if (reset_space_ids) {
		/* Make the in-place space id rewrites above durable. */
		os_file_flush(file);
	}

	ut_free(unaligned_read_buf);

	return(DB_SUCCESS);
}
754 
755 /** Delete the parallel doublewrite file, if its path already has been
756 computed. It is up to the caller to ensure that this called at safe point */
757 void
buf_parallel_dblwr_delete(void)758 buf_parallel_dblwr_delete(void)
759 {
760 	if (parallel_dblwr_buf.path) {
761 
762 		os_file_delete_if_exists(innodb_parallel_dblwrite_file_key,
763 					 parallel_dblwr_buf.path, NULL);
764 	}
765 }
766 
/** Release any unused parallel doublewrite pages and free their underlying
buffer at the end of crash recovery */
void
buf_parallel_dblwr_finish_recovery(void)
{
	/* Drop the queued page pointers first; they point into the
	recovery buffer freed below. */
	recv_sys->dblwr.pages.clear();
	ut_free(parallel_dblwr_buf.recovery_buf_unaligned);
	/* Clear the pointer so a repeated call is a harmless no-op. */
	parallel_dblwr_buf.recovery_buf_unaligned = NULL;
}
776 
777 /** Process and remove the double write buffer pages for all tablespaces. */
778 void
buf_dblwr_process(void)779 buf_dblwr_process(void)
780 {
781 	ulint		page_no_dblwr	= 0;
782 	byte*		read_buf;
783 	byte*		unaligned_read_buf;
784 	recv_dblwr_t&	recv_dblwr	= recv_sys->dblwr;
785 
786 	ut_ad(!srv_read_only_mode);
787 
788 	unaligned_read_buf = static_cast<byte*>(
789 		ut_malloc_nokey(2 * UNIV_PAGE_SIZE));
790 
791 	read_buf = static_cast<byte*>(
792 		ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));
793 
794 	for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
795 	     i != recv_dblwr.pages.end();
796 	     ++i, ++page_no_dblwr) {
797 
798 		byte*		page		= *i;
799 		ulint		page_no		= page_get_page_no(page);
800 		ulint		space_id	= page_get_space_id(page);
801 
802 		fil_space_t*	space = fil_space_get(space_id);
803 
804 		if (space == NULL) {
805 			/* Maybe we have dropped the tablespace
806 			and this page once belonged to it: do nothing */
807 			continue;
808 		}
809 
810 		fil_space_open_if_needed(space);
811 
812 		if (page_no >= space->size) {
813 
814 			/* Do not report the warning if the tablespace is
815 			schedule for truncate or was truncated and we have live
816 			MLOG_TRUNCATE record in redo. */
817 			bool	skip_warning =
818 				srv_is_tablespace_truncated(space_id)
819 				|| srv_was_tablespace_truncated(space);
820 
821 			if (!skip_warning) {
822 				ib::warn() << "Page " << page_no_dblwr
823 					<< " in the doublewrite buffer is"
824 					" not within space bounds: page "
825 					<< page_id_t(space_id, page_no);
826 			}
827 		} else {
828 			const page_size_t	page_size(space->flags);
829 			const page_id_t		page_id(space_id, page_no);
830 
831 			/* We want to ensure that for partial reads the
832 			unread portion of the page is NUL. */
833 			memset(read_buf, 0x0, page_size.physical());
834 
835 			IORequest	request;
836 
837 			request.dblwr_recover();
838 
839 			/* Read in the actual page from the file */
840 			dberr_t	err = fil_io(
841 				request, true,
842 				page_id, page_size,
843 				0, page_size.physical(), read_buf, NULL);
844 
845 			if (err != DB_SUCCESS) {
846 
847 				ib::warn()
848 					<< "Double write buffer recovery: "
849 					<< page_id << " read failed with "
850 					<< "error: " << ut_strerr(err);
851 			}
852 
853 			/* Check if the page is corrupt */
854 			if (buf_page_is_corrupted(
855 				true, read_buf, page_size,
856 				fsp_is_checksum_disabled(space_id))) {
857 
858 				ib::info() << "Database page corruption or"
859 					<< " a failed file read of page "
860 					<< page_id
861 					<< ". Trying to recover it from the"
862 					<< " doublewrite buffer.";
863 
864 				dberr_t	err = DB_SUCCESS;
865 
866 				if (space->crypt_data == NULL) // if it was crypt_data encrypted it was already decrypted
867 					err = os_dblwr_decrypt_page(
868 					space, page);
869 
870 				if (err != DB_SUCCESS || buf_page_is_corrupted(
871 					true, page, page_size,
872 					fsp_is_checksum_disabled(space_id))) {
873 
874 					ib::error() << "Dump of the page:";
875 					buf_page_print(
876 						read_buf, page_size,
877 						BUF_PAGE_PRINT_NO_CRASH);
878 					ib::error() << "Dump of corresponding"
879 						" page in doublewrite buffer:";
880 
881 					buf_page_print(
882 						page, page_size,
883 						BUF_PAGE_PRINT_NO_CRASH);
884 
885 					ib::fatal() << "The page in the"
886 						" doublewrite buffer is"
887 						" corrupt. Cannot continue"
888 						" operation. You can try to"
889 						" recover the database with"
890 						" innodb_force_recovery=6";
891 				}
892 			} else if (buf_page_is_zeroes(read_buf, page_size)
893 				   && !buf_page_is_zeroes(page, page_size)
894 				   && !buf_page_is_corrupted(
895 					true, page, page_size,
896 					fsp_is_checksum_disabled(space_id))) {
897 
898 				/* Database page contained only zeroes, while
899 				a valid copy is available in dblwr buffer. */
900 
901 			} else {
902 
903 				bool t1 = buf_page_is_zeroes(
904                                         read_buf, page_size);
905 
906 				bool t2 = buf_page_is_zeroes(page, page_size);
907 
908 				bool t3 = buf_page_is_corrupted(
909 					true, page, page_size,
910 					fsp_is_checksum_disabled(space_id));
911 
912 				if (t1 && !(t2 || t3)) {
913 
914 					/* Database page contained only
915 					zeroes, while a valid copy is
916 					available in dblwr buffer. */
917 
918 				} else {
919 					continue;
920 				}
921 			}
922 
923 			/* Recovered data file pages are written out
924 			as uncompressed. */
925 
926 			IORequest	write_request(IORequest::WRITE);
927 
928 			write_request.disable_compression();
929 
930 			/* Write the good page from the doublewrite
931 			buffer to the intended position. */
932 
933 			fil_io(write_request, true,
934 			       page_id, page_size,
935 			       0, page_size.physical(),
936 			       const_cast<byte*>(page), NULL);
937 
938 			ib::info()
939 				<< "Recovered page "
940 				<< page_id
941 				<< " from the doublewrite buffer.";
942 		}
943 	}
944 
945 	fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
946 	ut_free(unaligned_read_buf);
947 
948 	buf_parallel_dblwr_finish_recovery();
949 
950 	/* If parallel doublewrite buffer was used, now it's safe to
951 	delete and re-create it. */
952 	buf_parallel_dblwr_delete();
953 	if (buf_parallel_dblwr_create() != DB_SUCCESS) {
954 		ib::fatal()
955 			<< "Creating the parallel doublewrite buffer failed";
956 	}
957 }
958 
959 /****************************************************************//**
960 Frees doublewrite buffer. */
961 void
buf_dblwr_free(void)962 buf_dblwr_free(void)
963 /*================*/
964 {
965 	/* Free the double write data structures. */
966 	ut_a(buf_dblwr != NULL);
967 	ut_ad(buf_dblwr->s_reserved == 0);
968 
969 	os_event_destroy(buf_dblwr->s_event);
970 	ut_free(buf_dblwr->write_buf_unaligned);
971 	buf_dblwr->write_buf_unaligned = NULL;
972 
973 	ut_free(buf_dblwr->buf_block_arr);
974 	buf_dblwr->buf_block_arr = NULL;
975 
976 	ut_free(buf_dblwr->in_use);
977 	buf_dblwr->in_use = NULL;
978 
979 	mutex_free(&buf_dblwr->mutex);
980 	ut_free(buf_dblwr);
981 	buf_dblwr = NULL;
982 }
983 
/********************************************************************//**
Updates the doublewrite buffer when an IO request is completed.
For batch flushes this decrements the shard's outstanding page count and
signals batch completion when it reaches zero; for single page flushes it
releases the page's reserved slot. */
void
buf_dblwr_update(
/*=============*/
	const buf_page_t*	bpage,	/*!< in: buffer block descriptor */
	buf_flush_t		flush_type)/*!< in: flush type */
{
	/* Nothing to do if doublewrite is disabled, not yet created, or
	the page belongs to the temporary tablespace. */
	if (!srv_use_doublewrite_buf
	    || buf_dblwr == NULL
	    || fsp_is_system_temporary(bpage->id.space())) {
		return;
	}

	ut_ad(!srv_read_only_mode);

	switch (flush_type) {
	case BUF_FLUSH_LIST:
	case BUF_FLUSH_LRU:
		{
			/* Batch flush: the page belongs to one of the
			parallel doublewrite shards. */
			ulint i = buf_parallel_dblwr_partition(bpage,
							       flush_type);
			struct parallel_dblwr_shard_t* dblwr_shard
				= &parallel_dblwr_buf.shard[i];

			ut_ad(!os_event_is_set(dblwr_shard->batch_completed));

			if (os_atomic_decrement_ulint(&dblwr_shard->batch_size,
						      1)
			    == 0) {

				/* The last page from the doublewrite batch. */
				os_event_set(dblwr_shard->batch_completed);
			}

			break;
		}
	case BUF_FLUSH_SINGLE_PAGE:
		{
			const ulint size = 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
			ulint i;
			mutex_enter(&buf_dblwr->mutex);
			/* Find and release the slot reserved for this
			page. */
			for (i = 0; i < size; ++i) {
				if (buf_dblwr->buf_block_arr[i] == bpage) {
					buf_dblwr->s_reserved--;
					buf_dblwr->buf_block_arr[i] = NULL;
					buf_dblwr->in_use[i] = false;
					break;
				}
			}

			/* The block we are looking for must exist as a
			reserved block. */
			ut_a(i < size);
		}
		/* Wake any thread waiting for a free single-page slot.
		Note: the mutex acquired in the inner block above is
		released here, after the event is set. */
		os_event_set(buf_dblwr->s_event);
		mutex_exit(&buf_dblwr->mutex);
		break;
	case BUF_FLUSH_N_TYPES:
		/* Not a real flush type: must never reach here. */
		ut_error;
	}
}
1046 
1047 /********************************************************************//**
1048 Check the LSN values on the page. */
1049 static
1050 void
buf_dblwr_check_page_lsn(const page_t * page)1051 buf_dblwr_check_page_lsn(
1052 /*=====================*/
1053 	const page_t*	page)		/*!< in: page to check */
1054 {
1055 	if (memcmp(page + (FIL_PAGE_LSN + 4),
1056 		   page + (UNIV_PAGE_SIZE
1057 			   - FIL_PAGE_END_LSN_OLD_CHKSUM + 4),
1058 		   4)) {
1059 
1060 		const ulint	lsn1 = mach_read_from_4(
1061 			page + FIL_PAGE_LSN + 4);
1062 		const ulint	lsn2 = mach_read_from_4(
1063 			page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM
1064 			+ 4);
1065 
1066 		ib::error() << "The page to be written seems corrupt!"
1067 			" The low 4 bytes of LSN fields do not match"
1068 			" (" << lsn1 << " != " << lsn2 << ")!"
1069 			" Noticed in the buffer pool.";
1070 	}
1071 }
1072 
/********************************************************************//**
Asserts when a corrupt block is found during writing out data to the
disk. */
static
void
buf_dblwr_assert_on_corrupt_block(
/*==============================*/
	const buf_block_t*	block)	/*!< in: block to check */
{
	/* Dump the page contents to the error log first, so the
	corruption can be analyzed post-mortem; BUF_PAGE_PRINT_NO_CRASH
	ensures we reach the fatal message below. */
	buf_page_print(block->frame, univ_page_size, BUF_PAGE_PRINT_NO_CRASH);

	ib::fatal() << "Apparent corruption of an index page "
		<< block->page.id
		<< " to be written to data file. We intentionally crash"
		" the server to prevent corrupt data from ending up in"
		" data files.";
}
1090 
/********************************************************************//**
Check the LSN values on the page with which this block is associated.
Also validate the page if the option is set. */
static
void
buf_dblwr_check_block(
/*==================*/
	const buf_block_t*	block)	/*!< in: block to check */
{
	ut_ad(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);

	if (block->skip_flush_check) {
		/* Flush-time validation has been explicitly disabled
		for this block. */
		return;
	}

	switch (fil_page_get_type(block->frame)) {
	case FIL_PAGE_INDEX:
	case FIL_PAGE_RTREE:
		/* Run the cheap structural validation matching the
		page format; a valid page needs no further checks. */
		if (page_is_comp(block->frame)) {
			if (page_simple_validate_new(block->frame)) {
				return;
			}
		} else if (page_simple_validate_old(block->frame)) {
			return;
		}
		/* While it is possible that this is not an index page
		but just happens to have wrongly set FIL_PAGE_TYPE,
		such pages should never be modified without also
		adjusting the page type during page allocation or
		buf_flush_init_for_writing() or fil_page_reset_type(). */
		break;
	case FIL_PAGE_TYPE_FSP_HDR:
	case FIL_PAGE_IBUF_BITMAP:
	case FIL_PAGE_TYPE_UNKNOWN:
		/* Do not complain again, we already reset this field. */
		/* fallthrough */
	case FIL_PAGE_UNDO_LOG:
	case FIL_PAGE_INODE:
	case FIL_PAGE_IBUF_FREE_LIST:
	case FIL_PAGE_TYPE_SYS:
	case FIL_PAGE_TYPE_TRX_SYS:
	case FIL_PAGE_TYPE_XDES:
	case FIL_PAGE_TYPE_BLOB:
	case FIL_PAGE_TYPE_ZBLOB:
	case FIL_PAGE_TYPE_ZBLOB2:
		/* TODO: validate also non-index pages */
		return;
	case FIL_PAGE_TYPE_ALLOCATED:
		/* empty pages could be flushed by encryption threads */
		return;
	}

	/* An index page that failed validation, or a page of an
	unrecognized type: treat as corruption and crash. */
	buf_dblwr_assert_on_corrupt_block(block);
}
1144 
1145 /********************************************************************//**
1146 Writes a page that has already been written to the doublewrite buffer
1147 to the datafile. It is the job of the caller to sync the datafile. */
1148 static
1149 void
buf_dblwr_write_block_to_datafile(const buf_page_t * bpage,bool sync)1150 buf_dblwr_write_block_to_datafile(
1151 /*==============================*/
1152 	const buf_page_t*	bpage,	/*!< in: page to write */
1153 	bool			sync)	/*!< in: true if sync IO
1154 					is requested */
1155 {
1156 	ut_a(buf_page_in_file(bpage));
1157 
1158 	ulint	type = IORequest::WRITE;
1159 
1160 	if (sync) {
1161 		type |= IORequest::DO_NOT_WAKE;
1162 	}
1163 
1164 	IORequest	request(type);
1165 
1166 	if (bpage->zip.data != NULL) {
1167 		ut_ad(bpage->size.is_compressed());
1168 
1169 		fil_io(request, sync, bpage->id, bpage->size, 0,
1170 		       bpage->size.physical(),
1171 		       (void*) bpage->zip.data,
1172 		       (void*) bpage);
1173 	} else {
1174 		ut_ad(!bpage->size.is_compressed());
1175 
1176 		/* Our IO API is common for both reads and writes and is
1177 		therefore geared towards a non-const parameter. */
1178 
1179 		buf_block_t*	block = reinterpret_cast<buf_block_t*>(
1180 			const_cast<buf_page_t*>(bpage));
1181 
1182 		ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
1183 		buf_dblwr_check_page_lsn(block->frame);
1184 
1185 		fil_io(request,
1186 		       sync, bpage->id, bpage->size, 0, bpage->size.physical(),
1187 		       block->frame, block);
1188 	}
1189 }
1190 
/** Encrypt a page in a doublewrite buffer shard. The page is
encrypted using its tablespace key.
@param[in]	block		the buffer pool block for the page
@param[in,out]	dblwr_page	in: unencrypted page;
				out: encrypted page (if the tablespace
				is encrypted) */
static
void
buf_dblwr_encrypt_page(
	const buf_block_t*	block,
	page_t*			dblwr_page)
{
	const ulint	space_id = block->page.id.space();
	fil_space_t*	space = fil_space_acquire_silent(space_id);

	if (space == NULL) {
		/* Tablespace dropped */
		return;
	}

	/* Encrypt into a scratch buffer so the doublewrite copy is
	replaced only when encryption actually succeeds. */
	byte*		encrypted_buf = static_cast<byte*>(
		ut_zalloc_nokey(UNIV_PAGE_SIZE));
	ut_a(encrypted_buf != NULL);

	const page_size_t	page_size(space->flags);
	const bool 	success = os_dblwr_encrypt_page(
		space, dblwr_page, encrypted_buf, UNIV_PAGE_SIZE);

	if (success) {
		memcpy(dblwr_page, encrypted_buf, page_size.physical());
	}

	ut_free(encrypted_buf);

	fil_space_release(space);
}
1227 
1228 /* Disable encryption of Page 0 of any tablespace or if it is system
1229 tablespace, do not encrypt pages upto TRX_SYS_PAGE_NO (including).
1230 TRX_SYS_PAGE should be not encrypted because dblwr buffer is found
1231 from this page
1232 @param[in]	block	buffer block
1233 @return true if encryption should be disabled for the block, else flase */
1234 static
1235 bool
buf_dblwr_disable_encryption(const buf_block_t * block)1236 buf_dblwr_disable_encryption(
1237 	const buf_block_t*	block)
1238 {
1239 	return(block->page.id.page_no() == 0
1240 	       || (block->page.id.space() == TRX_SYS_SPACE
1241 		   && block->page.id.page_no() <= TRX_SYS_PAGE_NO));
1242 }
1243 
/********************************************************************//**
Flushes possible buffered writes from the specified partition of the
doublewrite memory buffer to disk, and also wakes up the aio thread if
simulated aio is used. It is very important to call this function after a batch
of writes has been posted, and also when we may have to wait for a page latch!
Otherwise a deadlock of threads can occur. */
void
buf_dblwr_flush_buffered_writes(
/*============================*/
	ulint dblwr_partition)	/*!< in: doublewrite partition */
{
	byte*		write_buf;
	ulint		len;

	ut_ad(parallel_dblwr_buf.recovery_buf_unaligned == NULL);

	if (!srv_use_doublewrite_buf || buf_dblwr == NULL) {
		/* Sync the writes to the disk. */
		buf_dblwr_sync_datafiles();
		return;
	}

	ut_ad(!srv_read_only_mode);

	struct parallel_dblwr_shard_t* dblwr_shard
		= &parallel_dblwr_buf.shard[dblwr_partition];

	/* Write first to doublewrite buffer blocks. We use synchronous
	aio and thus know that file write has been completed when the
	control returns. */

	if (dblwr_shard->first_free == 0) {

		/* Wake possible simulated aio thread as there could be
		system temporary tablespace pages active for flushing.
		Note: system temporary tablespace pages are not scheduled
		for doublewrite. */
		os_aio_simulated_wake_handler_threads();

		return;
	}

	write_buf = dblwr_shard->write_buf;

	const bool	encrypt_parallel_dblwr = srv_parallel_dblwr_encrypt;

	/* Validate each queued uncompressed page and optionally
	encrypt its doublewrite copy in place before writing the
	batch out. */
	for (ulint len2 = 0, i = 0;
	     i < dblwr_shard->first_free;
	     len2 += UNIV_PAGE_SIZE, i++) {

		const buf_block_t*	block;

		block = (buf_block_t*)dblwr_shard->buf_block_arr[i];

		page_t*	dblwr_page = write_buf + len2;

		if (buf_block_get_state(block) != BUF_BLOCK_FILE_PAGE
		    || block->page.zip.data) {
			/* No simple validate for compressed
			pages exists. */
			continue;
		}

		/* Check that the actual page in the buffer pool is
		not corrupt and the LSN values are sane. */
		buf_dblwr_check_block(block);

		/* Check that the page as written to the doublewrite
		buffer has sane LSN values. */
		buf_dblwr_check_page_lsn(dblwr_page);

		// it can be already encrypted by encryption threads
		/* NOTE(review): encryption of the doublewrite copy is
		skipped when the system tablespace carries crypt_data
		(keyring encryption threads active) -- presumably the
		pages may then already be encrypted; confirm against
		the encryption thread logic. */
		FilSpace space (TRX_SYS_SPACE);
		if (encrypt_parallel_dblwr && space()->crypt_data == NULL
		    && !buf_dblwr_disable_encryption(block)) {
			buf_dblwr_encrypt_page(block, dblwr_page);
		}
	}

	len = dblwr_shard->first_free * UNIV_PAGE_SIZE;

	/* Find our part of the doublewrite buffer */
	os_offset_t file_pos = dblwr_partition
		* srv_doublewrite_batch_size * UNIV_PAGE_SIZE;
	IORequest io_req(IORequest::WRITE | IORequest::NO_COMPRESSION);

#ifdef UNIV_DEBUG
	/* The file size must not increase */
	os_offset_t desired_size = srv_doublewrite_batch_size * UNIV_PAGE_SIZE
		* buf_parallel_dblwr_shard_num();
	os_offset_t actual_size = os_file_get_size(parallel_dblwr_buf.file);
	ut_ad(desired_size == actual_size);
	ut_ad(file_pos + len <= actual_size);
	/* We must not touch neighboring buffers */
	ut_ad(file_pos + len <= (dblwr_partition + 1)
	      * srv_doublewrite_batch_size * UNIV_PAGE_SIZE);
#endif

	/* Synchronous write of the whole batch to this shard's slice
	of the parallel doublewrite file. */
	dberr_t err = os_file_write(io_req, parallel_dblwr_buf.path,
				    parallel_dblwr_buf.file, write_buf,
				    file_pos, len);
	if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
		ib::fatal() << "Parallel doublewrite buffer write failed, "
			"crashing the server to avoid data loss";
	}

	ut_ad(dblwr_shard->first_free <= srv_doublewrite_batch_size);

	/* increment the doublewrite flushed pages counter */
	srv_stats.dblwr_pages_written.add(dblwr_shard->first_free);
	srv_stats.dblwr_writes.inc();

	/* needs_flush is false when the file is opened with O_DIRECT
	set successfully; otherwise an explicit flush is required. */
	if (parallel_dblwr_buf.needs_flush)
		os_file_flush(parallel_dblwr_buf.file);

	/* We know that the writes have been flushed to disk now
	and in recovery we will find them in the doublewrite buffer
	blocks. Next do the writes to the intended positions. */

	/* Publish the batch size before posting the asynchronous
	writes; buf_dblwr_update() decrements it as each page write
	completes and signals batch_completed at zero. */
	dblwr_shard->batch_size = dblwr_shard->first_free;
	os_wmb;

	for (ulint i = 0; i < dblwr_shard->first_free; i++) {
		buf_dblwr_write_block_to_datafile(
			dblwr_shard->buf_block_arr[i], false);
	}

	/* Wake possible simulated aio thread to actually post the
	writes to the operating system. We don't flush the files
	at this point. We leave it to the IO helper thread to flush
	datafiles when the whole batch has been processed. */
	os_aio_simulated_wake_handler_threads();

	/* Block until the last page of this batch has been written
	to its final position. */
	os_event_wait(dblwr_shard->batch_completed);
	os_event_reset(dblwr_shard->batch_completed);

#ifdef UNIV_DEBUG
	os_rmb;
	ut_ad(dblwr_shard->batch_size == 0);
#endif
	dblwr_shard->first_free = 0;

	/* This will finish the batch. Sync data files
	to the disk. */
	fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
}
1390 
/********************************************************************//**
Posts a buffer page for writing. If the doublewrite memory buffer is
full, calls buf_dblwr_flush_buffered_writes and waits for free
space to appear. */
void
buf_dblwr_add_to_batch(
/*====================*/
	buf_page_t*	bpage,	/*!< in: buffer block to write */
	buf_flush_t	flush_type)/*!< in: BUF_FLUSH_LRU or BUF_FLUSH_LIST */
{
	ut_ad(flush_type == BUF_FLUSH_LRU || flush_type == BUF_FLUSH_LIST);
	ut_a(buf_page_in_file(bpage));
	ut_ad(!mutex_own(&buf_pool_from_bpage(bpage)->LRU_list_mutex));

	/* Each (buffer pool instance, flush type) pair maps to its own
	doublewrite shard. */
	ulint dblwr_partition = buf_parallel_dblwr_partition(bpage,
							     flush_type);
	struct parallel_dblwr_shard_t* dblwr_shard
		= &parallel_dblwr_buf.shard[dblwr_partition];

try_again:
	ut_a(dblwr_shard->first_free <= srv_doublewrite_batch_size);
	ut_ad(!os_event_is_set(dblwr_shard->batch_completed));

	if (dblwr_shard->first_free == srv_doublewrite_batch_size) {

		/* Shard is full: flush the current batch, which resets
		first_free to 0, then retry the reservation. */
		buf_dblwr_flush_buffered_writes(dblwr_partition);

		goto try_again;
	}

	/* Slot in the in-memory write buffer for this page. */
	byte*	p = dblwr_shard->write_buf
		+ univ_page_size.physical() * dblwr_shard->first_free;

	if (bpage->size.is_compressed()) {
		UNIV_MEM_ASSERT_RW(bpage->zip.data, bpage->size.physical());
		/* Copy the compressed page and clear the rest. */

		memcpy(p, bpage->zip.data, bpage->size.physical());

		memset(p + bpage->size.physical(), 0x0,
		       univ_page_size.physical() - bpage->size.physical());
	} else {
		ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);

		UNIV_MEM_ASSERT_RW(((buf_block_t*) bpage)->frame,
				   bpage->size.logical());

		memcpy(p, ((buf_block_t*) bpage)->frame, bpage->size.logical());
	}

	dblwr_shard->buf_block_arr[dblwr_shard->first_free++] = bpage;

	ut_ad(!os_event_is_set(dblwr_shard->batch_completed));
	ut_ad(dblwr_shard->first_free <= srv_doublewrite_batch_size);
}
1446 
1447 /********************************************************************//**
1448 Writes a page to the doublewrite buffer on disk, sync it, then write
1449 the page to the datafile and sync the datafile. This function is used
1450 for single page flushes. If all the buffers allocated for single page
1451 flushes in the doublewrite buffer are in use we wait here for one to
1452 become free. We are guaranteed that a slot will become free because any
1453 thread that is using a slot must also release the slot before leaving
1454 this function. */
1455 void
buf_dblwr_write_single_page(buf_page_t * bpage,bool sync)1456 buf_dblwr_write_single_page(
1457 /*========================*/
1458 	buf_page_t*	bpage,	/*!< in: buffer block to write */
1459 	bool		sync)	/*!< in: true if sync IO requested */
1460 {
1461 	ulint		size;
1462 	ulint		offset;
1463 	ulint		i;
1464 
1465 	ut_a(buf_page_in_file(bpage));
1466 	ut_a(srv_use_doublewrite_buf);
1467 	ut_a(buf_dblwr != NULL);
1468 
1469 	size = 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1470 
1471 	if (buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
1472 
1473 		/* Check that the actual page in the buffer pool is
1474 		not corrupt and the LSN values are sane. */
1475 		buf_dblwr_check_block((buf_block_t*) bpage);
1476 
1477 		/* Check that the page as written to the doublewrite
1478 		buffer has sane LSN values. */
1479 		if (!bpage->zip.data) {
1480 			buf_dblwr_check_page_lsn(
1481 				((buf_block_t*) bpage)->frame);
1482 		}
1483 	}
1484 
1485 retry:
1486 	mutex_enter(&buf_dblwr->mutex);
1487 	if (buf_dblwr->s_reserved == size) {
1488 
1489 		/* All slots are reserved. */
1490 		int64_t	sig_count = os_event_reset(buf_dblwr->s_event);
1491 		mutex_exit(&buf_dblwr->mutex);
1492 		os_event_wait_low(buf_dblwr->s_event, sig_count);
1493 
1494 		goto retry;
1495 	}
1496 
1497 	for (i = 0; i < size; ++i) {
1498 
1499 		if (!buf_dblwr->in_use[i]) {
1500 			break;
1501 		}
1502 	}
1503 
1504 	/* We are guaranteed to find a slot. */
1505 	ut_a(i < size);
1506 	buf_dblwr->in_use[i] = true;
1507 	buf_dblwr->s_reserved++;
1508 	buf_dblwr->buf_block_arr[i] = bpage;
1509 
1510 	/* increment the doublewrite flushed pages counter */
1511 	srv_stats.dblwr_pages_written.inc();
1512 	srv_stats.dblwr_writes.inc();
1513 
1514 	mutex_exit(&buf_dblwr->mutex);
1515 
1516 	/* Lets see if we are going to write in the first or second
1517 	block of the doublewrite buffer. */
1518 	if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1519 		offset = buf_dblwr->block1 + i;
1520 	} else {
1521 		offset = buf_dblwr->block2 + i
1522 			 - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1523 	}
1524 
1525 	/* We deal with compressed and uncompressed pages a little
1526 	differently here. In case of uncompressed pages we can
1527 	directly write the block to the allocated slot in the
1528 	doublewrite buffer in the system tablespace and then after
1529 	syncing the system table space we can proceed to write the page
1530 	in the datafile.
1531 	In case of compressed page we first do a memcpy of the block
1532 	to the in-memory buffer of doublewrite before proceeding to
1533 	write it. This is so because we want to pad the remaining
1534 	bytes in the doublewrite page with zeros. */
1535 
1536 	IORequest	write_request(IORequest::WRITE);
1537 
1538 	if (buf_dblwr_disable_encryption((buf_block_t*)bpage)) {
1539 		write_request.disable_encryption();
1540 	}
1541 
1542 	if (bpage->size.is_compressed()) {
1543 		memcpy(buf_dblwr->write_buf + univ_page_size.physical() * i,
1544 		       bpage->zip.data, bpage->size.physical());
1545 
1546 		memset(buf_dblwr->write_buf + univ_page_size.physical() * i
1547 		       + bpage->size.physical(), 0x0,
1548 		       univ_page_size.physical() - bpage->size.physical());
1549 
1550 		fil_io(write_request, true,
1551 		       page_id_t(TRX_SYS_SPACE, offset), univ_page_size, 0,
1552 		       univ_page_size.physical(),
1553 		       (void*) (buf_dblwr->write_buf
1554 				+ univ_page_size.physical() * i),
1555 		       NULL);
1556 	} else {
1557 		/* It is a regular page. Write it directly to the
1558 		doublewrite buffer */
1559 		fil_io(IORequestWrite, true,
1560 		       page_id_t(TRX_SYS_SPACE, offset), univ_page_size, 0,
1561 		       univ_page_size.physical(),
1562 		       (void*) ((buf_block_t*) bpage)->frame,
1563 		       NULL);
1564 	}
1565 
1566 	/* Now flush the doublewrite buffer data to disk */
1567 	fil_flush(TRX_SYS_SPACE);
1568 
1569 	/* We know that the write has been flushed to disk now
1570 	and during recovery we will find it in the doublewrite buffer
1571 	blocks. Next do the write to the intended position. */
1572 	buf_dblwr_write_block_to_datafile(bpage, sync);
1573 }
1574 
/** Compute the size and path of the parallel doublewrite buffer, create it,
and disable OS caching for it
@return DB_SUCCESS or error code */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
buf_parallel_dblwr_file_create(void)
{
	ut_ad(!srv_read_only_mode);
	/* The buffer size is two doublewrite batches (one for LRU, one for
	flush list flusher) per buffer pool instance. */
	os_offset_t size = srv_doublewrite_batch_size * UNIV_PAGE_SIZE
		* buf_parallel_dblwr_shard_num();
	ut_a(size <= MAX_DOUBLEWRITE_FILE_SIZE);
	ut_a(size > 0);
	ut_a(size % UNIV_PAGE_SIZE == 0);

	dberr_t err = buf_parallel_dblwr_make_path();
	if (err != DB_SUCCESS)
		return(err);

	ut_ad(parallel_dblwr_buf.file.is_closed());
	ut_ad(parallel_dblwr_buf.recovery_buf_unaligned == NULL);

	/* Set O_SYNC if innodb_flush_method == O_DSYNC. */
	ulint o_sync = (srv_unix_file_flush_method == SRV_UNIX_O_DSYNC)
		? OS_FILE_O_SYNC : 0;

	/* OS_FILE_CREATE fails if the file already exists: a leftover
	parallel doublewrite file from a previous run is an error. */
	bool success;
	parallel_dblwr_buf.file
		= os_file_create_simple(innodb_parallel_dblwrite_file_key,
					parallel_dblwr_buf.path,
					OS_FILE_CREATE | o_sync,
					OS_FILE_READ_WRITE, false, &success);
	if (!success) {
		if (os_file_get_last_error(false) == OS_FILE_ALREADY_EXISTS) {
			ib::error() << "A parallel doublewrite file "
				    << parallel_dblwr_buf.path
				    << " found on startup.";
		}
		return(DB_ERROR);
	}

	/* Try to bypass the OS cache; whether an explicit flush is
	still needed depends on the flush method and on whether
	O_DIRECT could actually be set. */
	const bool o_direct_set
		= os_file_set_nocache(parallel_dblwr_buf.file,
				      parallel_dblwr_buf.path,
				      "create", false);
	switch (srv_unix_file_flush_method) {
	case SRV_UNIX_NOSYNC:
	case SRV_UNIX_O_DSYNC:
	case SRV_UNIX_O_DIRECT_NO_FSYNC:
	case SRV_UNIX_ALL_O_DIRECT:
		parallel_dblwr_buf.needs_flush = !o_direct_set;
		break;
	case SRV_UNIX_FSYNC:
	case SRV_UNIX_LITTLESYNC:
	case SRV_UNIX_O_DIRECT:
		parallel_dblwr_buf.needs_flush = true;
		break;
	}

	/* Preallocate the file to its full, fixed size. */
	success = os_file_set_size(parallel_dblwr_buf.path,
				   parallel_dblwr_buf.file, size, false);
	if (!success) {
		buf_parallel_dblwr_free(true);
		return(DB_ERROR);
	}
	ut_ad(os_file_get_size(parallel_dblwr_buf.file) == size);

	ib::info() << "Created parallel doublewrite buffer at "
		   << parallel_dblwr_buf.path << ", size "
		   << os_file_get_size(parallel_dblwr_buf.file) << " bytes";

	return(DB_SUCCESS);
}
1649 
/** Initialize parallel doublewrite subsystem: create its data structure and
the disk file.
@return DB_SUCCESS or error code */
dberr_t
buf_parallel_dblwr_create(void)
{
	if (!srv_use_doublewrite_buf) {
		return(DB_SUCCESS);
	}

	if (!parallel_dblwr_buf.file.is_closed() || srv_read_only_mode) {

		/* Already created, or nothing to create in read-only
		mode. */
		ut_ad(parallel_dblwr_buf.recovery_buf_unaligned == NULL);
		return(DB_SUCCESS);
	}

	memset(parallel_dblwr_buf.shard, 0, sizeof(parallel_dblwr_buf.shard));

	dberr_t err = buf_parallel_dblwr_file_create();
	if (err != DB_SUCCESS) {
		return(err);
	}

	for (ulint i = 0; i < buf_parallel_dblwr_shard_num(); i++) {

		struct parallel_dblwr_shard_t* dblwr_shard
			= &parallel_dblwr_buf.shard[i];

		/* One extra page is allocated so the write buffer can
		be aligned to a page boundary below. */
		dblwr_shard->write_buf_unaligned
			= static_cast<byte*>(ut_malloc((1
					  + srv_doublewrite_batch_size)
						       * UNIV_PAGE_SIZE,
					mem_key_parallel_doublewrite));
		if (!dblwr_shard->write_buf_unaligned) {
			/* Partial allocations are released, and the
			file deleted, by buf_parallel_dblwr_free(). */
			buf_parallel_dblwr_free(true);
			return(DB_OUT_OF_MEMORY);
		}
		dblwr_shard->write_buf = static_cast<byte*>(
			ut_align(dblwr_shard->write_buf_unaligned,
				 UNIV_PAGE_SIZE));
		dblwr_shard->buf_block_arr
			= static_cast<buf_page_t**>(
			ut_zalloc(srv_doublewrite_batch_size
				  * sizeof(void*),
				  mem_key_parallel_doublewrite));
		if (!dblwr_shard->buf_block_arr) {
			buf_parallel_dblwr_free(true);
			return(DB_OUT_OF_MEMORY);
		}

		/* The event is created last; buf_parallel_dblwr_free()
		relies on this ordering when deciding whether to
		destroy it. */
		dblwr_shard->batch_completed
			= os_event_create("parallel_dblwr_batch_completed");
		os_event_reset(dblwr_shard->batch_completed);
	}

	return(DB_SUCCESS);
}
1707 
/** Cleanup parallel doublewrite memory structures and optionally close and
delete the doublewrite buffer file too.
@param	delete_file	whether to close and delete the buffer file too  */
void
buf_parallel_dblwr_free(bool delete_file)
{
	for (ulint i = 0; i < buf_parallel_dblwr_shard_num(); i++) {

		struct parallel_dblwr_shard_t* dblwr_shard
			= &parallel_dblwr_buf.shard[i];

		/* The event exists only if both allocations succeeded:
		buf_parallel_dblwr_create() creates it after them. */
		if (dblwr_shard->write_buf_unaligned
		    && dblwr_shard->buf_block_arr) {
			os_event_destroy(dblwr_shard->batch_completed);
		}

		/* ut_free() tolerates NULL, so partially-initialized
		shards are safe to free. */
		ut_free(dblwr_shard->write_buf_unaligned);
		ut_free(dblwr_shard->buf_block_arr);
	}

	if (delete_file) {
		buf_parallel_dblwr_close();
		buf_parallel_dblwr_delete();
	}

	ut_free(parallel_dblwr_buf.path);
	parallel_dblwr_buf.path = NULL;
}
1736 
1737 /** The parallel doublewrite buffer */
1738 parallel_dblwr_t parallel_dblwr_buf;
1739 
1740 #endif /* !UNIV_HOTBACKUP */
1741