1 /*****************************************************************************
2 
3 Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2013, 2020, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file buf/buf0dblwr.cc
Doublewrite buffer module
23 
24 Created 2011/12/19
25 *******************************************************/
26 
27 #include "buf0dblwr.h"
28 #include "buf0buf.h"
29 #include "buf0checksum.h"
30 #include "srv0start.h"
31 #include "srv0srv.h"
32 #include "page0zip.h"
33 #include "trx0sys.h"
34 #include "fil0crypt.h"
35 #include "fil0pagecompress.h"
36 
37 using st_::span;
38 
/** The doublewrite buffer (NULL until created or loaded at startup) */
buf_dblwr_t*	buf_dblwr = NULL;

/** Set to TRUE when the doublewrite buffer is being created */
ibool	buf_dblwr_being_created = FALSE;

/** Number of fixed-size blocks that make up the doublewrite area */
#define TRX_SYS_DOUBLEWRITE_BLOCKS 2
46 
47 /****************************************************************//**
48 Determines if a page number is located inside the doublewrite buffer.
49 @return TRUE if the location is inside the two blocks of the
50 doublewrite buffer */
51 ibool
buf_dblwr_page_inside(ulint page_no)52 buf_dblwr_page_inside(
53 /*==================*/
54 	ulint	page_no)	/*!< in: page number */
55 {
56 	if (buf_dblwr == NULL) {
57 
58 		return(FALSE);
59 	}
60 
61 	if (page_no >= buf_dblwr->block1
62 	    && page_no < buf_dblwr->block1
63 	    + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
64 		return(TRUE);
65 	}
66 
67 	if (page_no >= buf_dblwr->block2
68 	    && page_no < buf_dblwr->block2
69 	    + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
70 		return(TRUE);
71 	}
72 
73 	return(FALSE);
74 }
75 
76 /****************************************************************//**
77 Calls buf_page_get() on the TRX_SYS_PAGE and returns a pointer to the
78 doublewrite buffer within it.
79 @return pointer to the doublewrite buffer within the filespace header
80 page. */
81 UNIV_INLINE
82 byte*
buf_dblwr_get(mtr_t * mtr)83 buf_dblwr_get(
84 /*==========*/
85 	mtr_t*	mtr)	/*!< in/out: MTR to hold the page latch */
86 {
87 	buf_block_t*	block;
88 
89 	block = buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
90 			     univ_page_size, RW_X_LATCH, mtr);
91 
92 	buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
93 
94 	return(buf_block_get_frame(block) + TRX_SYS_DOUBLEWRITE);
95 }
96 
97 /********************************************************************//**
98 Flush a batch of writes to the datafiles that have already been
99 written to the dblwr buffer on disk. */
100 void
buf_dblwr_sync_datafiles()101 buf_dblwr_sync_datafiles()
102 /*======================*/
103 {
104 	/* Wake possible simulated aio thread to actually post the
105 	writes to the operating system */
106 	os_aio_simulated_wake_handler_threads();
107 
108 	/* Wait that all async writes to tablespaces have been posted to
109 	the OS */
110 	os_aio_wait_until_no_pending_writes();
111 }
112 
113 /****************************************************************//**
114 Creates or initialializes the doublewrite buffer at a database start. */
115 static
116 void
buf_dblwr_init(byte * doublewrite)117 buf_dblwr_init(
118 /*===========*/
119 	byte*	doublewrite)	/*!< in: pointer to the doublewrite buf
120 				header on trx sys page */
121 {
122 	ulint	buf_size;
123 
124 	buf_dblwr = static_cast<buf_dblwr_t*>(
125 		ut_zalloc_nokey(sizeof(buf_dblwr_t)));
126 
127 	/* There are two blocks of same size in the doublewrite
128 	buffer. */
129 	buf_size = TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
130 
131 	/* There must be atleast one buffer for single page writes
132 	and one buffer for batch writes. */
133 	ut_a(srv_doublewrite_batch_size > 0
134 	     && srv_doublewrite_batch_size < buf_size);
135 
136 	mutex_create(LATCH_ID_BUF_DBLWR, &buf_dblwr->mutex);
137 
138 	buf_dblwr->b_event = os_event_create("dblwr_batch_event");
139 	buf_dblwr->s_event = os_event_create("dblwr_single_event");
140 	buf_dblwr->first_free = 0;
141 	buf_dblwr->s_reserved = 0;
142 	buf_dblwr->b_reserved = 0;
143 
144 	buf_dblwr->block1 = mach_read_from_4(
145 		doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK1);
146 	buf_dblwr->block2 = mach_read_from_4(
147 		doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK2);
148 
149 	buf_dblwr->in_use = static_cast<bool*>(
150 		ut_zalloc_nokey(buf_size * sizeof(bool)));
151 
152 	buf_dblwr->write_buf_unaligned = static_cast<byte*>(
153 		ut_malloc_nokey((1 + buf_size) << srv_page_size_shift));
154 
155 	buf_dblwr->write_buf = static_cast<byte*>(
156 		ut_align(buf_dblwr->write_buf_unaligned,
157 			 srv_page_size));
158 
159 	buf_dblwr->buf_block_arr = static_cast<buf_page_t**>(
160 		ut_zalloc_nokey(buf_size * sizeof(void*)));
161 }
162 
/** Create the doublewrite buffer if the doublewrite buffer header
is not present in the TRX_SYS page.
@return	whether the operation succeeded
@retval	true	if the doublewrite buffer exists or was created
@retval	false	if the creation failed (too small first data file) */
bool
buf_dblwr_create()
{
	buf_block_t*	block2;
	buf_block_t*	new_block;
	buf_block_t*	trx_sys_block;
	byte*	doublewrite;
	byte*	fseg_header;
	ulint	page_no;
	ulint	prev_page_no;
	ulint	i;
	mtr_t	mtr;

	if (buf_dblwr) {
		/* Already inited */
		return(true);
	}

start_again:
	mtr.start();
	buf_dblwr_being_created = TRUE;

	doublewrite = buf_dblwr_get(&mtr);

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
	    == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
		/* The doublewrite buffer has already been created:
		just read in some numbers */

		buf_dblwr_init(doublewrite);

		mtr.commit();
		buf_dblwr_being_created = FALSE;
		return(true);
	} else {
		/* The system tablespace must be large enough to hold
		the two doublewrite blocks plus slack. */
		if (UT_LIST_GET_FIRST(fil_system.sys_space->chain)->size
		    < 3 * FSP_EXTENT_SIZE) {
			goto too_small;
		}
	}

	trx_sys_block = buf_page_get(
		page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
		univ_page_size, RW_X_LATCH, &mtr);

	/* Create the file segment that will own the doublewrite pages;
	its inode is stored in the doublewrite header itself. */
	block2 = fseg_create(fil_system.sys_space,
			     TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_FSEG,
			     &mtr, false, trx_sys_block);

	if (block2 == NULL) {
too_small:
		ib::error()
			<< "Cannot create doublewrite buffer: "
			"the first file in innodb_data_file_path"
			" must be at least "
			<< (3 * (FSP_EXTENT_SIZE
				 >> (20U - srv_page_size_shift)))
			<< "M.";
		mtr.commit();
		return(false);
	}

	ib::info() << "Doublewrite buffer not found: creating new";

	/* FIXME: After this point, the doublewrite buffer creation
	is not atomic. The doublewrite buffer should not exist in
	the InnoDB system tablespace file in the first place.
	It could be located in separate optional file(s) in a
	user-specified location. */

	/* fseg_create acquires a second latch on the page,
	therefore we must declare it: */

	buf_block_dbg_add_level(block2, SYNC_NO_ORDER_CHECK);

	fseg_header = doublewrite + TRX_SYS_DOUBLEWRITE_FSEG;
	prev_page_no = 0;

	/* Allocate the pages: the first FSP_EXTENT_SIZE / 2 ones are
	throw-away fragment pages; the two blocks proper must start at
	extent boundaries (verified by the ut_a() checks below). */
	for (i = 0; i < TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE
		     + FSP_EXTENT_SIZE / 2; i++) {
		new_block = fseg_alloc_free_page(
			fseg_header, prev_page_no + 1, FSP_UP, &mtr);
		if (new_block == NULL) {
			ib::error() << "Cannot create doublewrite buffer: "
				" you must increase your tablespace size."
				" Cannot continue operation.";
			/* This may essentially corrupt the doublewrite
			buffer. However, usually the doublewrite buffer
			is created at database initialization, and it
			should not matter (just remove all newly created
			InnoDB files and restart). */
			mtr.commit();
			return(false);
		}

		/* We read the allocated pages to the buffer pool;
		when they are written to disk in a flush, the space
		id and page number fields are also written to the
		pages. When we at database startup read pages
		from the doublewrite buffer, we know that if the
		space id and page number in them are the same as
		the page position in the tablespace, then the page
		has not been written to in doublewrite. */

		ut_ad(rw_lock_get_x_lock_count(&new_block->lock) == 1);
		page_no = new_block->page.id.page_no();
		/* We only do this in the debug build, to ensure that
		both the check in buf_flush_init_for_writing() and
		recv_parse_or_apply_log_rec_body() will see a valid
		page type. The flushes of new_block are actually
		unnecessary here.  */
		ut_d(mlog_write_ulint(FIL_PAGE_TYPE + new_block->frame,
				      FIL_PAGE_TYPE_SYS, MLOG_2BYTES, &mtr));

		if (i == FSP_EXTENT_SIZE / 2) {
			/* First page of block 1: record its position,
			both in the header and in the repeat copy. */
			ut_a(page_no == FSP_EXTENT_SIZE);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_BLOCK1,
					 page_no, MLOG_4BYTES, &mtr);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_REPEAT
					 + TRX_SYS_DOUBLEWRITE_BLOCK1,
					 page_no, MLOG_4BYTES, &mtr);

		} else if (i == FSP_EXTENT_SIZE / 2
			   + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
			/* First page of block 2. */
			ut_a(page_no == 2 * FSP_EXTENT_SIZE);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_BLOCK2,
					 page_no, MLOG_4BYTES, &mtr);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_REPEAT
					 + TRX_SYS_DOUBLEWRITE_BLOCK2,
					 page_no, MLOG_4BYTES, &mtr);

		} else if (i > FSP_EXTENT_SIZE / 2) {
			/* Inside a block the pages must be contiguous. */
			ut_a(page_no == prev_page_no + 1);
		}

		if (((i + 1) & 15) == 0) {
			/* rw_locks can only be recursively x-locked
			2048 times. (on 32 bit platforms,
			(lint) 0 - (X_LOCK_DECR * 2049)
			is no longer a negative number, and thus
			lock_word becomes like a shared lock).
			For 4k page size this loop will
			lock the fseg header too many times. Since
			this code is not done while any other threads
			are active, restart the MTR occasionally. */
			mtr_commit(&mtr);
			mtr_start(&mtr);
			doublewrite = buf_dblwr_get(&mtr);
			fseg_header = doublewrite
				      + TRX_SYS_DOUBLEWRITE_FSEG;
		}

		prev_page_no = page_no;
	}

	/* Writing the magic last makes the creation crash-safe: a
	partially created doublewrite area is simply re-created. */
	mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC,
			 TRX_SYS_DOUBLEWRITE_MAGIC_N,
			 MLOG_4BYTES, &mtr);
	mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC
			 + TRX_SYS_DOUBLEWRITE_REPEAT,
			 TRX_SYS_DOUBLEWRITE_MAGIC_N,
			 MLOG_4BYTES, &mtr);

	mlog_write_ulint(doublewrite
			 + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED,
			 TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N,
			 MLOG_4BYTES, &mtr);
	mtr_commit(&mtr);

	/* Flush the modified pages to disk and make a checkpoint */
	log_make_checkpoint();
	buf_dblwr_being_created = FALSE;

	/* Remove doublewrite pages from LRU */
	buf_pool_invalidate();

	ib::info() <<  "Doublewrite buffer created";

	/* Restart: this time the magic will be found and
	buf_dblwr_init() will be run. */
	goto start_again;
}
352 
353 /**
354 At database startup initializes the doublewrite buffer memory structure if
355 we already have a doublewrite buffer created in the data files. If we are
356 upgrading to an InnoDB version which supports multiple tablespaces, then this
357 function performs the necessary update operations. If we are in a crash
358 recovery, this function loads the pages from double write buffer into memory.
359 @param[in]	file		File handle
360 @param[in]	path		Path name of file
361 @return DB_SUCCESS or error code */
362 dberr_t
buf_dblwr_init_or_load_pages(pfs_os_file_t file,const char * path)363 buf_dblwr_init_or_load_pages(
364 	pfs_os_file_t	file,
365 	const char*	path)
366 {
367 	byte*		buf;
368 	byte*		page;
369 	ulint		block1;
370 	ulint		block2;
371 	ulint		space_id;
372 	byte*		read_buf;
373 	byte*		doublewrite;
374 	byte*		unaligned_read_buf;
375 	ibool		reset_space_ids = FALSE;
376 	recv_dblwr_t&	recv_dblwr = recv_sys->dblwr;
377 
378 	/* We do the file i/o past the buffer pool */
379 
380 	unaligned_read_buf = static_cast<byte*>(
381 		ut_malloc_nokey(3U << srv_page_size_shift));
382 
383 	read_buf = static_cast<byte*>(
384 		ut_align(unaligned_read_buf, srv_page_size));
385 
386 	/* Read the trx sys header to check if we are using the doublewrite
387 	buffer */
388 	dberr_t		err;
389 
390 	IORequest	read_request(IORequest::READ);
391 
392 	err = os_file_read(
393 		read_request,
394 		file, read_buf, TRX_SYS_PAGE_NO << srv_page_size_shift,
395 		srv_page_size);
396 
397 	if (err != DB_SUCCESS) {
398 
399 		ib::error()
400 			<< "Failed to read the system tablespace header page";
401 
402 		ut_free(unaligned_read_buf);
403 
404 		return(err);
405 	}
406 
407 	doublewrite = read_buf + TRX_SYS_DOUBLEWRITE;
408 
409 	/* TRX_SYS_PAGE_NO is not encrypted see fil_crypt_rotate_page() */
410 
411 	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
412 	    == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
413 		/* The doublewrite buffer has been created */
414 
415 		buf_dblwr_init(doublewrite);
416 
417 		block1 = buf_dblwr->block1;
418 		block2 = buf_dblwr->block2;
419 
420 		buf = buf_dblwr->write_buf;
421 	} else {
422 		ut_free(unaligned_read_buf);
423 		return(DB_SUCCESS);
424 	}
425 
426 	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED)
427 	    != TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N) {
428 
429 		/* We are upgrading from a version < 4.1.x to a version where
430 		multiple tablespaces are supported. We must reset the space id
431 		field in the pages in the doublewrite buffer because starting
432 		from this version the space id is stored to
433 		FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID. */
434 
435 		reset_space_ids = TRUE;
436 
437 		ib::info() << "Resetting space id's in the doublewrite buffer";
438 	}
439 
440 	/* Read the pages from the doublewrite buffer to memory */
441 	err = os_file_read(
442 		read_request,
443 		file, buf, block1 << srv_page_size_shift,
444 		TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift);
445 
446 	if (err != DB_SUCCESS) {
447 
448 		ib::error()
449 			<< "Failed to read the first double write buffer "
450 			"extent";
451 
452 		ut_free(unaligned_read_buf);
453 
454 		return(err);
455 	}
456 
457 	err = os_file_read(
458 		read_request,
459 		file,
460 		buf + (TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift),
461 		block2 << srv_page_size_shift,
462 		TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift);
463 
464 	if (err != DB_SUCCESS) {
465 
466 		ib::error()
467 			<< "Failed to read the second double write buffer "
468 			"extent";
469 
470 		ut_free(unaligned_read_buf);
471 
472 		return(err);
473 	}
474 
475 	/* Check if any of these pages is half-written in data files, in the
476 	intended position */
477 
478 	page = buf;
479 
480 	for (ulint i = 0; i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * 2; i++) {
481 		if (reset_space_ids) {
482 			ulint source_page_no;
483 
484 			space_id = 0;
485 			mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
486 					space_id);
487 			/* We do not need to calculate new checksums for the
488 			pages because the field .._SPACE_ID does not affect
489 			them. Write the page back to where we read it from. */
490 
491 			if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
492 				source_page_no = block1 + i;
493 			} else {
494 				source_page_no = block2
495 					+ i - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
496 			}
497 
498 			IORequest	write_request(IORequest::WRITE);
499 
500 			err = os_file_write(
501 				write_request, path, file, page,
502 				source_page_no << srv_page_size_shift,
503 				srv_page_size);
504 			if (err != DB_SUCCESS) {
505 
506 				ib::error()
507 					<< "Failed to write to the double write"
508 					" buffer";
509 
510 				ut_free(unaligned_read_buf);
511 
512 				return(err);
513 			}
514 
515 		} else if (memcmp(field_ref_zero, page + FIL_PAGE_LSN, 8)) {
516 			/* Each valid page header must contain
517 			a nonzero FIL_PAGE_LSN field. */
518 			recv_dblwr.add(page);
519 		}
520 
521 		page += srv_page_size;
522 	}
523 
524 	if (reset_space_ids) {
525 		os_file_flush(file);
526 	}
527 
528 	ut_free(unaligned_read_buf);
529 
530 	return(DB_SUCCESS);
531 }
532 
/** Process and remove the double write buffer pages for all tablespaces. */
void
buf_dblwr_process()
{
	ut_ad(recv_sys->parse_start_lsn);

	ulint		page_no_dblwr	= 0;
	byte*		read_buf;
	recv_dblwr_t&	recv_dblwr	= recv_sys->dblwr;

	if (!buf_dblwr) {
		return;
	}

	/* read_buf holds the on-disk copy of each page; buf is a
	scratch page passed to validate_page()/find_page(). */
	read_buf = static_cast<byte*>(
		aligned_malloc(3 * srv_page_size, srv_page_size));
	byte* const buf = read_buf + srv_page_size;

	for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
	     i != recv_dblwr.pages.end();
	     ++i, ++page_no_dblwr) {
		byte* page = *i;
		const ulint page_no = page_get_page_no(page);

		if (!page_no) {
			/* page 0 should have been recovered
			already via Datafile::restore_from_doublewrite() */
			continue;
		}

		const ulint space_id = page_get_space_id(page);
		const lsn_t lsn = mach_read_from_8(page + FIL_PAGE_LSN);

		if (recv_sys->parse_start_lsn > lsn) {
			/* Pages written before the checkpoint are
			not useful for recovery. */
			continue;
		}

		const page_id_t page_id(space_id, page_no);

		if (recv_sys->scanned_lsn < lsn) {
			ib::warn() << "Ignoring a doublewrite copy of page "
				   << page_id
				   << " with future log sequence number "
				   << lsn;
			continue;
		}

		fil_space_t* space = fil_space_acquire_for_io(space_id);

		if (!space) {
			/* Maybe we have dropped the tablespace
			and this page once belonged to it: do nothing */
			continue;
		}

		fil_space_open_if_needed(space);

		if (UNIV_UNLIKELY(page_no >= space->size)) {

			/* Do not report the warning if the tablespace
			is scheduled for truncation or was truncated
			and we have parsed an MLOG_TRUNCATE record. */
			if (!srv_is_tablespace_truncated(space_id)
			    && !srv_was_tablespace_truncated(space)
			    && !srv_is_undo_tablespace(space_id)) {
				ib::warn() << "A copy of page " << page_no
					<< " in the doublewrite buffer slot "
					<< page_no_dblwr
					<< " is beyond the end of tablespace "
					<< space->name
					<< " (" << space->size << " pages)";
			}
next_page:
			space->release_for_io();
			continue;
		}

		const page_size_t	page_size(space->flags);
		ut_ad(!buf_is_zeroes(span<const byte>(page,
						      page_size.physical())));

		/* We want to ensure that for partial reads the
		unread portion of the page is NUL. */
		memset(read_buf, 0x0, page_size.physical());

		IORequest	request;

		request.dblwr_recover();

		/* Read in the actual page from the file */
		dberr_t	err = fil_io(
			request, true,
			page_id, page_size,
				0, page_size.physical(), read_buf, NULL);

		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
			ib::warn()
				<< "Double write buffer recovery: "
				<< page_id << " read failed with "
				<< "error: " << err;
		}

		if (buf_is_zeroes(span<const byte>(read_buf,
						   page_size.physical()))) {
			/* We will check if the copy in the
			doublewrite buffer is valid. If not, we will
			ignore this page (there should be redo log
			records to initialize it). */
		} else if (recv_dblwr.validate_page(
				page_id, read_buf, space, buf)) {
			/* The on-disk copy is already valid:
			no restore needed. */
			goto next_page;
		} else {
			/* We intentionally skip this message for
			all-zero pages. */
			ib::info()
				<< "Trying to recover page " << page_id
				<< " from the doublewrite buffer.";
		}

		page = recv_dblwr.find_page(page_id, space, buf);

		if (!page) {
			/* No valid doublewrite copy either:
			leave the page to redo log recovery. */
			goto next_page;
		}

		/* Write the good page from the doublewrite buffer to
		the intended position. */

		IORequest	write_request(IORequest::WRITE);

		fil_io(write_request, true, page_id, page_size,
		       0, page_size.physical(), page, NULL);

		ib::info() << "Recovered page " << page_id
			<< " from the doublewrite buffer.";

		goto next_page;
	}

	recv_dblwr.pages.clear();

	/* Make all restored pages durable before redo log apply. */
	fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
	aligned_free(read_buf);
}
679 
680 /****************************************************************//**
681 Frees doublewrite buffer. */
682 void
buf_dblwr_free()683 buf_dblwr_free()
684 {
685 	/* Free the double write data structures. */
686 	ut_a(buf_dblwr != NULL);
687 	ut_ad(buf_dblwr->s_reserved == 0);
688 	ut_ad(buf_dblwr->b_reserved == 0);
689 
690 	os_event_destroy(buf_dblwr->b_event);
691 	os_event_destroy(buf_dblwr->s_event);
692 	ut_free(buf_dblwr->write_buf_unaligned);
693 	buf_dblwr->write_buf_unaligned = NULL;
694 
695 	ut_free(buf_dblwr->buf_block_arr);
696 	buf_dblwr->buf_block_arr = NULL;
697 
698 	ut_free(buf_dblwr->in_use);
699 	buf_dblwr->in_use = NULL;
700 
701 	mutex_free(&buf_dblwr->mutex);
702 	ut_free(buf_dblwr);
703 	buf_dblwr = NULL;
704 }
705 
706 /********************************************************************//**
707 Updates the doublewrite buffer when an IO request is completed. */
708 void
buf_dblwr_update(const buf_page_t * bpage,buf_flush_t flush_type)709 buf_dblwr_update(
710 /*=============*/
711 	const buf_page_t*	bpage,	/*!< in: buffer block descriptor */
712 	buf_flush_t		flush_type)/*!< in: flush type */
713 {
714 	ut_ad(srv_use_doublewrite_buf);
715 	ut_ad(buf_dblwr);
716 	ut_ad(!fsp_is_system_temporary(bpage->id.space()));
717 	ut_ad(!srv_read_only_mode);
718 
719 	switch (flush_type) {
720 	case BUF_FLUSH_LIST:
721 	case BUF_FLUSH_LRU:
722 		mutex_enter(&buf_dblwr->mutex);
723 
724 		ut_ad(buf_dblwr->batch_running);
725 		ut_ad(buf_dblwr->b_reserved > 0);
726 		ut_ad(buf_dblwr->b_reserved <= buf_dblwr->first_free);
727 
728 		buf_dblwr->b_reserved--;
729 
730 		if (buf_dblwr->b_reserved == 0) {
731 			mutex_exit(&buf_dblwr->mutex);
732 			/* This will finish the batch. Sync data files
733 			to the disk. */
734 			fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
735 			mutex_enter(&buf_dblwr->mutex);
736 
737 			/* We can now reuse the doublewrite memory buffer: */
738 			buf_dblwr->first_free = 0;
739 			buf_dblwr->batch_running = false;
740 			os_event_set(buf_dblwr->b_event);
741 		}
742 
743 		mutex_exit(&buf_dblwr->mutex);
744 		break;
745 	case BUF_FLUSH_SINGLE_PAGE:
746 		{
747 			const ulint size = TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
748 			ulint i;
749 			mutex_enter(&buf_dblwr->mutex);
750 			for (i = srv_doublewrite_batch_size; i < size; ++i) {
751 				if (buf_dblwr->buf_block_arr[i] == bpage) {
752 					buf_dblwr->s_reserved--;
753 					buf_dblwr->buf_block_arr[i] = NULL;
754 					buf_dblwr->in_use[i] = false;
755 					break;
756 				}
757 			}
758 
759 			/* The block we are looking for must exist as a
760 			reserved block. */
761 			ut_a(i < size);
762 		}
763 		os_event_set(buf_dblwr->s_event);
764 		mutex_exit(&buf_dblwr->mutex);
765 		break;
766 	case BUF_FLUSH_N_TYPES:
767 		ut_error;
768 	}
769 }
770 
771 /********************************************************************//**
772 Check the LSN values on the page. */
773 static
774 void
buf_dblwr_check_page_lsn(const page_t * page)775 buf_dblwr_check_page_lsn(
776 /*=====================*/
777 	const page_t*	page)		/*!< in: page to check */
778 {
779 	ibool page_compressed = (mach_read_from_2(page+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED);
780 	uint key_version = mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
781 
782 	/* Ignore page compressed or encrypted pages */
783 	if (page_compressed || key_version) {
784 		return;
785 	}
786 
787 	if (memcmp(page + (FIL_PAGE_LSN + 4),
788 		   page + (srv_page_size
789 			   - FIL_PAGE_END_LSN_OLD_CHKSUM + 4),
790 		   4)) {
791 
792 		const ulint	lsn1 = mach_read_from_4(
793 			page + FIL_PAGE_LSN + 4);
794 		const ulint	lsn2 = mach_read_from_4(
795 			page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM
796 			+ 4);
797 
798 		ib::error() << "The page to be written seems corrupt!"
799 			" The low 4 bytes of LSN fields do not match"
800 			" (" << lsn1 << " != " << lsn2 << ")!"
801 			" Noticed in the buffer pool.";
802 	}
803 }
804 
805 /********************************************************************//**
806 Asserts when a corrupt block is find during writing out data to the
807 disk. */
808 static
809 void
buf_dblwr_assert_on_corrupt_block(const buf_block_t * block)810 buf_dblwr_assert_on_corrupt_block(
811 /*==============================*/
812 	const buf_block_t*	block)	/*!< in: block to check */
813 {
814 	buf_page_print(block->frame, univ_page_size);
815 
816 	ib::fatal() << "Apparent corruption of an index page "
817 		<< block->page.id
818 		<< " to be written to data file. We intentionally crash"
819 		" the server to prevent corrupt data from ending up in"
820 		" data files.";
821 }
822 
823 /********************************************************************//**
824 Check the LSN values on the page with which this block is associated.
825 Also validate the page if the option is set. */
826 static
827 void
buf_dblwr_check_block(const buf_block_t * block)828 buf_dblwr_check_block(
829 /*==================*/
830 	const buf_block_t*	block)	/*!< in: block to check */
831 {
832 	ut_ad(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
833 
834 	switch (fil_page_get_type(block->frame)) {
835 	case FIL_PAGE_INDEX:
836 	case FIL_PAGE_TYPE_INSTANT:
837 	case FIL_PAGE_RTREE:
838 		if (page_is_comp(block->frame)) {
839 			if (page_simple_validate_new(block->frame)) {
840 				return;
841 			}
842 		} else if (page_simple_validate_old(block->frame)) {
843 			return;
844 		}
845 		/* While it is possible that this is not an index page
846 		but just happens to have wrongly set FIL_PAGE_TYPE,
847 		such pages should never be modified to without also
848 		adjusting the page type during page allocation or
849 		buf_flush_init_for_writing() or fil_block_reset_type(). */
850 		break;
851 	case FIL_PAGE_TYPE_FSP_HDR:
852 	case FIL_PAGE_IBUF_BITMAP:
853 	case FIL_PAGE_TYPE_UNKNOWN:
854 		/* Do not complain again, we already reset this field. */
855 	case FIL_PAGE_UNDO_LOG:
856 	case FIL_PAGE_INODE:
857 	case FIL_PAGE_IBUF_FREE_LIST:
858 	case FIL_PAGE_TYPE_SYS:
859 	case FIL_PAGE_TYPE_TRX_SYS:
860 	case FIL_PAGE_TYPE_XDES:
861 	case FIL_PAGE_TYPE_BLOB:
862 	case FIL_PAGE_TYPE_ZBLOB:
863 	case FIL_PAGE_TYPE_ZBLOB2:
864 		/* TODO: validate also non-index pages */
865 		return;
866 	case FIL_PAGE_TYPE_ALLOCATED:
867 		/* empty pages should never be flushed */
868 		return;
869 	}
870 
871 	buf_dblwr_assert_on_corrupt_block(block);
872 }
873 
874 /********************************************************************//**
875 Writes a page that has already been written to the doublewrite buffer
876 to the datafile. It is the job of the caller to sync the datafile. */
877 static
878 void
buf_dblwr_write_block_to_datafile(const buf_page_t * bpage,bool sync)879 buf_dblwr_write_block_to_datafile(
880 /*==============================*/
881 	const buf_page_t*	bpage,	/*!< in: page to write */
882 	bool			sync)	/*!< in: true if sync IO
883 					is requested */
884 {
885 	ut_a(buf_page_in_file(bpage));
886 
887 	ulint	type = IORequest::WRITE;
888 
889 	if (sync) {
890 		type |= IORequest::DO_NOT_WAKE;
891 	}
892 
893 	IORequest	request(type, const_cast<buf_page_t*>(bpage));
894 
895 	/* We request frame here to get correct buffer in case of
896 	encryption and/or page compression */
897 	void * frame = buf_page_get_frame(bpage);
898 
899 	if (bpage->zip.data != NULL) {
900 		ut_ad(bpage->size.is_compressed());
901 
902 		fil_io(request, sync, bpage->id, bpage->size, 0,
903 		       bpage->size.physical(),
904 		       (void*) frame,
905 		       (void*) bpage);
906 	} else {
907 		ut_ad(!bpage->size.is_compressed());
908 
909 		/* Our IO API is common for both reads and writes and is
910 		therefore geared towards a non-const parameter. */
911 
912 		buf_block_t*	block = reinterpret_cast<buf_block_t*>(
913 			const_cast<buf_page_t*>(bpage));
914 
915 		ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
916 		buf_dblwr_check_page_lsn(block->frame);
917 
918 		fil_io(request,
919 			sync, bpage->id, bpage->size, 0, bpage->real_size,
920 			frame, block);
921 	}
922 }
923 
924 /********************************************************************//**
925 Flushes possible buffered writes from the doublewrite memory buffer to disk,
926 and also wakes up the aio thread if simulated aio is used. It is very
927 important to call this function after a batch of writes has been posted,
928 and also when we may have to wait for a page latch! Otherwise a deadlock
929 of threads can occur. */
930 void
buf_dblwr_flush_buffered_writes()931 buf_dblwr_flush_buffered_writes()
932 {
933 	byte*		write_buf;
934 	ulint		first_free;
935 	ulint		len;
936 
937 	if (!srv_use_doublewrite_buf || buf_dblwr == NULL) {
938 		/* Sync the writes to the disk. */
939 		buf_dblwr_sync_datafiles();
940 		/* Now we flush the data to disk (for example, with fsync) */
941 		fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
942 		return;
943 	}
944 
945 	ut_ad(!srv_read_only_mode);
946 
947 try_again:
948 	mutex_enter(&buf_dblwr->mutex);
949 
950 	/* Write first to doublewrite buffer blocks. We use synchronous
951 	aio and thus know that file write has been completed when the
952 	control returns. */
953 
954 	if (buf_dblwr->first_free == 0) {
955 
956 		mutex_exit(&buf_dblwr->mutex);
957 
958 		/* Wake possible simulated aio thread as there could be
959 		system temporary tablespace pages active for flushing.
960 		Note: system temporary tablespace pages are not scheduled
961 		for doublewrite. */
962 		os_aio_simulated_wake_handler_threads();
963 
964 		return;
965 	}
966 
967 	if (buf_dblwr->batch_running) {
968 		/* Another thread is running the batch right now. Wait
969 		for it to finish. */
970 		int64_t	sig_count = os_event_reset(buf_dblwr->b_event);
971 		mutex_exit(&buf_dblwr->mutex);
972 
973 		os_aio_simulated_wake_handler_threads();
974 		os_event_wait_low(buf_dblwr->b_event, sig_count);
975 		goto try_again;
976 	}
977 
978 	ut_ad(buf_dblwr->first_free == buf_dblwr->b_reserved);
979 
980 	/* Disallow anyone else to post to doublewrite buffer or to
981 	start another batch of flushing. */
982 	buf_dblwr->batch_running = true;
983 	first_free = buf_dblwr->first_free;
984 
985 	/* Now safe to release the mutex. Note that though no other
986 	thread is allowed to post to the doublewrite batch flushing
987 	but any threads working on single page flushes are allowed
988 	to proceed. */
989 	mutex_exit(&buf_dblwr->mutex);
990 
991 	write_buf = buf_dblwr->write_buf;
992 
993 	for (ulint len2 = 0, i = 0;
994 	     i < buf_dblwr->first_free;
995 	     len2 += srv_page_size, i++) {
996 
997 		const buf_block_t*	block;
998 
999 		block = (buf_block_t*) buf_dblwr->buf_block_arr[i];
1000 
1001 		if (buf_block_get_state(block) != BUF_BLOCK_FILE_PAGE
1002 		    || block->page.zip.data) {
1003 			/* No simple validate for compressed
1004 			pages exists. */
1005 			continue;
1006 		}
1007 
1008 		/* Check that the actual page in the buffer pool is
1009 		not corrupt and the LSN values are sane. */
1010 		buf_dblwr_check_block(block);
1011 
1012 		/* Check that the page as written to the doublewrite
1013 		buffer has sane LSN values. */
1014 		buf_dblwr_check_page_lsn(write_buf + len2);
1015 	}
1016 
1017 	/* Write out the first block of the doublewrite buffer */
1018 	len = std::min<ulint>(TRX_SYS_DOUBLEWRITE_BLOCK_SIZE,
1019 			      buf_dblwr->first_free) << srv_page_size_shift;
1020 
1021 	fil_io(IORequestWrite, true,
1022 	       page_id_t(TRX_SYS_SPACE, buf_dblwr->block1), univ_page_size,
1023 	       0, len, (void*) write_buf, NULL);
1024 
1025 	if (buf_dblwr->first_free <= TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1026 		/* No unwritten pages in the second block. */
1027 		goto flush;
1028 	}
1029 
1030 	/* Write out the second block of the doublewrite buffer. */
1031 	len = (buf_dblwr->first_free - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE)
1032 	       << srv_page_size_shift;
1033 
1034 	write_buf = buf_dblwr->write_buf
1035 		+ (TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift);
1036 
1037 	fil_io(IORequestWrite, true,
1038 	       page_id_t(TRX_SYS_SPACE, buf_dblwr->block2), univ_page_size,
1039 	       0, len, (void*) write_buf, NULL);
1040 
1041 flush:
1042 	/* increment the doublewrite flushed pages counter */
1043 	srv_stats.dblwr_pages_written.add(buf_dblwr->first_free);
1044 	srv_stats.dblwr_writes.inc();
1045 
1046 	/* Now flush the doublewrite buffer data to disk */
1047 	fil_flush(TRX_SYS_SPACE);
1048 
1049 	/* We know that the writes have been flushed to disk now
1050 	and in recovery we will find them in the doublewrite buffer
1051 	blocks. Next do the writes to the intended positions. */
1052 
1053 	/* Up to this point first_free and buf_dblwr->first_free are
1054 	same because we have set the buf_dblwr->batch_running flag
1055 	disallowing any other thread to post any request but we
1056 	can't safely access buf_dblwr->first_free in the loop below.
1057 	This is so because it is possible that after we are done with
1058 	the last iteration and before we terminate the loop, the batch
1059 	gets finished in the IO helper thread and another thread posts
1060 	a new batch setting buf_dblwr->first_free to a higher value.
1061 	If this happens and we are using buf_dblwr->first_free in the
1062 	loop termination condition then we'll end up dispatching
1063 	the same block twice from two different threads. */
1064 	ut_ad(first_free == buf_dblwr->first_free);
1065 	for (ulint i = 0; i < first_free; i++) {
1066 		buf_dblwr_write_block_to_datafile(
1067 			buf_dblwr->buf_block_arr[i], false);
1068 	}
1069 
1070 	/* Wake possible simulated aio thread to actually post the
1071 	writes to the operating system. We don't flush the files
1072 	at this point. We leave it to the IO helper thread to flush
1073 	datafiles when the whole batch has been processed. */
1074 	os_aio_simulated_wake_handler_threads();
1075 }
1076 
1077 /********************************************************************//**
1078 Posts a buffer page for writing. If the doublewrite memory buffer is
1079 full, calls buf_dblwr_flush_buffered_writes and waits for for free
1080 space to appear. */
1081 void
buf_dblwr_add_to_batch(buf_page_t * bpage)1082 buf_dblwr_add_to_batch(
1083 /*====================*/
1084 	buf_page_t*	bpage)	/*!< in: buffer block to write */
1085 {
1086 	ut_a(buf_page_in_file(bpage));
1087 
1088 try_again:
1089 	mutex_enter(&buf_dblwr->mutex);
1090 
1091 	ut_a(buf_dblwr->first_free <= srv_doublewrite_batch_size);
1092 
1093 	if (buf_dblwr->batch_running) {
1094 
1095 		/* This not nearly as bad as it looks. There is only
1096 		page_cleaner thread which does background flushing
1097 		in batches therefore it is unlikely to be a contention
1098 		point. The only exception is when a user thread is
1099 		forced to do a flush batch because of a sync
1100 		checkpoint. */
1101 		int64_t	sig_count = os_event_reset(buf_dblwr->b_event);
1102 		mutex_exit(&buf_dblwr->mutex);
1103 		os_aio_simulated_wake_handler_threads();
1104 
1105 		os_event_wait_low(buf_dblwr->b_event, sig_count);
1106 		goto try_again;
1107 	}
1108 
1109 	if (buf_dblwr->first_free == srv_doublewrite_batch_size) {
1110 		mutex_exit(&(buf_dblwr->mutex));
1111 
1112 		buf_dblwr_flush_buffered_writes();
1113 
1114 		goto try_again;
1115 	}
1116 
1117 	byte*	p = buf_dblwr->write_buf
1118 		+ srv_page_size * buf_dblwr->first_free;
1119 
1120 	/* We request frame here to get correct buffer in case of
1121 	encryption and/or page compression */
1122 	void * frame = buf_page_get_frame(bpage);
1123 
1124 	if (bpage->size.is_compressed()) {
1125 		MEM_CHECK_DEFINED(bpage->zip.data, bpage->size.physical());
1126 		/* Copy the compressed page and clear the rest. */
1127 
1128 		memcpy(p, frame, bpage->size.physical());
1129 
1130 		memset(p + bpage->size.physical(), 0x0,
1131 		       srv_page_size - bpage->size.physical());
1132 	} else {
1133 		ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
1134 		MEM_CHECK_DEFINED(frame, bpage->size.logical());
1135 		memcpy(p, frame, bpage->size.logical());
1136 	}
1137 
1138 	buf_dblwr->buf_block_arr[buf_dblwr->first_free] = bpage;
1139 
1140 	buf_dblwr->first_free++;
1141 	buf_dblwr->b_reserved++;
1142 
1143 	ut_ad(!buf_dblwr->batch_running);
1144 	ut_ad(buf_dblwr->first_free == buf_dblwr->b_reserved);
1145 	ut_ad(buf_dblwr->b_reserved <= srv_doublewrite_batch_size);
1146 
1147 	if (buf_dblwr->first_free == srv_doublewrite_batch_size) {
1148 		mutex_exit(&(buf_dblwr->mutex));
1149 
1150 		buf_dblwr_flush_buffered_writes();
1151 
1152 		return;
1153 	}
1154 
1155 	mutex_exit(&(buf_dblwr->mutex));
1156 }
1157 
1158 /********************************************************************//**
1159 Writes a page to the doublewrite buffer on disk, sync it, then write
1160 the page to the datafile and sync the datafile. This function is used
1161 for single page flushes. If all the buffers allocated for single page
1162 flushes in the doublewrite buffer are in use we wait here for one to
1163 become free. We are guaranteed that a slot will become free because any
1164 thread that is using a slot must also release the slot before leaving
1165 this function. */
1166 void
buf_dblwr_write_single_page(buf_page_t * bpage,bool sync)1167 buf_dblwr_write_single_page(
1168 /*========================*/
1169 	buf_page_t*	bpage,	/*!< in: buffer block to write */
1170 	bool		sync)	/*!< in: true if sync IO requested */
1171 {
1172 	ulint		n_slots;
1173 	ulint		size;
1174 	ulint		offset;
1175 	ulint		i;
1176 
1177 	ut_a(buf_page_in_file(bpage));
1178 	ut_a(srv_use_doublewrite_buf);
1179 	ut_a(buf_dblwr != NULL);
1180 
1181 	/* total number of slots available for single page flushes
1182 	starts from srv_doublewrite_batch_size to the end of the
1183 	buffer. */
1184 	size = TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1185 	ut_a(size > srv_doublewrite_batch_size);
1186 	n_slots = size - srv_doublewrite_batch_size;
1187 
1188 	if (buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
1189 
1190 		/* Check that the actual page in the buffer pool is
1191 		not corrupt and the LSN values are sane. */
1192 		buf_dblwr_check_block((buf_block_t*) bpage);
1193 
1194 		/* Check that the page as written to the doublewrite
1195 		buffer has sane LSN values. */
1196 		if (!bpage->zip.data) {
1197 			buf_dblwr_check_page_lsn(
1198 				((buf_block_t*) bpage)->frame);
1199 		}
1200 	}
1201 
1202 retry:
1203 	mutex_enter(&buf_dblwr->mutex);
1204 	if (buf_dblwr->s_reserved == n_slots) {
1205 
1206 		/* All slots are reserved. */
1207 		int64_t	sig_count = os_event_reset(buf_dblwr->s_event);
1208 		mutex_exit(&buf_dblwr->mutex);
1209 		os_event_wait_low(buf_dblwr->s_event, sig_count);
1210 
1211 		goto retry;
1212 	}
1213 
1214 	for (i = srv_doublewrite_batch_size; i < size; ++i) {
1215 
1216 		if (!buf_dblwr->in_use[i]) {
1217 			break;
1218 		}
1219 	}
1220 
1221 	/* We are guaranteed to find a slot. */
1222 	ut_a(i < size);
1223 	buf_dblwr->in_use[i] = true;
1224 	buf_dblwr->s_reserved++;
1225 	buf_dblwr->buf_block_arr[i] = bpage;
1226 
1227 	/* increment the doublewrite flushed pages counter */
1228 	srv_stats.dblwr_pages_written.inc();
1229 	srv_stats.dblwr_writes.inc();
1230 
1231 	mutex_exit(&buf_dblwr->mutex);
1232 
1233 	/* Lets see if we are going to write in the first or second
1234 	block of the doublewrite buffer. */
1235 	if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1236 		offset = buf_dblwr->block1 + i;
1237 	} else {
1238 		offset = buf_dblwr->block2 + i
1239 			 - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1240 	}
1241 
1242 	/* We deal with compressed and uncompressed pages a little
1243 	differently here. In case of uncompressed pages we can
1244 	directly write the block to the allocated slot in the
1245 	doublewrite buffer in the system tablespace and then after
1246 	syncing the system table space we can proceed to write the page
1247 	in the datafile.
1248 	In case of compressed page we first do a memcpy of the block
1249 	to the in-memory buffer of doublewrite before proceeding to
1250 	write it. This is so because we want to pad the remaining
1251 	bytes in the doublewrite page with zeros. */
1252 
1253 	/* We request frame here to get correct buffer in case of
1254 	encryption and/or page compression */
1255 	void * frame = buf_page_get_frame(bpage);
1256 
1257 	if (bpage->size.is_compressed()) {
1258 		memcpy(buf_dblwr->write_buf + srv_page_size * i,
1259 		       frame, bpage->size.physical());
1260 
1261 		memset(buf_dblwr->write_buf + srv_page_size * i
1262 		       + bpage->size.physical(), 0x0,
1263 		       srv_page_size - bpage->size.physical());
1264 
1265 		fil_io(IORequestWrite,
1266 		       true,
1267 		       page_id_t(TRX_SYS_SPACE, offset),
1268 		       univ_page_size,
1269 		       0,
1270 		       srv_page_size,
1271 		       (void *)(buf_dblwr->write_buf + srv_page_size * i),
1272 		       NULL);
1273 	} else {
1274 		/* It is a regular page. Write it directly to the
1275 		doublewrite buffer */
1276 		fil_io(IORequestWrite,
1277 		       true,
1278 		       page_id_t(TRX_SYS_SPACE, offset),
1279 		       univ_page_size,
1280 		       0,
1281 		       srv_page_size,
1282 		       (void*) frame,
1283 		       NULL);
1284 	}
1285 
1286 	/* Now flush the doublewrite buffer data to disk */
1287 	fil_flush(TRX_SYS_SPACE);
1288 
1289 	/* We know that the write has been flushed to disk now
1290 	and during recovery we will find it in the doublewrite buffer
1291 	blocks. Next do the write to the intended position. */
1292 	buf_dblwr_write_block_to_datafile(bpage, sync);
1293 }
1294