1 /*****************************************************************************
2 
3 Copyright (c) 1995, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file buf/buf0dblwr.cc
Doublewrite buffer module
30 
31 Created 2011/12/19
32 *******************************************************/
33 
34 #include "ha_prototypes.h"
35 #include "buf0dblwr.h"
36 
37 #ifdef UNIV_NONINL
38 #include "buf0buf.ic"
39 #endif
40 
41 #include "buf0buf.h"
42 #include "buf0checksum.h"
43 #include "srv0start.h"
44 #include "srv0srv.h"
45 #include "page0zip.h"
46 #include "trx0sys.h"
47 
48 #ifndef UNIV_HOTBACKUP
49 
/** The doublewrite buffer instance; allocated in buf_dblwr_init() and
freed in buf_dblwr_free(). NULL when the doublewrite buffer has not
been initialized (or is disabled). */
buf_dblwr_t*	buf_dblwr = NULL;

/** Set to TRUE when the doublewrite buffer is being created */
ibool	buf_dblwr_being_created = FALSE;
55 
56 /****************************************************************//**
57 Determines if a page number is located inside the doublewrite buffer.
58 @return TRUE if the location is inside the two blocks of the
59 doublewrite buffer */
60 ibool
buf_dblwr_page_inside(ulint page_no)61 buf_dblwr_page_inside(
62 /*==================*/
63 	ulint	page_no)	/*!< in: page number */
64 {
65 	if (buf_dblwr == NULL) {
66 
67 		return(FALSE);
68 	}
69 
70 	if (page_no >= buf_dblwr->block1
71 	    && page_no < buf_dblwr->block1
72 	    + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
73 		return(TRUE);
74 	}
75 
76 	if (page_no >= buf_dblwr->block2
77 	    && page_no < buf_dblwr->block2
78 	    + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
79 		return(TRUE);
80 	}
81 
82 	return(FALSE);
83 }
84 
/****************************************************************//**
Calls buf_page_get() on the TRX_SYS_PAGE and returns a pointer to the
doublewrite buffer within it.
@return pointer to the doublewrite buffer within the filespace header
page. */
UNIV_INLINE
byte*
buf_dblwr_get(
/*==========*/
	mtr_t*	mtr)	/*!< in/out: MTR to hold the page latch */
{
	buf_block_t*	block;

	/* X-latch the TRX_SYS page; the latch is released when the
	caller commits the mini-transaction. */
	block = buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
			     univ_page_size, RW_X_LATCH, mtr);

	buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);

	/* The doublewrite header lives at a fixed offset within the
	TRX_SYS page. */
	return(buf_block_get_frame(block) + TRX_SYS_DOUBLEWRITE);
}
105 
/********************************************************************//**
Flush a batch of writes to the datafiles that have already been
written to the dblwr buffer on disk. NOTE: the three steps below must
happen in this order: post, wait, then flush. */
void
buf_dblwr_sync_datafiles()
/*======================*/
{
	/* Wake possible simulated aio thread to actually post the
	writes to the operating system */
	os_aio_simulated_wake_handler_threads();

	/* Wait that all async writes to tablespaces have been posted to
	the OS */
	os_aio_wait_until_no_pending_writes();

	/* Now we flush the data to disk (for example, with fsync) */
	fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
}
124 
/****************************************************************//**
Creates or initializes the doublewrite buffer at a database start. */
static
void
buf_dblwr_init(
/*===========*/
	byte*	doublewrite)	/*!< in: pointer to the doublewrite buf
				header on trx sys page */
{
	ulint	buf_size;

	buf_dblwr = static_cast<buf_dblwr_t*>(
		ut_zalloc_nokey(sizeof(buf_dblwr_t)));

	/* There are two blocks of same size in the doublewrite
	buffer. */
	buf_size = 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;

	/* There must be at least one buffer for single page writes
	and one buffer for batch writes. */
	ut_a(srv_doublewrite_batch_size > 0
	     && srv_doublewrite_batch_size < buf_size);

	mutex_create(LATCH_ID_BUF_DBLWR, &buf_dblwr->mutex);

	buf_dblwr->b_event = os_event_create("dblwr_batch_event");
	buf_dblwr->s_event = os_event_create("dblwr_single_event");
	buf_dblwr->first_free = 0;
	buf_dblwr->s_reserved = 0;
	buf_dblwr->b_reserved = 0;

	/* The start page numbers of the two on-disk blocks are read
	from the doublewrite header on the TRX_SYS page. */
	buf_dblwr->block1 = mach_read_from_4(
		doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK1);
	buf_dblwr->block2 = mach_read_from_4(
		doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK2);

	buf_dblwr->in_use = static_cast<bool*>(
		ut_zalloc_nokey(buf_size * sizeof(bool)));

	/* Allocate one extra page so that write_buf below can be
	aligned up to a UNIV_PAGE_SIZE boundary. */
	buf_dblwr->write_buf_unaligned = static_cast<byte*>(
		ut_malloc_nokey((1 + buf_size) * UNIV_PAGE_SIZE));

	buf_dblwr->write_buf = static_cast<byte*>(
		ut_align(buf_dblwr->write_buf_unaligned,
			 UNIV_PAGE_SIZE));

	buf_dblwr->buf_block_arr = static_cast<buf_page_t**>(
		ut_zalloc_nokey(buf_size * sizeof(void*)));
}
174 
/****************************************************************//**
Creates the doublewrite buffer to a new InnoDB installation. The header of the
doublewrite buffer is placed on the trx system header page.
@return true if successful, false if not. */
MY_ATTRIBUTE((warn_unused_result))
bool
buf_dblwr_create(void)
/*==================*/
{
	buf_block_t*	block2;
	buf_block_t*	new_block;
	byte*	doublewrite;
	byte*	fseg_header;
	ulint	page_no;
	ulint	prev_page_no;
	ulint	i;
	mtr_t	mtr;

	if (buf_dblwr) {
		/* Already inited */

		return(true);
	}

start_again:
	mtr_start(&mtr);
	buf_dblwr_being_created = TRUE;

	doublewrite = buf_dblwr_get(&mtr);

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
	    == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
		/* The doublewrite buffer has already been created:
		just read in some numbers */

		buf_dblwr_init(doublewrite);

		mtr_commit(&mtr);
		buf_dblwr_being_created = FALSE;
		return(true);
	}

	ib::info() << "Doublewrite buffer not found: creating new";

	/* The pages allocated below are first read into the buffer
	pool, so the pool must be able to hold both doublewrite blocks
	plus the extent-alignment pages plus some slack. */
	ulint min_doublewrite_size =
		( ( 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE
		  + FSP_EXTENT_SIZE / 2
		  + 100)
		* UNIV_PAGE_SIZE);
	if (buf_pool_get_curr_size() <  min_doublewrite_size) {
		ib::error() << "Cannot create doublewrite buffer: you must"
			" increase your buffer pool size. Cannot continue"
			" operation.";

		/* NOTE(review): the mtr started above is not committed
		on this path; presumably intentional (as on the
		block2 == NULL path below) so that no modifications
		reach disk -- confirm. */
		return(false);
	}

	block2 = fseg_create(TRX_SYS_SPACE, TRX_SYS_PAGE_NO,
			     TRX_SYS_DOUBLEWRITE
			     + TRX_SYS_DOUBLEWRITE_FSEG, &mtr);

	/* fseg_create acquires a second latch on the page,
	therefore we must declare it: */

	buf_block_dbg_add_level(block2, SYNC_NO_ORDER_CHECK);

	if (block2 == NULL) {
		ib::error() << "Cannot create doublewrite buffer: you must"
			" increase your tablespace size."
			" Cannot continue operation.";

		/* We exit without committing the mtr to prevent
		its modifications to the database getting to disk */

		return(false);
	}

	fseg_header = doublewrite + TRX_SYS_DOUBLEWRITE_FSEG;
	prev_page_no = 0;

	/* Allocate all pages of both blocks. The first
	FSP_EXTENT_SIZE / 2 allocations only advance the segment so
	that the blocks themselves start at extent boundaries. */
	for (i = 0; i < 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE
		     + FSP_EXTENT_SIZE / 2; i++) {
		new_block = fseg_alloc_free_page(
			fseg_header, prev_page_no + 1, FSP_UP, &mtr);
		if (new_block == NULL) {
			ib::error() << "Cannot create doublewrite buffer: "
				" you must increase your tablespace size."
				" Cannot continue operation.";

			return(false);
		}

		/* We read the allocated pages to the buffer pool;
		when they are written to disk in a flush, the space
		id and page number fields are also written to the
		pages. When we at database startup read pages
		from the doublewrite buffer, we know that if the
		space id and page number in them are the same as
		the page position in the tablespace, then the page
		has not been written to in doublewrite. */

		ut_ad(rw_lock_get_x_lock_count(&new_block->lock) == 1);
		page_no = new_block->page.id.page_no();

		if (i == FSP_EXTENT_SIZE / 2) {
			/* First page of block 1: record its page
			number in the header and in the repeat copy. */
			ut_a(page_no == FSP_EXTENT_SIZE);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_BLOCK1,
					 page_no, MLOG_4BYTES, &mtr);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_REPEAT
					 + TRX_SYS_DOUBLEWRITE_BLOCK1,
					 page_no, MLOG_4BYTES, &mtr);

		} else if (i == FSP_EXTENT_SIZE / 2
			   + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
			/* First page of block 2. */
			ut_a(page_no == 2 * FSP_EXTENT_SIZE);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_BLOCK2,
					 page_no, MLOG_4BYTES, &mtr);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_REPEAT
					 + TRX_SYS_DOUBLEWRITE_BLOCK2,
					 page_no, MLOG_4BYTES, &mtr);

		} else if (i > FSP_EXTENT_SIZE / 2) {
			/* Pages within a block must be contiguous. */
			ut_a(page_no == prev_page_no + 1);
		}

		if (((i + 1) & 15) == 0) {
			/* rw_locks can only be recursively x-locked
			2048 times. (on 32 bit platforms,
			(lint) 0 - (X_LOCK_DECR * 2049)
			is no longer a negative number, and thus
			lock_word becomes like a shared lock).
			For 4k page size this loop will
			lock the fseg header too many times. Since
			this code is not done while any other threads
			are active, restart the MTR occasionally. */
			mtr_commit(&mtr);
			mtr_start(&mtr);
			doublewrite = buf_dblwr_get(&mtr);
			fseg_header = doublewrite
				      + TRX_SYS_DOUBLEWRITE_FSEG;
		}

		prev_page_no = page_no;
	}

	/* Write the magic number last: it marks the doublewrite
	buffer as fully created. */
	mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC,
			 TRX_SYS_DOUBLEWRITE_MAGIC_N,
			 MLOG_4BYTES, &mtr);
	mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC
			 + TRX_SYS_DOUBLEWRITE_REPEAT,
			 TRX_SYS_DOUBLEWRITE_MAGIC_N,
			 MLOG_4BYTES, &mtr);

	mlog_write_ulint(doublewrite
			 + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED,
			 TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N,
			 MLOG_4BYTES, &mtr);
	mtr_commit(&mtr);

	/* Flush the modified pages to disk and make a checkpoint */
	log_make_checkpoint_at(LSN_MAX, TRUE);

	/* Remove doublewrite pages from LRU */
	buf_pool_invalidate();

	ib::info() <<  "Doublewrite buffer created";

	/* Loop back: the magic-number check above will now succeed
	and buf_dblwr_init() will run on the freshly created buffer. */
	goto start_again;
}
348 
/**
At database startup initializes the doublewrite buffer memory structure if
we already have a doublewrite buffer created in the data files. If we are
upgrading to an InnoDB version which supports multiple tablespaces, then this
function performs the necessary update operations. If we are in a crash
recovery, this function loads the pages from double write buffer into memory.
@param[in]	file		File handle
@param[in]	path		Path name of file
@return DB_SUCCESS or error code */
dberr_t
buf_dblwr_init_or_load_pages(
	pfs_os_file_t	file,
	const char*	path)
{
	byte*		buf;
	byte*		page;
	ulint		block1;
	ulint		block2;
	ulint		space_id;
	byte*		read_buf;
	byte*		doublewrite;
	byte*		unaligned_read_buf;
	ibool		reset_space_ids = FALSE;
	recv_dblwr_t&	recv_dblwr = recv_sys->dblwr;

	/* We do the file i/o past the buffer pool */

	/* Allocate one extra page so the read buffer can be aligned
	to UNIV_PAGE_SIZE. */
	unaligned_read_buf = static_cast<byte*>(
		ut_malloc_nokey(2 * UNIV_PAGE_SIZE));

	read_buf = static_cast<byte*>(
		ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));

	/* Read the trx sys header to check if we are using the doublewrite
	buffer */
	dberr_t		err;

	IORequest	read_request(IORequest::READ);

	read_request.disable_compression();

	err = os_file_read(
		read_request,
		file, read_buf, TRX_SYS_PAGE_NO * UNIV_PAGE_SIZE,
		UNIV_PAGE_SIZE);

	if (err != DB_SUCCESS) {

		ib::error()
			<< "Failed to read the system tablespace header page";

		ut_free(unaligned_read_buf);

		return(err);
	}

	doublewrite = read_buf + TRX_SYS_DOUBLEWRITE;

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
	    == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
		/* The doublewrite buffer has been created */

		buf_dblwr_init(doublewrite);

		block1 = buf_dblwr->block1;
		block2 = buf_dblwr->block2;

		/* The dblwr pages are read into the write buffer of
		the in-memory structure just created. */
		buf = buf_dblwr->write_buf;
	} else {
		/* No doublewrite buffer in the data files: nothing
		to load. */
		ut_free(unaligned_read_buf);
		return(DB_SUCCESS);
	}

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED)
	    != TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N) {

		/* We are upgrading from a version < 4.1.x to a version where
		multiple tablespaces are supported. We must reset the space id
		field in the pages in the doublewrite buffer because starting
		from this version the space id is stored to
		FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID. */

		reset_space_ids = TRUE;

		ib::info() << "Resetting space id's in the doublewrite buffer";
	}

	/* Read the pages from the doublewrite buffer to memory */
	err = os_file_read(
		read_request,
		file, buf, block1 * UNIV_PAGE_SIZE,
		TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE);

	if (err != DB_SUCCESS) {

		ib::error()
			<< "Failed to read the first double write buffer "
			"extent";

		ut_free(unaligned_read_buf);

		return(err);
	}

	/* The second block is read just after the first in buf. */
	err = os_file_read(
		read_request,
		file,
		buf + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE,
		block2 * UNIV_PAGE_SIZE,
		TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE);

	if (err != DB_SUCCESS) {

		ib::error()
			<< "Failed to read the second double write buffer "
			"extent";

		ut_free(unaligned_read_buf);

		return(err);
	}

	/* Check if any of these pages is half-written in data files, in the
	intended position */

	page = buf;

	for (ulint i = 0; i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * 2; i++) {
		if (reset_space_ids) {
			/* Upgrade path: rewrite each dblwr page in
			place with a zeroed space id field. */
			ulint source_page_no;

			space_id = 0;
			mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
					space_id);
			/* We do not need to calculate new checksums for the
			pages because the field .._SPACE_ID does not affect
			them. Write the page back to where we read it from. */

			if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
				source_page_no = block1 + i;
			} else {
				source_page_no = block2
					+ i - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
			}

			IORequest	write_request(IORequest::WRITE);

			/* Recovered data file pages are written out
			as uncompressed. */

			write_request.disable_compression();

			err = os_file_write(
				write_request, path, file, page,
				source_page_no * UNIV_PAGE_SIZE,
				UNIV_PAGE_SIZE);

			if (err != DB_SUCCESS) {

				ib::error()
					<< "Failed to write to the double write"
					" buffer";

				ut_free(unaligned_read_buf);

				return(err);
			}

		} else {
			/* Normal (crash recovery) path: hand the page
			to the recovery subsystem for later processing
			in buf_dblwr_process(). */
			recv_dblwr.add(page);
		}

		page += univ_page_size.physical();
	}

	if (reset_space_ids) {
		/* Make the rewritten space ids durable. */
		os_file_flush(file);
	}

	ut_free(unaligned_read_buf);

	return(DB_SUCCESS);
}
533 
/** Process and remove the double write buffer pages for all tablespaces.
For each page saved in the doublewrite buffer: if the copy of the page
in the data file is corrupt (or all-zero from a partial write), restore
it from the doublewrite copy; otherwise leave the data file alone. */
void
buf_dblwr_process(void)
{
	ulint		page_no_dblwr	= 0;
	byte*		read_buf;
	byte*		unaligned_read_buf;
	recv_dblwr_t&	recv_dblwr	= recv_sys->dblwr;

	/* Allocate one extra page so the read buffer can be aligned
	to UNIV_PAGE_SIZE. */
	unaligned_read_buf = static_cast<byte*>(
		ut_malloc_nokey(2 * UNIV_PAGE_SIZE));

	read_buf = static_cast<byte*>(
		ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));

	for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
	     i != recv_dblwr.pages.end();
	     ++i, ++page_no_dblwr) {

		/* The page's own header tells us where it belongs. */
		const byte*	page		= *i;
		ulint		page_no		= page_get_page_no(page);
		ulint		space_id	= page_get_space_id(page);

		fil_space_t*	space = fil_space_get(space_id);

		if (space == NULL) {
			/* Maybe we have dropped the tablespace
			and this page once belonged to it: do nothing */
			continue;
		}

		fil_space_open_if_needed(space);

		if (page_no >= space->size) {

			/* Do not report the warning if the tablespace is
			schedule for truncate or was truncated and we have live
			MLOG_TRUNCATE record in redo. */
			bool	skip_warning =
				srv_is_tablespace_truncated(space_id)
				|| srv_was_tablespace_truncated(space);

			if (!skip_warning) {
				ib::warn() << "Page " << page_no_dblwr
					<< " in the doublewrite buffer is"
					" not within space bounds: page "
					<< page_id_t(space_id, page_no);
			}
		} else {
			const page_size_t	page_size(space->flags);
			const page_id_t		page_id(space_id, page_no);

			/* We want to ensure that for partial reads the
			unread portion of the page is NUL. */
			memset(read_buf, 0x0, page_size.physical());

			IORequest	request;

			request.dblwr_recover();

			/* Read in the actual page from the file */
			dberr_t	err = fil_io(
				request, true,
				page_id, page_size,
				0, page_size.physical(), read_buf, NULL);

			if (err != DB_SUCCESS) {
				/* A failed read is not fatal here: the
				corruption check below decides whether
				the dblwr copy must be restored. */
				ib::warn()
					<< "Double write buffer recovery: "
					<< page_id << " read failed with "
					<< "error: " << ut_strerr(err);
			}

			/* Check if the page is corrupt */
			if (buf_page_is_corrupted(
				true, read_buf, page_size,
				fsp_is_checksum_disabled(space_id))) {

				ib::warn() << "Database page corruption or"
					<< " a failed file read of page "
					<< page_id
					<< ". Trying to recover it from the"
					<< " doublewrite buffer.";

				/* The dblwr copy must itself be valid,
				otherwise recovery cannot proceed. */
				if (buf_page_is_corrupted(
					true, page, page_size,
					fsp_is_checksum_disabled(space_id))) {

					ib::error() << "Dump of the page:";
					buf_page_print(
						read_buf, page_size,
						BUF_PAGE_PRINT_NO_CRASH);
					ib::error() << "Dump of corresponding"
						" page in doublewrite buffer:";

					buf_page_print(
						page, page_size,
						BUF_PAGE_PRINT_NO_CRASH);

					ib::fatal() << "The page in the"
						" doublewrite buffer is"
						" corrupt. Cannot continue"
						" operation. You can try to"
						" recover the database with"
						" innodb_force_recovery=6";
				}
			} else if (buf_page_is_zeroes(read_buf, page_size)
				   && !buf_page_is_zeroes(page, page_size)
				   && !buf_page_is_corrupted(
					true, page, page_size,
					fsp_is_checksum_disabled(space_id))) {

				/* Database page contained only zeroes, while
				a valid copy is available in dblwr buffer. */

			} else {

				/* NOTE(review): t1 && !(t2 || t3) below
				is exactly the condition of the preceding
				"else if", which was already found false
				when control reaches this branch, so the
				inner "if" appears unreachable and this
				branch always continues. Looks like dead
				code -- confirm before removing. */

				bool t1 = buf_page_is_zeroes(
                                        read_buf, page_size);

				bool t2 = buf_page_is_zeroes(page, page_size);

				bool t3 = buf_page_is_corrupted(
					true, page, page_size,
					fsp_is_checksum_disabled(space_id));

				if (t1 && !(t2 || t3)) {

					/* Database page contained only
					zeroes, while a valid copy is
					available in dblwr buffer. */

				} else {
					continue;
				}
			}

			/* Recovered data file pages are written out
			as uncompressed. */

			IORequest	write_request(IORequest::WRITE);

			write_request.disable_compression();

			/* Write the good page from the doublewrite
			buffer to the intended position. */

			fil_io(write_request, true,
			       page_id, page_size,
			       0, page_size.physical(),
			       const_cast<byte*>(page), NULL);

			ib::info()
				<< "Recovered page "
				<< page_id
				<< " from the doublewrite buffer.";
		}
	}

	recv_dblwr.pages.clear();

	/* Make all restored pages durable. */
	fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
	ut_free(unaligned_read_buf);
}
699 
700 /****************************************************************//**
701 Frees doublewrite buffer. */
702 void
buf_dblwr_free(void)703 buf_dblwr_free(void)
704 /*================*/
705 {
706 	/* Free the double write data structures. */
707 	ut_a(buf_dblwr != NULL);
708 	ut_ad(buf_dblwr->s_reserved == 0);
709 	ut_ad(buf_dblwr->b_reserved == 0);
710 
711 	os_event_destroy(buf_dblwr->b_event);
712 	os_event_destroy(buf_dblwr->s_event);
713 	ut_free(buf_dblwr->write_buf_unaligned);
714 	buf_dblwr->write_buf_unaligned = NULL;
715 
716 	ut_free(buf_dblwr->buf_block_arr);
717 	buf_dblwr->buf_block_arr = NULL;
718 
719 	ut_free(buf_dblwr->in_use);
720 	buf_dblwr->in_use = NULL;
721 
722 	mutex_free(&buf_dblwr->mutex);
723 	ut_free(buf_dblwr);
724 	buf_dblwr = NULL;
725 }
726 
/********************************************************************//**
Updates the doublewrite buffer when an IO request is completed. */
void
buf_dblwr_update(
/*=============*/
	const buf_page_t*	bpage,	/*!< in: buffer block descriptor */
	buf_flush_t		flush_type)/*!< in: flush type */
{
	/* Nothing to account if doublewrite is disabled or the page
	belongs to the temporary tablespace (such pages are not
	doublewritten). */
	if (!srv_use_doublewrite_buf
	    || buf_dblwr == NULL
	    || fsp_is_system_temporary(bpage->id.space())) {
		return;
	}

	ut_ad(!srv_read_only_mode);

	switch (flush_type) {
	case BUF_FLUSH_LIST:
	case BUF_FLUSH_LRU:
		mutex_enter(&buf_dblwr->mutex);

		ut_ad(buf_dblwr->batch_running);
		ut_ad(buf_dblwr->b_reserved > 0);
		ut_ad(buf_dblwr->b_reserved <= buf_dblwr->first_free);

		buf_dblwr->b_reserved--;

		if (buf_dblwr->b_reserved == 0) {
			/* Last write of the batch completed. The mutex
			is released around the flush because syncing
			data files can be slow. */
			mutex_exit(&buf_dblwr->mutex);
			/* This will finish the batch. Sync data files
			to the disk. */
			fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
			mutex_enter(&buf_dblwr->mutex);

			/* We can now reuse the doublewrite memory buffer: */
			buf_dblwr->first_free = 0;
			buf_dblwr->batch_running = false;
			os_event_set(buf_dblwr->b_event);
		}

		mutex_exit(&buf_dblwr->mutex);
		break;
	case BUF_FLUSH_SINGLE_PAGE:
		{
			const ulint size = 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
			ulint i;
			mutex_enter(&buf_dblwr->mutex);
			/* Single-page flushes use the slots above
			srv_doublewrite_batch_size; find and release
			the slot reserved for this page. */
			for (i = srv_doublewrite_batch_size; i < size; ++i) {
				if (buf_dblwr->buf_block_arr[i] == bpage) {
					buf_dblwr->s_reserved--;
					buf_dblwr->buf_block_arr[i] = NULL;
					buf_dblwr->in_use[i] = false;
					break;
				}
			}

			/* The block we are looking for must exist as a
			reserved block. */
			ut_a(i < size);
		}
		os_event_set(buf_dblwr->s_event);
		mutex_exit(&buf_dblwr->mutex);
		break;
	case BUF_FLUSH_N_TYPES:
		ut_error;
	}
}
794 
795 /********************************************************************//**
796 Check the LSN values on the page. */
797 static
798 void
buf_dblwr_check_page_lsn(const page_t * page)799 buf_dblwr_check_page_lsn(
800 /*=====================*/
801 	const page_t*	page)		/*!< in: page to check */
802 {
803 	if (memcmp(page + (FIL_PAGE_LSN + 4),
804 		   page + (UNIV_PAGE_SIZE
805 			   - FIL_PAGE_END_LSN_OLD_CHKSUM + 4),
806 		   4)) {
807 
808 		const ulint	lsn1 = mach_read_from_4(
809 			page + FIL_PAGE_LSN + 4);
810 		const ulint	lsn2 = mach_read_from_4(
811 			page + UNIV_PAGE_SIZE - FIL_PAGE_END_LSN_OLD_CHKSUM
812 			+ 4);
813 
814 		ib::error() << "The page to be written seems corrupt!"
815 			" The low 4 bytes of LSN fields do not match"
816 			" (" << lsn1 << " != " << lsn2 << ")!"
817 			" Noticed in the buffer pool.";
818 	}
819 }
820 
821 /********************************************************************//**
822 Asserts when a corrupt block is find during writing out data to the
823 disk. */
824 static
825 void
buf_dblwr_assert_on_corrupt_block(const buf_block_t * block)826 buf_dblwr_assert_on_corrupt_block(
827 /*==============================*/
828 	const buf_block_t*	block)	/*!< in: block to check */
829 {
830 	buf_page_print(block->frame, univ_page_size, BUF_PAGE_PRINT_NO_CRASH);
831 
832 	ib::fatal() << "Apparent corruption of an index page "
833 		<< block->page.id
834 		<< " to be written to data file. We intentionally crash"
835 		" the server to prevent corrupt data from ending up in"
836 		" data files.";
837 }
838 
/********************************************************************//**
Check the LSN values on the page with which this block is associated.
Also validate the page if the option is set. */
static
void
buf_dblwr_check_block(
/*==================*/
	const buf_block_t*	block)	/*!< in: block to check */
{
	ut_ad(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);

	if (block->skip_flush_check) {
		return;
	}

	switch (fil_page_get_type(block->frame)) {
	case FIL_PAGE_INDEX:
	case FIL_PAGE_RTREE:
		/* A structurally valid index page passes; anything
		else falls through to the assert below. */
		if (page_is_comp(block->frame)) {
			if (page_simple_validate_new(block->frame)) {
				return;
			}
		} else if (page_simple_validate_old(block->frame)) {
			return;
		}
		/* While it is possible that this is not an index page
		but just happens to have wrongly set FIL_PAGE_TYPE,
		such pages should never be modified to without also
		adjusting the page type during page allocation or
		buf_flush_init_for_writing() or fil_page_reset_type(). */
		break;
	case FIL_PAGE_TYPE_FSP_HDR:
	case FIL_PAGE_IBUF_BITMAP:
	case FIL_PAGE_TYPE_UNKNOWN:
		/* Do not complain again, we already reset this field. */
		/* fall through */
	case FIL_PAGE_UNDO_LOG:
	case FIL_PAGE_INODE:
	case FIL_PAGE_IBUF_FREE_LIST:
	case FIL_PAGE_TYPE_SYS:
	case FIL_PAGE_TYPE_TRX_SYS:
	case FIL_PAGE_TYPE_XDES:
	case FIL_PAGE_TYPE_BLOB:
	case FIL_PAGE_TYPE_ZBLOB:
	case FIL_PAGE_TYPE_ZBLOB2:
		/* TODO: validate also non-index pages */
		return;
	case FIL_PAGE_TYPE_ALLOCATED:
		/* empty pages should never be flushed */
		break;
	}

	/* Reaching here means the page failed validation (or has a
	type that must never be flushed): crash deliberately. */
	buf_dblwr_assert_on_corrupt_block(block);
}
892 
893 /********************************************************************//**
894 Writes a page that has already been written to the doublewrite buffer
895 to the datafile. It is the job of the caller to sync the datafile. */
896 static
897 void
buf_dblwr_write_block_to_datafile(const buf_page_t * bpage,bool sync)898 buf_dblwr_write_block_to_datafile(
899 /*==============================*/
900 	const buf_page_t*	bpage,	/*!< in: page to write */
901 	bool			sync)	/*!< in: true if sync IO
902 					is requested */
903 {
904 	ut_a(buf_page_in_file(bpage));
905 
906 	ulint	type = IORequest::WRITE;
907 
908 	if (sync) {
909 		type |= IORequest::DO_NOT_WAKE;
910 	}
911 
912 	IORequest	request(type);
913 
914 	if (bpage->zip.data != NULL) {
915 		ut_ad(bpage->size.is_compressed());
916 
917 		fil_io(request, sync, bpage->id, bpage->size, 0,
918 		       bpage->size.physical(),
919 		       (void*) bpage->zip.data,
920 		       (void*) bpage);
921 	} else {
922 		ut_ad(!bpage->size.is_compressed());
923 
924 		/* Our IO API is common for both reads and writes and is
925 		therefore geared towards a non-const parameter. */
926 
927 		buf_block_t*	block = reinterpret_cast<buf_block_t*>(
928 			const_cast<buf_page_t*>(bpage));
929 
930 		ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
931 		buf_dblwr_check_page_lsn(block->frame);
932 
933 		fil_io(request,
934 		       sync, bpage->id, bpage->size, 0, bpage->size.physical(),
935 		       block->frame, block);
936 	}
937 }
938 
939 /********************************************************************//**
940 Flushes possible buffered writes from the doublewrite memory buffer to disk,
941 and also wakes up the aio thread if simulated aio is used. It is very
942 important to call this function after a batch of writes has been posted,
943 and also when we may have to wait for a page latch! Otherwise a deadlock
944 of threads can occur. */
945 void
buf_dblwr_flush_buffered_writes(void)946 buf_dblwr_flush_buffered_writes(void)
947 /*=================================*/
948 {
949 	byte*		write_buf;
950 	ulint		first_free;
951 	ulint		len;
952 
953 	if (!srv_use_doublewrite_buf || buf_dblwr == NULL) {
954 		/* Sync the writes to the disk. */
955 		buf_dblwr_sync_datafiles();
956 		return;
957 	}
958 
959 	ut_ad(!srv_read_only_mode);
960 
961 try_again:
962 	mutex_enter(&buf_dblwr->mutex);
963 
964 	/* Write first to doublewrite buffer blocks. We use synchronous
965 	aio and thus know that file write has been completed when the
966 	control returns. */
967 
968 	if (buf_dblwr->first_free == 0) {
969 
970 		mutex_exit(&buf_dblwr->mutex);
971 
972 		/* Wake possible simulated aio thread as there could be
973 		system temporary tablespace pages active for flushing.
974 		Note: system temporary tablespace pages are not scheduled
975 		for doublewrite. */
976 		os_aio_simulated_wake_handler_threads();
977 
978 		return;
979 	}
980 
981 	if (buf_dblwr->batch_running) {
982 		/* Another thread is running the batch right now. Wait
983 		for it to finish. */
984 		int64_t	sig_count = os_event_reset(buf_dblwr->b_event);
985 		mutex_exit(&buf_dblwr->mutex);
986 
987 		os_event_wait_low(buf_dblwr->b_event, sig_count);
988 		goto try_again;
989 	}
990 
991 	ut_a(!buf_dblwr->batch_running);
992 	ut_ad(buf_dblwr->first_free == buf_dblwr->b_reserved);
993 
994 	/* Disallow anyone else to post to doublewrite buffer or to
995 	start another batch of flushing. */
996 	buf_dblwr->batch_running = true;
997 	first_free = buf_dblwr->first_free;
998 
999 	/* Now safe to release the mutex. Note that though no other
1000 	thread is allowed to post to the doublewrite batch flushing
1001 	but any threads working on single page flushes are allowed
1002 	to proceed. */
1003 	mutex_exit(&buf_dblwr->mutex);
1004 
1005 	write_buf = buf_dblwr->write_buf;
1006 
1007 	for (ulint len2 = 0, i = 0;
1008 	     i < buf_dblwr->first_free;
1009 	     len2 += UNIV_PAGE_SIZE, i++) {
1010 
1011 		const buf_block_t*	block;
1012 
1013 		block = (buf_block_t*) buf_dblwr->buf_block_arr[i];
1014 
1015 		if (buf_block_get_state(block) != BUF_BLOCK_FILE_PAGE
1016 		    || block->page.zip.data) {
1017 			/* No simple validate for compressed
1018 			pages exists. */
1019 			continue;
1020 		}
1021 
1022 		/* Check that the actual page in the buffer pool is
1023 		not corrupt and the LSN values are sane. */
1024 		buf_dblwr_check_block(block);
1025 
1026 		/* Check that the page as written to the doublewrite
1027 		buffer has sane LSN values. */
1028 		buf_dblwr_check_page_lsn(write_buf + len2);
1029 	}
1030 
1031 	/* Write out the first block of the doublewrite buffer */
1032 	len = ut_min(TRX_SYS_DOUBLEWRITE_BLOCK_SIZE,
1033 		     buf_dblwr->first_free) * UNIV_PAGE_SIZE;
1034 
1035 	fil_io(IORequestWrite, true,
1036 	       page_id_t(TRX_SYS_SPACE, buf_dblwr->block1), univ_page_size,
1037 	       0, len, (void*) write_buf, NULL);
1038 
1039 	if (buf_dblwr->first_free <= TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1040 		/* No unwritten pages in the second block. */
1041 		goto flush;
1042 	}
1043 
1044 	/* Write out the second block of the doublewrite buffer. */
1045 	len = (buf_dblwr->first_free - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE)
1046 	       * UNIV_PAGE_SIZE;
1047 
1048 	write_buf = buf_dblwr->write_buf
1049 		    + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * UNIV_PAGE_SIZE;
1050 
1051 	fil_io(IORequestWrite, true,
1052 	       page_id_t(TRX_SYS_SPACE, buf_dblwr->block2), univ_page_size,
1053 	       0, len, (void*) write_buf, NULL);
1054 
1055 flush:
1056 	/* increment the doublewrite flushed pages counter */
1057 	srv_stats.dblwr_pages_written.add(buf_dblwr->first_free);
1058 	srv_stats.dblwr_writes.inc();
1059 
1060 	/* Now flush the doublewrite buffer data to disk */
1061 	fil_flush(TRX_SYS_SPACE);
1062 
1063 	/* We know that the writes have been flushed to disk now
1064 	and in recovery we will find them in the doublewrite buffer
1065 	blocks. Next do the writes to the intended positions. */
1066 
1067 	/* Up to this point first_free and buf_dblwr->first_free are
1068 	same because we have set the buf_dblwr->batch_running flag
1069 	disallowing any other thread to post any request but we
1070 	can't safely access buf_dblwr->first_free in the loop below.
1071 	This is so because it is possible that after we are done with
1072 	the last iteration and before we terminate the loop, the batch
1073 	gets finished in the IO helper thread and another thread posts
1074 	a new batch setting buf_dblwr->first_free to a higher value.
1075 	If this happens and we are using buf_dblwr->first_free in the
1076 	loop termination condition then we'll end up dispatching
1077 	the same block twice from two different threads. */
1078 	ut_ad(first_free == buf_dblwr->first_free);
1079 	for (ulint i = 0; i < first_free; i++) {
1080 		buf_dblwr_write_block_to_datafile(
1081 			buf_dblwr->buf_block_arr[i], false);
1082 	}
1083 
1084 	/* Wake possible simulated aio thread to actually post the
1085 	writes to the operating system. We don't flush the files
1086 	at this point. We leave it to the IO helper thread to flush
1087 	datafiles when the whole batch has been processed. */
1088 	os_aio_simulated_wake_handler_threads();
1089 }
1090 
1091 /********************************************************************//**
1092 Posts a buffer page for writing. If the doublewrite memory buffer is
1093 full, calls buf_dblwr_flush_buffered_writes and waits for for free
1094 space to appear. */
1095 void
buf_dblwr_add_to_batch(buf_page_t * bpage)1096 buf_dblwr_add_to_batch(
1097 /*====================*/
1098 	buf_page_t*	bpage)	/*!< in: buffer block to write */
1099 {
1100 	ut_a(buf_page_in_file(bpage));
1101 
1102 try_again:
1103 	mutex_enter(&buf_dblwr->mutex);
1104 
1105 	ut_a(buf_dblwr->first_free <= srv_doublewrite_batch_size);
1106 
1107 	if (buf_dblwr->batch_running) {
1108 
1109 		/* This not nearly as bad as it looks. There is only
1110 		page_cleaner thread which does background flushing
1111 		in batches therefore it is unlikely to be a contention
1112 		point. The only exception is when a user thread is
1113 		forced to do a flush batch because of a sync
1114 		checkpoint. */
1115 		int64_t	sig_count = os_event_reset(buf_dblwr->b_event);
1116 		mutex_exit(&buf_dblwr->mutex);
1117 
1118 		os_event_wait_low(buf_dblwr->b_event, sig_count);
1119 		goto try_again;
1120 	}
1121 
1122 	if (buf_dblwr->first_free == srv_doublewrite_batch_size) {
1123 		mutex_exit(&(buf_dblwr->mutex));
1124 
1125 		buf_dblwr_flush_buffered_writes();
1126 
1127 		goto try_again;
1128 	}
1129 
1130 	byte*	p = buf_dblwr->write_buf
1131 		+ univ_page_size.physical() * buf_dblwr->first_free;
1132 
1133 	if (bpage->size.is_compressed()) {
1134 		UNIV_MEM_ASSERT_RW(bpage->zip.data, bpage->size.physical());
1135 		/* Copy the compressed page and clear the rest. */
1136 
1137 		memcpy(p, bpage->zip.data, bpage->size.physical());
1138 
1139 		memset(p + bpage->size.physical(), 0x0,
1140 		       univ_page_size.physical() - bpage->size.physical());
1141 	} else {
1142 		ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
1143 
1144 		UNIV_MEM_ASSERT_RW(((buf_block_t*) bpage)->frame,
1145 				   bpage->size.logical());
1146 
1147 		memcpy(p, ((buf_block_t*) bpage)->frame, bpage->size.logical());
1148 	}
1149 
1150 	buf_dblwr->buf_block_arr[buf_dblwr->first_free] = bpage;
1151 
1152 	buf_dblwr->first_free++;
1153 	buf_dblwr->b_reserved++;
1154 
1155 	ut_ad(!buf_dblwr->batch_running);
1156 	ut_ad(buf_dblwr->first_free == buf_dblwr->b_reserved);
1157 	ut_ad(buf_dblwr->b_reserved <= srv_doublewrite_batch_size);
1158 
1159 	if (buf_dblwr->first_free == srv_doublewrite_batch_size) {
1160 		mutex_exit(&(buf_dblwr->mutex));
1161 
1162 		buf_dblwr_flush_buffered_writes();
1163 
1164 		return;
1165 	}
1166 
1167 	mutex_exit(&(buf_dblwr->mutex));
1168 }
1169 
1170 /********************************************************************//**
1171 Writes a page to the doublewrite buffer on disk, sync it, then write
1172 the page to the datafile and sync the datafile. This function is used
1173 for single page flushes. If all the buffers allocated for single page
1174 flushes in the doublewrite buffer are in use we wait here for one to
1175 become free. We are guaranteed that a slot will become free because any
1176 thread that is using a slot must also release the slot before leaving
1177 this function. */
1178 void
buf_dblwr_write_single_page(buf_page_t * bpage,bool sync)1179 buf_dblwr_write_single_page(
1180 /*========================*/
1181 	buf_page_t*	bpage,	/*!< in: buffer block to write */
1182 	bool		sync)	/*!< in: true if sync IO requested */
1183 {
1184 	ulint		n_slots;
1185 	ulint		size;
1186 	ulint		offset;
1187 	ulint		i;
1188 
1189 	ut_a(buf_page_in_file(bpage));
1190 	ut_a(srv_use_doublewrite_buf);
1191 	ut_a(buf_dblwr != NULL);
1192 
1193 	/* total number of slots available for single page flushes
1194 	starts from srv_doublewrite_batch_size to the end of the
1195 	buffer. */
1196 	size = 2 * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1197 	ut_a(size > srv_doublewrite_batch_size);
1198 	n_slots = size - srv_doublewrite_batch_size;
1199 
1200 	if (buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
1201 
1202 		/* Check that the actual page in the buffer pool is
1203 		not corrupt and the LSN values are sane. */
1204 		buf_dblwr_check_block((buf_block_t*) bpage);
1205 
1206 		/* Check that the page as written to the doublewrite
1207 		buffer has sane LSN values. */
1208 		if (!bpage->zip.data) {
1209 			buf_dblwr_check_page_lsn(
1210 				((buf_block_t*) bpage)->frame);
1211 		}
1212 	}
1213 
1214 retry:
1215 	mutex_enter(&buf_dblwr->mutex);
1216 	if (buf_dblwr->s_reserved == n_slots) {
1217 
1218 		/* All slots are reserved. */
1219 		int64_t	sig_count = os_event_reset(buf_dblwr->s_event);
1220 		mutex_exit(&buf_dblwr->mutex);
1221 		os_event_wait_low(buf_dblwr->s_event, sig_count);
1222 
1223 		goto retry;
1224 	}
1225 
1226 	for (i = srv_doublewrite_batch_size; i < size; ++i) {
1227 
1228 		if (!buf_dblwr->in_use[i]) {
1229 			break;
1230 		}
1231 	}
1232 
1233 	/* We are guaranteed to find a slot. */
1234 	ut_a(i < size);
1235 	buf_dblwr->in_use[i] = true;
1236 	buf_dblwr->s_reserved++;
1237 	buf_dblwr->buf_block_arr[i] = bpage;
1238 
1239 	/* increment the doublewrite flushed pages counter */
1240 	srv_stats.dblwr_pages_written.inc();
1241 	srv_stats.dblwr_writes.inc();
1242 
1243 	mutex_exit(&buf_dblwr->mutex);
1244 
1245 	/* Lets see if we are going to write in the first or second
1246 	block of the doublewrite buffer. */
1247 	if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1248 		offset = buf_dblwr->block1 + i;
1249 	} else {
1250 		offset = buf_dblwr->block2 + i
1251 			 - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1252 	}
1253 
1254 	/* We deal with compressed and uncompressed pages a little
1255 	differently here. In case of uncompressed pages we can
1256 	directly write the block to the allocated slot in the
1257 	doublewrite buffer in the system tablespace and then after
1258 	syncing the system table space we can proceed to write the page
1259 	in the datafile.
1260 	In case of compressed page we first do a memcpy of the block
1261 	to the in-memory buffer of doublewrite before proceeding to
1262 	write it. This is so because we want to pad the remaining
1263 	bytes in the doublewrite page with zeros. */
1264 
1265 	if (bpage->size.is_compressed()) {
1266 		memcpy(buf_dblwr->write_buf + univ_page_size.physical() * i,
1267 		       bpage->zip.data, bpage->size.physical());
1268 
1269 		memset(buf_dblwr->write_buf + univ_page_size.physical() * i
1270 		       + bpage->size.physical(), 0x0,
1271 		       univ_page_size.physical() - bpage->size.physical());
1272 
1273 		fil_io(IORequestWrite, true,
1274 		       page_id_t(TRX_SYS_SPACE, offset), univ_page_size, 0,
1275 		       univ_page_size.physical(),
1276 		       (void*) (buf_dblwr->write_buf
1277 				+ univ_page_size.physical() * i),
1278 		       NULL);
1279 	} else {
1280 		/* It is a regular page. Write it directly to the
1281 		doublewrite buffer */
1282 		fil_io(IORequestWrite, true,
1283 		       page_id_t(TRX_SYS_SPACE, offset), univ_page_size, 0,
1284 		       univ_page_size.physical(),
1285 		       (void*) ((buf_block_t*) bpage)->frame,
1286 		       NULL);
1287 	}
1288 
1289 	/* Now flush the doublewrite buffer data to disk */
1290 	fil_flush(TRX_SYS_SPACE);
1291 
1292 	/* We know that the write has been flushed to disk now
1293 	and during recovery we will find it in the doublewrite buffer
1294 	blocks. Next do the write to the intended position. */
1295 	buf_dblwr_write_block_to_datafile(bpage, sync);
1296 }
1297 #endif /* !UNIV_HOTBACKUP */
1298