1 /*****************************************************************************
2 
3 Copyright (c) 1995, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2013, 2020, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /**************************************************//**
21 @file buf/buf0dblwr.cc
Doublewrite buffer module
23 
24 Created 2011/12/19
25 *******************************************************/
26 
27 #include "buf0dblwr.h"
28 #include "buf0buf.h"
29 #include "buf0checksum.h"
30 #include "srv0start.h"
31 #include "srv0srv.h"
32 #include "page0zip.h"
33 #include "trx0sys.h"
34 #include "fil0crypt.h"
35 #include "fil0pagecompress.h"
36 
37 using st_::span;
38 
39 /** The doublewrite buffer */
40 buf_dblwr_t*	buf_dblwr = NULL;
41 
42 /** Set to TRUE when the doublewrite buffer is being created */
43 ibool	buf_dblwr_being_created = FALSE;
44 
45 #define TRX_SYS_DOUBLEWRITE_BLOCKS 2
46 
47 /****************************************************************//**
48 Determines if a page number is located inside the doublewrite buffer.
49 @return TRUE if the location is inside the two blocks of the
50 doublewrite buffer */
51 ibool
buf_dblwr_page_inside(ulint page_no)52 buf_dblwr_page_inside(
53 /*==================*/
54 	ulint	page_no)	/*!< in: page number */
55 {
56 	if (buf_dblwr == NULL) {
57 
58 		return(FALSE);
59 	}
60 
61 	if (page_no >= buf_dblwr->block1
62 	    && page_no < buf_dblwr->block1
63 	    + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
64 		return(TRUE);
65 	}
66 
67 	if (page_no >= buf_dblwr->block2
68 	    && page_no < buf_dblwr->block2
69 	    + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
70 		return(TRUE);
71 	}
72 
73 	return(FALSE);
74 }
75 
76 /****************************************************************//**
77 Calls buf_page_get() on the TRX_SYS_PAGE and returns a pointer to the
78 doublewrite buffer within it.
79 @return pointer to the doublewrite buffer within the filespace header
80 page. */
81 UNIV_INLINE
82 byte*
buf_dblwr_get(mtr_t * mtr)83 buf_dblwr_get(
84 /*==========*/
85 	mtr_t*	mtr)	/*!< in/out: MTR to hold the page latch */
86 {
87 	buf_block_t*	block;
88 
89 	block = buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
90 			     0, RW_X_LATCH, mtr);
91 
92 	buf_block_dbg_add_level(block, SYNC_NO_ORDER_CHECK);
93 
94 	return(buf_block_get_frame(block) + TRX_SYS_DOUBLEWRITE);
95 }
96 
97 /********************************************************************//**
98 Flush a batch of writes to the datafiles that have already been
99 written to the dblwr buffer on disk. */
100 void
buf_dblwr_sync_datafiles()101 buf_dblwr_sync_datafiles()
102 /*======================*/
103 {
104 	/* Wake possible simulated aio thread to actually post the
105 	writes to the operating system */
106 	os_aio_simulated_wake_handler_threads();
107 
108 	/* Wait that all async writes to tablespaces have been posted to
109 	the OS */
110 	os_aio_wait_until_no_pending_writes();
111 }
112 
113 /****************************************************************//**
114 Creates or initialializes the doublewrite buffer at a database start. */
115 static
116 void
buf_dblwr_init(byte * doublewrite)117 buf_dblwr_init(
118 /*===========*/
119 	byte*	doublewrite)	/*!< in: pointer to the doublewrite buf
120 				header on trx sys page */
121 {
122 	ulint	buf_size;
123 
124 	buf_dblwr = static_cast<buf_dblwr_t*>(
125 		ut_zalloc_nokey(sizeof(buf_dblwr_t)));
126 
127 	/* There are two blocks of same size in the doublewrite
128 	buffer. */
129 	buf_size = TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
130 
131 	/* There must be atleast one buffer for single page writes
132 	and one buffer for batch writes. */
133 	ut_a(srv_doublewrite_batch_size > 0
134 	     && srv_doublewrite_batch_size < buf_size);
135 
136 	mutex_create(LATCH_ID_BUF_DBLWR, &buf_dblwr->mutex);
137 
138 	buf_dblwr->b_event = os_event_create("dblwr_batch_event");
139 	buf_dblwr->s_event = os_event_create("dblwr_single_event");
140 	buf_dblwr->first_free = 0;
141 	buf_dblwr->s_reserved = 0;
142 	buf_dblwr->b_reserved = 0;
143 
144 	buf_dblwr->block1 = mach_read_from_4(
145 		doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK1);
146 	buf_dblwr->block2 = mach_read_from_4(
147 		doublewrite + TRX_SYS_DOUBLEWRITE_BLOCK2);
148 
149 	buf_dblwr->in_use = static_cast<bool*>(
150 		ut_zalloc_nokey(buf_size * sizeof(bool)));
151 
152 	buf_dblwr->write_buf_unaligned = static_cast<byte*>(
153 		ut_malloc_nokey((1 + buf_size) << srv_page_size_shift));
154 
155 	buf_dblwr->write_buf = static_cast<byte*>(
156 		ut_align(buf_dblwr->write_buf_unaligned,
157 			 srv_page_size));
158 
159 	buf_dblwr->buf_block_arr = static_cast<buf_page_t**>(
160 		ut_zalloc_nokey(buf_size * sizeof(void*)));
161 }
162 
/** Create the doublewrite buffer if the doublewrite buffer header
is not present in the TRX_SYS page.
@return	whether the operation succeeded
@retval	true	if the doublewrite buffer exists or was created
@retval	false	if the creation failed (too small first data file) */
bool
buf_dblwr_create()
{
	buf_block_t*	block2;
	buf_block_t*	new_block;
	buf_block_t*	trx_sys_block;
	byte*	doublewrite;
	byte*	fseg_header;
	ulint	page_no;
	ulint	prev_page_no;
	ulint	i;
	mtr_t	mtr;

	if (buf_dblwr) {
		/* Already inited */
		return(true);
	}

	/* We loop back here after creating the on-disk structures, so
	that the in-memory buf_dblwr is initialized from the magic
	branch below. */
start_again:
	mtr.start();
	buf_dblwr_being_created = TRUE;

	doublewrite = buf_dblwr_get(&mtr);

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
	    == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
		/* The doublewrite buffer has already been created:
		just read in some numbers */

		buf_dblwr_init(doublewrite);

		mtr.commit();
		buf_dblwr_being_created = FALSE;
		return(true);
	} else {
		/* Creation needs room for two blocks plus slack; the
		loop below allocates 2 blocks + half an extent. */
		if (UT_LIST_GET_FIRST(fil_system.sys_space->chain)->size
		    < 3 * FSP_EXTENT_SIZE) {
			goto too_small;
		}
	}

	trx_sys_block = buf_page_get(page_id_t(TRX_SYS_SPACE, TRX_SYS_PAGE_NO),
				     0, RW_X_LATCH, &mtr);

	/* Create the file segment that will own all doublewrite
	pages; its inode is stored in the doublewrite header. */
	block2 = fseg_create(fil_system.sys_space,
			     TRX_SYS_DOUBLEWRITE + TRX_SYS_DOUBLEWRITE_FSEG,
			     &mtr, false, trx_sys_block);

	if (block2 == NULL) {
too_small:
		ib::error()
			<< "Cannot create doublewrite buffer: "
			"the first file in innodb_data_file_path"
			" must be at least "
			<< (3 * (FSP_EXTENT_SIZE
				 >> (20U - srv_page_size_shift)))
			<< "M.";
		mtr.commit();
		return(false);
	}

	ib::info() << "Doublewrite buffer not found: creating new";

	/* FIXME: After this point, the doublewrite buffer creation
	is not atomic. The doublewrite buffer should not exist in
	the InnoDB system tablespace file in the first place.
	It could be located in separate optional file(s) in a
	user-specified location. */

	/* fseg_create acquires a second latch on the page,
	therefore we must declare it: */

	buf_block_dbg_add_level(block2, SYNC_NO_ORDER_CHECK);

	fseg_header = doublewrite + TRX_SYS_DOUBLEWRITE_FSEG;
	prev_page_no = 0;

	/* Allocate the doublewrite pages: half an extent of leading
	slack so that block1 starts at a full extent boundary (the
	i == FSP_EXTENT_SIZE / 2 assertion below), then the two
	contiguous doublewrite blocks. */
	for (i = 0; i < TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE
		     + FSP_EXTENT_SIZE / 2; i++) {
		new_block = fseg_alloc_free_page(
			fseg_header, prev_page_no + 1, FSP_UP, &mtr);
		if (new_block == NULL) {
			ib::error() << "Cannot create doublewrite buffer: "
				" you must increase your tablespace size."
				" Cannot continue operation.";
			/* This may essentially corrupt the doublewrite
			buffer. However, usually the doublewrite buffer
			is created at database initialization, and it
			should not matter (just remove all newly created
			InnoDB files and restart). */
			mtr.commit();
			return(false);
		}

		/* We read the allocated pages to the buffer pool;
		when they are written to disk in a flush, the space
		id and page number fields are also written to the
		pages. When we at database startup read pages
		from the doublewrite buffer, we know that if the
		space id and page number in them are the same as
		the page position in the tablespace, then the page
		has not been written to in doublewrite. */

		ut_ad(rw_lock_get_x_lock_count(&new_block->lock) == 1);
		page_no = new_block->page.id.page_no();
		/* We only do this in the debug build, to ensure that
		both the check in buf_flush_init_for_writing() and
		recv_parse_or_apply_log_rec_body() will see a valid
		page type. The flushes of new_block are actually
		unnecessary here.  */
		ut_d(mlog_write_ulint(FIL_PAGE_TYPE + new_block->frame,
				      FIL_PAGE_TYPE_SYS, MLOG_2BYTES, &mtr));

		if (i == FSP_EXTENT_SIZE / 2) {
			/* First page of block 1: record its location,
			both in the header and in the repeat copy. */
			ut_a(page_no == FSP_EXTENT_SIZE);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_BLOCK1,
					 page_no, MLOG_4BYTES, &mtr);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_REPEAT
					 + TRX_SYS_DOUBLEWRITE_BLOCK1,
					 page_no, MLOG_4BYTES, &mtr);

		} else if (i == FSP_EXTENT_SIZE / 2
			   + TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
			/* First page of block 2: record its location. */
			ut_a(page_no == 2 * FSP_EXTENT_SIZE);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_BLOCK2,
					 page_no, MLOG_4BYTES, &mtr);
			mlog_write_ulint(doublewrite
					 + TRX_SYS_DOUBLEWRITE_REPEAT
					 + TRX_SYS_DOUBLEWRITE_BLOCK2,
					 page_no, MLOG_4BYTES, &mtr);

		} else if (i > FSP_EXTENT_SIZE / 2) {
			/* Within a block the pages must be contiguous. */
			ut_a(page_no == prev_page_no + 1);
		}

		if (((i + 1) & 15) == 0) {
			/* rw_locks can only be recursively x-locked
			2048 times. (on 32 bit platforms,
			(lint) 0 - (X_LOCK_DECR * 2049)
			is no longer a negative number, and thus
			lock_word becomes like a shared lock).
			For 4k page size this loop will
			lock the fseg header too many times. Since
			this code is not done while any other threads
			are active, restart the MTR occasionally. */
			mtr_commit(&mtr);
			mtr_start(&mtr);
			doublewrite = buf_dblwr_get(&mtr);
			fseg_header = doublewrite
				      + TRX_SYS_DOUBLEWRITE_FSEG;
		}

		prev_page_no = page_no;
	}

	/* Write the magic (and its repeat copy) last, so that a
	crash during creation leaves the buffer marked as absent. */
	mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC,
			 TRX_SYS_DOUBLEWRITE_MAGIC_N,
			 MLOG_4BYTES, &mtr);
	mlog_write_ulint(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC
			 + TRX_SYS_DOUBLEWRITE_REPEAT,
			 TRX_SYS_DOUBLEWRITE_MAGIC_N,
			 MLOG_4BYTES, &mtr);

	/* Mark that pages in the doublewrite area carry space ids
	(post-4.1 format); see buf_dblwr_init_or_load_pages(). */
	mlog_write_ulint(doublewrite
			 + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED,
			 TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N,
			 MLOG_4BYTES, &mtr);
	mtr_commit(&mtr);

	/* Flush the modified pages to disk and make a checkpoint */
	log_make_checkpoint();
	buf_dblwr_being_created = FALSE;

	/* Remove doublewrite pages from LRU */
	buf_pool_invalidate();

	ib::info() <<  "Doublewrite buffer created";

	goto start_again;
}
351 
/**
At database startup initializes the doublewrite buffer memory structure if
we already have a doublewrite buffer created in the data files. If we are
upgrading to an InnoDB version which supports multiple tablespaces, then this
function performs the necessary update operations. If we are in a crash
recovery, this function loads the pages from double write buffer into memory.
@param[in]	file		File handle
@param[in]	path		Path name of file
@return DB_SUCCESS or error code */
dberr_t
buf_dblwr_init_or_load_pages(
	pfs_os_file_t	file,
	const char*	path)
{
	byte*		buf;
	byte*		page;
	ulint		block1;
	ulint		block2;
	ulint		space_id;
	byte*		read_buf;
	byte*		doublewrite;
	byte*		unaligned_read_buf;
	ibool		reset_space_ids = FALSE;
	recv_dblwr_t&	recv_dblwr = recv_sys.dblwr;

	/* We do the file i/o past the buffer pool */

	/* Allocate 3 pages so that one srv_page_size-aligned page
	fits inside regardless of the malloc alignment. */
	unaligned_read_buf = static_cast<byte*>(
		ut_malloc_nokey(3U << srv_page_size_shift));

	read_buf = static_cast<byte*>(
		ut_align(unaligned_read_buf, srv_page_size));

	/* Read the trx sys header to check if we are using the doublewrite
	buffer */
	dberr_t		err;

	IORequest	read_request(IORequest::READ);

	err = os_file_read(
		read_request,
		file, read_buf, TRX_SYS_PAGE_NO << srv_page_size_shift,
		srv_page_size);

	if (err != DB_SUCCESS) {

		ib::error()
			<< "Failed to read the system tablespace header page";

		ut_free(unaligned_read_buf);

		return(err);
	}

	doublewrite = read_buf + TRX_SYS_DOUBLEWRITE;

	/* TRX_SYS_PAGE_NO is not encrypted see fil_crypt_rotate_page() */

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_MAGIC)
	    == TRX_SYS_DOUBLEWRITE_MAGIC_N) {
		/* The doublewrite buffer has been created */

		buf_dblwr_init(doublewrite);

		block1 = buf_dblwr->block1;
		block2 = buf_dblwr->block2;

		/* Reuse the write buffer as the destination for the
		pages read from the two on-disk blocks below. */
		buf = buf_dblwr->write_buf;
	} else {
		/* No doublewrite buffer on disk: nothing to load. */
		ut_free(unaligned_read_buf);
		return(DB_SUCCESS);
	}

	if (mach_read_from_4(doublewrite + TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED)
	    != TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N) {

		/* We are upgrading from a version < 4.1.x to a version where
		multiple tablespaces are supported. We must reset the space id
		field in the pages in the doublewrite buffer because starting
		from this version the space id is stored to
		FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID. */

		reset_space_ids = TRUE;

		ib::info() << "Resetting space id's in the doublewrite buffer";
	}

	/* Read the pages from the doublewrite buffer to memory */
	err = os_file_read(
		read_request,
		file, buf, block1 << srv_page_size_shift,
		TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift);

	if (err != DB_SUCCESS) {

		ib::error()
			<< "Failed to read the first double write buffer "
			"extent";

		ut_free(unaligned_read_buf);

		return(err);
	}

	err = os_file_read(
		read_request,
		file,
		buf + (TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift),
		block2 << srv_page_size_shift,
		TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift);

	if (err != DB_SUCCESS) {

		ib::error()
			<< "Failed to read the second double write buffer "
			"extent";

		ut_free(unaligned_read_buf);

		return(err);
	}

	/* Check if any of these pages is half-written in data files, in the
	intended position */

	page = buf;

	for (ulint i = 0; i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE * 2; i++) {

		if (reset_space_ids) {
			ulint source_page_no;

			space_id = 0;
			mach_write_to_4(page + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID,
					space_id);
			/* We do not need to calculate new checksums for the
			pages because the field .._SPACE_ID does not affect
			them. Write the page back to where we read it from. */

			/* Map slot i back to the on-disk page number
			inside block1 or block2. */
			if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
				source_page_no = block1 + i;
			} else {
				source_page_no = block2
					+ i - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
			}

			IORequest	write_request(IORequest::WRITE);

			err = os_file_write(
				write_request, path, file, page,
				source_page_no << srv_page_size_shift,
				srv_page_size);
			if (err != DB_SUCCESS) {

				ib::error()
					<< "Failed to write to the double write"
					" buffer";

				ut_free(unaligned_read_buf);

				return(err);
			}

		} else if (memcmp(field_ref_zero, page + FIL_PAGE_LSN, 8)) {
			/* Each valid page header must contain
			a nonzero FIL_PAGE_LSN field. */
			recv_dblwr.add(page);
		}

		page += srv_page_size;
	}

	/* Make the space-id resets above durable. */
	if (reset_space_ids) {
		os_file_flush(file);
	}

	ut_free(unaligned_read_buf);

	return(DB_SUCCESS);
}
532 
/** Process and remove the double write buffer pages for all tablespaces.
Called during crash recovery: for each page copy collected by
buf_dblwr_init_or_load_pages(), decide whether the copy is needed and,
if the on-disk page is corrupt or all-zero, restore it from the
doublewrite buffer. */
void
buf_dblwr_process()
{
	ut_ad(recv_sys.parse_start_lsn);

	ulint		page_no_dblwr	= 0;
	byte*		read_buf;
	recv_dblwr_t&	recv_dblwr	= recv_sys.dblwr;

	if (!buf_dblwr) {
		return;
	}

	/* Two aligned pages: read_buf for the on-disk copy, buf as
	scratch space for validate_page()/find_page(). */
	read_buf = static_cast<byte*>(
		aligned_malloc(3 * srv_page_size, srv_page_size));
	byte* const buf = read_buf + srv_page_size;

	for (recv_dblwr_t::list::iterator i = recv_dblwr.pages.begin();
	     i != recv_dblwr.pages.end();
	     ++i, ++page_no_dblwr) {
		byte* page = *i;
		const ulint page_no = page_get_page_no(page);

		if (!page_no) {
			/* page 0 should have been recovered
			already via Datafile::restore_from_doublewrite() */
			continue;
		}

		const ulint space_id = page_get_space_id(page);
		const lsn_t lsn = mach_read_from_8(page + FIL_PAGE_LSN);

		if (recv_sys.parse_start_lsn > lsn) {
			/* Pages written before the checkpoint are
			not useful for recovery. */
			continue;
		}

		const page_id_t page_id(space_id, page_no);

		if (recv_sys.scanned_lsn < lsn) {
			/* The copy is newer than the end of the redo
			log: it cannot belong to this recovery. */
			ib::warn() << "Ignoring a doublewrite copy of page "
				   << page_id
				   << " with future log sequence number "
				   << lsn;
			continue;
		}

		fil_space_t* space = fil_space_acquire_for_io(space_id);

		if (!space) {
			/* Maybe we have dropped the tablespace
			and this page once belonged to it: do nothing */
			continue;
		}

		fil_space_open_if_needed(space);

		if (UNIV_UNLIKELY(page_no >= space->size)) {

			/* Do not report the warning for undo
			tablespaces, because they can be truncated in place. */
			if (!srv_is_undo_tablespace(space_id)) {
				ib::warn() << "A copy of page " << page_no
					<< " in the doublewrite buffer slot "
					<< page_no_dblwr
					<< " is beyond the end of tablespace "
					<< space->name
					<< " (" << space->size << " pages)";
			}
			/* Common exit for the loop body: release the
			tablespace reference and move on. */
next_page:
			space->release_for_io();
			continue;
		}

		const ulint physical_size = space->physical_size();
		const ulint zip_size = space->zip_size();
		ut_ad(!buf_is_zeroes(span<const byte>(page, physical_size)));

		/* We want to ensure that for partial reads the
		unread portion of the page is NUL. */
		memset(read_buf, 0x0, physical_size);

		IORequest	request;

		request.dblwr_recover();

		/* Read in the actual page from the file */
		dberr_t	err = fil_io(
			request, true,
			page_id, zip_size,
			0, physical_size, read_buf, NULL);

		if (UNIV_UNLIKELY(err != DB_SUCCESS)) {
			ib::warn()
				<< "Double write buffer recovery: "
				<< page_id << " read failed with "
				<< "error: " << err;
		}

		if (buf_is_zeroes(span<const byte>(read_buf, physical_size))) {
			/* We will check if the copy in the
			doublewrite buffer is valid. If not, we will
			ignore this page (there should be redo log
			records to initialize it). */
		} else if (recv_dblwr.validate_page(
				page_id, read_buf, space, buf)) {
			/* The on-disk page is intact; no restore needed. */
			goto next_page;
		} else {
			/* We intentionally skip this message for
			all-zero pages. */
			ib::info()
				<< "Trying to recover page " << page_id
				<< " from the doublewrite buffer.";
		}

		/* Look up a valid copy of the page among the
		doublewrite entries. */
		page = recv_dblwr.find_page(page_id, space, buf);

		if (!page) {
			goto next_page;
		}

		/* Write the good page from the doublewrite buffer to
		the intended position. */

		IORequest	write_request(IORequest::WRITE);

		fil_io(write_request, true, page_id, zip_size,
		       0, physical_size, page, nullptr);

		ib::info() << "Recovered page " << page_id
			<< " from the doublewrite buffer.";

		goto next_page;
	}

	recv_dblwr.pages.clear();

	/* Make the recovered pages durable before redo apply. */
	fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
	aligned_free(read_buf);
}
675 
676 /****************************************************************//**
677 Frees doublewrite buffer. */
678 void
buf_dblwr_free()679 buf_dblwr_free()
680 {
681 	/* Free the double write data structures. */
682 	ut_a(buf_dblwr != NULL);
683 	ut_ad(buf_dblwr->s_reserved == 0);
684 	ut_ad(buf_dblwr->b_reserved == 0);
685 
686 	os_event_destroy(buf_dblwr->b_event);
687 	os_event_destroy(buf_dblwr->s_event);
688 	ut_free(buf_dblwr->write_buf_unaligned);
689 	buf_dblwr->write_buf_unaligned = NULL;
690 
691 	ut_free(buf_dblwr->buf_block_arr);
692 	buf_dblwr->buf_block_arr = NULL;
693 
694 	ut_free(buf_dblwr->in_use);
695 	buf_dblwr->in_use = NULL;
696 
697 	mutex_free(&buf_dblwr->mutex);
698 	ut_free(buf_dblwr);
699 	buf_dblwr = NULL;
700 }
701 
/********************************************************************//**
Updates the doublewrite buffer when an IO request is completed.
For batch flushes, decrements the batch reservation count and, when it
reaches zero, syncs the data files and releases the batch. For single
page flushes, releases the slot reserved for this page. */
void
buf_dblwr_update(
/*=============*/
	const buf_page_t*	bpage,	/*!< in: buffer block descriptor */
	buf_flush_t		flush_type)/*!< in: flush type */
{
	ut_ad(srv_use_doublewrite_buf);
	ut_ad(buf_dblwr);
	/* Temporary tablespace pages bypass the doublewrite buffer. */
	ut_ad(!fsp_is_system_temporary(bpage->id.space()));
	ut_ad(!srv_read_only_mode);

	switch (flush_type) {
	case BUF_FLUSH_LIST:
	case BUF_FLUSH_LRU:
		mutex_enter(&buf_dblwr->mutex);

		ut_ad(buf_dblwr->batch_running);
		ut_ad(buf_dblwr->b_reserved > 0);
		ut_ad(buf_dblwr->b_reserved <= buf_dblwr->first_free);

		buf_dblwr->b_reserved--;

		if (buf_dblwr->b_reserved == 0) {
			/* Release the mutex across the flush; no new
			batch can start while batch_running is set. */
			mutex_exit(&buf_dblwr->mutex);
			/* This will finish the batch. Sync data files
			to the disk. */
			fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
			mutex_enter(&buf_dblwr->mutex);

			/* We can now reuse the doublewrite memory buffer: */
			buf_dblwr->first_free = 0;
			buf_dblwr->batch_running = false;
			/* Wake any thread waiting to start a batch. */
			os_event_set(buf_dblwr->b_event);
		}

		mutex_exit(&buf_dblwr->mutex);
		break;
	case BUF_FLUSH_SINGLE_PAGE:
		{
			const ulint size = TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
			ulint i;
			mutex_enter(&buf_dblwr->mutex);
			/* Single page slots occupy the tail of the
			array, after the batch slots. */
			for (i = srv_doublewrite_batch_size; i < size; ++i) {
				if (buf_dblwr->buf_block_arr[i] == bpage) {
					buf_dblwr->s_reserved--;
					buf_dblwr->buf_block_arr[i] = NULL;
					buf_dblwr->in_use[i] = false;
					break;
				}
			}

			/* The block we are looking for must exist as a
			reserved block. */
			ut_a(i < size);
		}
		/* Wake any thread waiting for a free single-page slot. */
		os_event_set(buf_dblwr->s_event);
		mutex_exit(&buf_dblwr->mutex);
		break;
	case BUF_FLUSH_N_TYPES:
		ut_error;
	}
}
766 
767 #ifdef UNIV_DEBUG
/** Check the LSN values on the page.
A page stores the low 32 bits of its LSN both in the header
(FIL_PAGE_LSN + 4) and in the page trailer; a mismatch indicates a
torn or corrupt in-memory page. Logs an error but does not abort.
@param[in]	page	page to check
@param[in]	s	tablespace */
static void buf_dblwr_check_page_lsn(const page_t* page, const fil_space_t& s)
{
	/* Ignore page compressed or encrypted pages */
	if (s.is_compressed()
	    || buf_page_get_key_version(page, s.flags)) {
		return;
	}

	/* The trailer offset of the low LSN word differs between the
	full_crc32 format and the old checksum format (for the latter,
	the - 4 lands on the last 4 bytes of the page). */
	const unsigned lsn1 = mach_read_from_4(page + FIL_PAGE_LSN + 4),
		lsn2 = mach_read_from_4(page + srv_page_size
					- (s.full_crc32()
					   ? FIL_PAGE_FCRC32_END_LSN
					   : FIL_PAGE_END_LSN_OLD_CHKSUM - 4));
	if (UNIV_UNLIKELY(lsn1 != lsn2)) {
		ib::error() << "The page to be written to "
			    << s.chain.start->name <<
			" seems corrupt!"
			" The low 4 bytes of LSN fields do not match"
			" (" << lsn1 << " != " << lsn2 << ")!"
			" Noticed in the buffer pool.";
	}
}
793 
buf_dblwr_check_page_lsn(const buf_page_t & b,const byte * page)794 static void buf_dblwr_check_page_lsn(const buf_page_t& b, const byte* page)
795 {
796 	if (fil_space_t* space = fil_space_acquire_for_io(b.id.space())) {
797 		buf_dblwr_check_page_lsn(page, *space);
798 		space->release_for_io();
799 	}
800 }
801 #endif /* UNIV_DEBUG */
802 
803 /********************************************************************//**
804 Asserts when a corrupt block is find during writing out data to the
805 disk. */
806 static
807 void
buf_dblwr_assert_on_corrupt_block(const buf_block_t * block)808 buf_dblwr_assert_on_corrupt_block(
809 /*==============================*/
810 	const buf_block_t*	block)	/*!< in: block to check */
811 {
812 	buf_page_print(block->frame);
813 
814 	ib::fatal() << "Apparent corruption of an index page "
815 		<< block->page.id
816 		<< " to be written to data file. We intentionally crash"
817 		" the server to prevent corrupt data from ending up in"
818 		" data files.";
819 }
820 
/********************************************************************//**
Check the LSN values on the page with which this block is associated.
Also validate the page if the option is set.
Index pages get a structural validation; recognized non-index page
types are accepted as-is; anything else is treated as corruption and
crashes the server via buf_dblwr_assert_on_corrupt_block(). */
static
void
buf_dblwr_check_block(
/*==================*/
	const buf_block_t*	block)	/*!< in: block to check */
{
	ut_ad(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);

	switch (fil_page_get_type(block->frame)) {
	case FIL_PAGE_INDEX:
	case FIL_PAGE_TYPE_INSTANT:
	case FIL_PAGE_RTREE:
		/* Run the cheap structural validation matching the
		page's row format. */
		if (page_is_comp(block->frame)) {
			if (page_simple_validate_new(block->frame)) {
				return;
			}
		} else if (page_simple_validate_old(block->frame)) {
			return;
		}
		/* While it is possible that this is not an index page
		but just happens to have wrongly set FIL_PAGE_TYPE,
		such pages should never be modified to without also
		adjusting the page type during page allocation or
		buf_flush_init_for_writing() or fil_block_reset_type(). */
		break;
	case FIL_PAGE_TYPE_FSP_HDR:
	case FIL_PAGE_IBUF_BITMAP:
	case FIL_PAGE_TYPE_UNKNOWN:
		/* Do not complain again, we already reset this field. */
	case FIL_PAGE_UNDO_LOG:
	case FIL_PAGE_INODE:
	case FIL_PAGE_IBUF_FREE_LIST:
	case FIL_PAGE_TYPE_SYS:
	case FIL_PAGE_TYPE_TRX_SYS:
	case FIL_PAGE_TYPE_XDES:
	case FIL_PAGE_TYPE_BLOB:
	case FIL_PAGE_TYPE_ZBLOB:
	case FIL_PAGE_TYPE_ZBLOB2:
		/* TODO: validate also non-index pages */
		return;
	case FIL_PAGE_TYPE_ALLOCATED:
		/* empty pages should never be flushed */
		return;
	}

	/* Reached only when index page validation failed above:
	this call does not return. */
	buf_dblwr_assert_on_corrupt_block(block);
}
871 
/********************************************************************//**
Writes a page that has already been written to the doublewrite buffer
to the datafile. It is the job of the caller to sync the datafile. */
static
void
buf_dblwr_write_block_to_datafile(
/*==============================*/
	const buf_page_t*	bpage,	/*!< in: page to write */
	bool			sync)	/*!< in: true if sync IO
					is requested */
{
	ut_a(buf_page_in_file(bpage));

	ulint	type = IORequest::WRITE;

	if (sync) {
		/* For synchronous writes, do not wake the aio handler
		threads; the caller waits for completion itself. */
		type |= IORequest::DO_NOT_WAKE;
	}

	IORequest	request(type, const_cast<buf_page_t*>(bpage));

	/* We request frame here to get correct buffer in case of
	encryption and/or page compression */
	void * frame = buf_page_get_frame(bpage);

	if (bpage->zip.data != NULL) {
		/* ROW_FORMAT=COMPRESSED page: write the full
		compressed page size. */
		ut_ad(bpage->zip_size());

		fil_io(request, sync, bpage->id, bpage->zip_size(), 0,
		       bpage->zip_size(),
		       (void*) frame,
		       (void*) bpage);
	} else {
		ut_ad(!bpage->zip_size());

		/* Our IO API is common for both reads and writes and is
		therefore geared towards a non-const parameter. */

		buf_block_t*	block = reinterpret_cast<buf_block_t*>(
			const_cast<buf_page_t*>(bpage));

		ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
		/* Debug builds: verify the header/trailer LSN words
		agree before the page goes to disk. */
		ut_d(buf_dblwr_check_page_lsn(block->page, block->frame));
		fil_io(request,
		       sync, bpage->id, bpage->zip_size(), 0, bpage->real_size,
		       frame, block);
	}
}
920 
921 /********************************************************************//**
922 Flushes possible buffered writes from the doublewrite memory buffer to disk,
923 and also wakes up the aio thread if simulated aio is used. It is very
924 important to call this function after a batch of writes has been posted,
925 and also when we may have to wait for a page latch! Otherwise a deadlock
926 of threads can occur. */
927 void
buf_dblwr_flush_buffered_writes()928 buf_dblwr_flush_buffered_writes()
929 {
930 	byte*		write_buf;
931 	ulint		first_free;
932 	ulint		len;
933 
934 	if (!srv_use_doublewrite_buf || buf_dblwr == NULL) {
935 		/* Sync the writes to the disk. */
936 		buf_dblwr_sync_datafiles();
937 		/* Now we flush the data to disk (for example, with fsync) */
938 		fil_flush_file_spaces(FIL_TYPE_TABLESPACE);
939 		return;
940 	}
941 
942 	ut_ad(!srv_read_only_mode);
943 
944 try_again:
945 	mutex_enter(&buf_dblwr->mutex);
946 
947 	/* Write first to doublewrite buffer blocks. We use synchronous
948 	aio and thus know that file write has been completed when the
949 	control returns. */
950 
951 	if (buf_dblwr->first_free == 0) {
952 
953 		mutex_exit(&buf_dblwr->mutex);
954 
955 		/* Wake possible simulated aio thread as there could be
956 		system temporary tablespace pages active for flushing.
957 		Note: system temporary tablespace pages are not scheduled
958 		for doublewrite. */
959 		os_aio_simulated_wake_handler_threads();
960 
961 		return;
962 	}
963 
964 	if (buf_dblwr->batch_running) {
965 		/* Another thread is running the batch right now. Wait
966 		for it to finish. */
967 		int64_t	sig_count = os_event_reset(buf_dblwr->b_event);
968 		mutex_exit(&buf_dblwr->mutex);
969 
970 		os_aio_simulated_wake_handler_threads();
971 		os_event_wait_low(buf_dblwr->b_event, sig_count);
972 		goto try_again;
973 	}
974 
975 	ut_ad(buf_dblwr->first_free == buf_dblwr->b_reserved);
976 
977 	/* Disallow anyone else to post to doublewrite buffer or to
978 	start another batch of flushing. */
979 	buf_dblwr->batch_running = true;
980 	first_free = buf_dblwr->first_free;
981 
982 	/* Now safe to release the mutex. Note that though no other
983 	thread is allowed to post to the doublewrite batch flushing
984 	but any threads working on single page flushes are allowed
985 	to proceed. */
986 	mutex_exit(&buf_dblwr->mutex);
987 
988 	write_buf = buf_dblwr->write_buf;
989 
990 	for (ulint len2 = 0, i = 0;
991 	     i < buf_dblwr->first_free;
992 	     len2 += srv_page_size, i++) {
993 
994 		const buf_block_t*	block;
995 
996 		block = (buf_block_t*) buf_dblwr->buf_block_arr[i];
997 
998 		if (buf_block_get_state(block) != BUF_BLOCK_FILE_PAGE
999 		    || block->page.zip.data) {
1000 			/* No simple validate for compressed
1001 			pages exists. */
1002 			continue;
1003 		}
1004 
1005 		/* Check that the actual page in the buffer pool is
1006 		not corrupt and the LSN values are sane. */
1007 		buf_dblwr_check_block(block);
1008 		ut_d(buf_dblwr_check_page_lsn(block->page, write_buf + len2));
1009 	}
1010 
1011 	/* Write out the first block of the doublewrite buffer */
1012 	len = std::min<ulint>(TRX_SYS_DOUBLEWRITE_BLOCK_SIZE,
1013 			      buf_dblwr->first_free) << srv_page_size_shift;
1014 
1015 	fil_io(IORequestWrite, true,
1016 	       page_id_t(TRX_SYS_SPACE, buf_dblwr->block1), 0,
1017 	       0, len, (void*) write_buf, NULL);
1018 
1019 	if (buf_dblwr->first_free <= TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1020 		/* No unwritten pages in the second block. */
1021 		goto flush;
1022 	}
1023 
1024 	/* Write out the second block of the doublewrite buffer. */
1025 	len = (buf_dblwr->first_free - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE)
1026 	       << srv_page_size_shift;
1027 
1028 	write_buf = buf_dblwr->write_buf
1029 		+ (TRX_SYS_DOUBLEWRITE_BLOCK_SIZE << srv_page_size_shift);
1030 
1031 	fil_io(IORequestWrite, true,
1032 	       page_id_t(TRX_SYS_SPACE, buf_dblwr->block2), 0,
1033 	       0, len, (void*) write_buf, NULL);
1034 
1035 flush:
1036 	/* increment the doublewrite flushed pages counter */
1037 	srv_stats.dblwr_pages_written.add(buf_dblwr->first_free);
1038 	srv_stats.dblwr_writes.inc();
1039 
1040 	/* Now flush the doublewrite buffer data to disk */
1041 	fil_flush(TRX_SYS_SPACE);
1042 
1043 	/* We know that the writes have been flushed to disk now
1044 	and in recovery we will find them in the doublewrite buffer
1045 	blocks. Next do the writes to the intended positions. */
1046 
1047 	/* Up to this point first_free and buf_dblwr->first_free are
1048 	same because we have set the buf_dblwr->batch_running flag
1049 	disallowing any other thread to post any request but we
1050 	can't safely access buf_dblwr->first_free in the loop below.
1051 	This is so because it is possible that after we are done with
1052 	the last iteration and before we terminate the loop, the batch
1053 	gets finished in the IO helper thread and another thread posts
1054 	a new batch setting buf_dblwr->first_free to a higher value.
1055 	If this happens and we are using buf_dblwr->first_free in the
1056 	loop termination condition then we'll end up dispatching
1057 	the same block twice from two different threads. */
1058 	ut_ad(first_free == buf_dblwr->first_free);
1059 	for (ulint i = 0; i < first_free; i++) {
1060 		buf_dblwr_write_block_to_datafile(
1061 			buf_dblwr->buf_block_arr[i], false);
1062 	}
1063 
1064 	/* Wake possible simulated aio thread to actually post the
1065 	writes to the operating system. We don't flush the files
1066 	at this point. We leave it to the IO helper thread to flush
1067 	datafiles when the whole batch has been processed. */
1068 	os_aio_simulated_wake_handler_threads();
1069 }
1070 
1071 /********************************************************************//**
1072 Posts a buffer page for writing. If the doublewrite memory buffer is
1073 full, calls buf_dblwr_flush_buffered_writes and waits for for free
1074 space to appear. */
1075 void
buf_dblwr_add_to_batch(buf_page_t * bpage)1076 buf_dblwr_add_to_batch(
1077 /*====================*/
1078 	buf_page_t*	bpage)	/*!< in: buffer block to write */
1079 {
1080 	ut_a(buf_page_in_file(bpage));
1081 
1082 try_again:
1083 	mutex_enter(&buf_dblwr->mutex);
1084 
1085 	ut_a(buf_dblwr->first_free <= srv_doublewrite_batch_size);
1086 
1087 	if (buf_dblwr->batch_running) {
1088 
1089 		/* This not nearly as bad as it looks. There is only
1090 		page_cleaner thread which does background flushing
1091 		in batches therefore it is unlikely to be a contention
1092 		point. The only exception is when a user thread is
1093 		forced to do a flush batch because of a sync
1094 		checkpoint. */
1095 		int64_t	sig_count = os_event_reset(buf_dblwr->b_event);
1096 		mutex_exit(&buf_dblwr->mutex);
1097 		os_aio_simulated_wake_handler_threads();
1098 
1099 		os_event_wait_low(buf_dblwr->b_event, sig_count);
1100 		goto try_again;
1101 	}
1102 
1103 	if (buf_dblwr->first_free == srv_doublewrite_batch_size) {
1104 		mutex_exit(&(buf_dblwr->mutex));
1105 
1106 		buf_dblwr_flush_buffered_writes();
1107 
1108 		goto try_again;
1109 	}
1110 
1111 	byte*	p = buf_dblwr->write_buf
1112 		+ srv_page_size * buf_dblwr->first_free;
1113 
1114 	/* We request frame here to get correct buffer in case of
1115 	encryption and/or page compression */
1116 	void * frame = buf_page_get_frame(bpage);
1117 
1118 	if (auto zip_size = bpage->zip_size()) {
1119 		MEM_CHECK_DEFINED(bpage->zip.data, zip_size);
1120 		/* Copy the compressed page and clear the rest. */
1121 		memcpy(p, frame, zip_size);
1122 		memset(p + zip_size, 0x0, srv_page_size - zip_size);
1123 	} else {
1124 		ut_a(buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE);
1125 		MEM_CHECK_DEFINED(frame, srv_page_size);
1126 		memcpy(p, frame, srv_page_size);
1127 	}
1128 
1129 	buf_dblwr->buf_block_arr[buf_dblwr->first_free] = bpage;
1130 
1131 	buf_dblwr->first_free++;
1132 	buf_dblwr->b_reserved++;
1133 
1134 	ut_ad(!buf_dblwr->batch_running);
1135 	ut_ad(buf_dblwr->first_free == buf_dblwr->b_reserved);
1136 	ut_ad(buf_dblwr->b_reserved <= srv_doublewrite_batch_size);
1137 
1138 	if (buf_dblwr->first_free == srv_doublewrite_batch_size) {
1139 		mutex_exit(&(buf_dblwr->mutex));
1140 
1141 		buf_dblwr_flush_buffered_writes();
1142 
1143 		return;
1144 	}
1145 
1146 	mutex_exit(&(buf_dblwr->mutex));
1147 }
1148 
1149 /********************************************************************//**
1150 Writes a page to the doublewrite buffer on disk, sync it, then write
1151 the page to the datafile and sync the datafile. This function is used
1152 for single page flushes. If all the buffers allocated for single page
1153 flushes in the doublewrite buffer are in use we wait here for one to
1154 become free. We are guaranteed that a slot will become free because any
1155 thread that is using a slot must also release the slot before leaving
1156 this function. */
1157 void
buf_dblwr_write_single_page(buf_page_t * bpage,bool sync)1158 buf_dblwr_write_single_page(
1159 /*========================*/
1160 	buf_page_t*	bpage,	/*!< in: buffer block to write */
1161 	bool		sync)	/*!< in: true if sync IO requested */
1162 {
1163 	ulint		n_slots;
1164 	ulint		size;
1165 	ulint		offset;
1166 	ulint		i;
1167 
1168 	ut_a(buf_page_in_file(bpage));
1169 	ut_a(srv_use_doublewrite_buf);
1170 	ut_a(buf_dblwr != NULL);
1171 
1172 	/* total number of slots available for single page flushes
1173 	starts from srv_doublewrite_batch_size to the end of the
1174 	buffer. */
1175 	size = TRX_SYS_DOUBLEWRITE_BLOCKS * TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1176 	ut_a(size > srv_doublewrite_batch_size);
1177 	n_slots = size - srv_doublewrite_batch_size;
1178 
1179 	if (buf_page_get_state(bpage) == BUF_BLOCK_FILE_PAGE) {
1180 
1181 		/* Check that the actual page in the buffer pool is
1182 		not corrupt and the LSN values are sane. */
1183 		buf_dblwr_check_block((buf_block_t*) bpage);
1184 
1185 		/* Check that the page as written to the doublewrite
1186 		buffer has sane LSN values. */
1187 		if (!bpage->zip.data) {
1188 			ut_d(buf_dblwr_check_page_lsn(
1189 				     *bpage, ((buf_block_t*) bpage)->frame));
1190 		}
1191 	}
1192 
1193 retry:
1194 	mutex_enter(&buf_dblwr->mutex);
1195 	if (buf_dblwr->s_reserved == n_slots) {
1196 
1197 		/* All slots are reserved. */
1198 		int64_t	sig_count = os_event_reset(buf_dblwr->s_event);
1199 		mutex_exit(&buf_dblwr->mutex);
1200 		os_event_wait_low(buf_dblwr->s_event, sig_count);
1201 
1202 		goto retry;
1203 	}
1204 
1205 	for (i = srv_doublewrite_batch_size; i < size; ++i) {
1206 
1207 		if (!buf_dblwr->in_use[i]) {
1208 			break;
1209 		}
1210 	}
1211 
1212 	/* We are guaranteed to find a slot. */
1213 	ut_a(i < size);
1214 	buf_dblwr->in_use[i] = true;
1215 	buf_dblwr->s_reserved++;
1216 	buf_dblwr->buf_block_arr[i] = bpage;
1217 
1218 	/* increment the doublewrite flushed pages counter */
1219 	srv_stats.dblwr_pages_written.inc();
1220 	srv_stats.dblwr_writes.inc();
1221 
1222 	mutex_exit(&buf_dblwr->mutex);
1223 
1224 	/* Lets see if we are going to write in the first or second
1225 	block of the doublewrite buffer. */
1226 	if (i < TRX_SYS_DOUBLEWRITE_BLOCK_SIZE) {
1227 		offset = buf_dblwr->block1 + i;
1228 	} else {
1229 		offset = buf_dblwr->block2 + i
1230 			 - TRX_SYS_DOUBLEWRITE_BLOCK_SIZE;
1231 	}
1232 
1233 	/* We deal with compressed and uncompressed pages a little
1234 	differently here. In case of uncompressed pages we can
1235 	directly write the block to the allocated slot in the
1236 	doublewrite buffer in the system tablespace and then after
1237 	syncing the system table space we can proceed to write the page
1238 	in the datafile.
1239 	In case of compressed page we first do a memcpy of the block
1240 	to the in-memory buffer of doublewrite before proceeding to
1241 	write it. This is so because we want to pad the remaining
1242 	bytes in the doublewrite page with zeros. */
1243 
1244 	/* We request frame here to get correct buffer in case of
1245 	encryption and/or page compression */
1246 	void * frame = buf_page_get_frame(bpage);
1247 
1248 	if (auto zip_size = bpage->zip_size()) {
1249 		memcpy(buf_dblwr->write_buf + srv_page_size * i,
1250 		       frame, zip_size);
1251 
1252 		memset(buf_dblwr->write_buf + srv_page_size * i
1253 		       + zip_size, 0x0,
1254 		       srv_page_size - zip_size);
1255 
1256 		fil_io(IORequestWrite,
1257 		       true,
1258 		       page_id_t(TRX_SYS_SPACE, offset),
1259 		       0,
1260 		       0,
1261 		       srv_page_size,
1262 		       (void *)(buf_dblwr->write_buf + srv_page_size * i),
1263 		       NULL);
1264 	} else {
1265 		/* It is a regular page. Write it directly to the
1266 		doublewrite buffer */
1267 		fil_io(IORequestWrite,
1268 		       true,
1269 		       page_id_t(TRX_SYS_SPACE, offset),
1270 		       0,
1271 		       0,
1272 		       srv_page_size,
1273 		       (void*) frame,
1274 		       NULL);
1275 	}
1276 
1277 	/* Now flush the doublewrite buffer data to disk */
1278 	fil_flush(TRX_SYS_SPACE);
1279 
1280 	/* We know that the write has been flushed to disk now
1281 	and during recovery we will find it in the doublewrite buffer
1282 	blocks. Next do the write to the intended position. */
1283 	buf_dblwr_write_block_to_datafile(bpage, sync);
1284 }
1285