1 /*****************************************************************************
2 Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
3 Copyright (c) 2014, 2021, MariaDB Corporation.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
16 
17 *****************************************************************************/
18 /**************************************************//**
19 @file fil0crypt.cc
20 Innodb file space encrypt/decrypt
21 
22 Created            Jonas Oreland Google
23 Modified           Jan Lindström jan.lindstrom@mariadb.com
24 *******************************************************/
25 
26 #include "fil0crypt.h"
27 #include "mtr0types.h"
28 #include "mach0data.h"
29 #include "page0zip.h"
30 #include "buf0checksum.h"
31 #ifdef UNIV_INNOCHECKSUM
32 # include "buf0buf.h"
33 #else
34 #include "srv0srv.h"
35 #include "srv0start.h"
36 #include "mtr0mtr.h"
37 #include "mtr0log.h"
38 #include "ut0ut.h"
39 #include "btr0scrub.h"
40 #include "fsp0fsp.h"
41 #include "fil0pagecompress.h"
42 #include <my_crypt.h>
43 
44 static bool fil_crypt_threads_inited = false;
45 
46 /** Is encryption enabled/disabled */
47 UNIV_INTERN ulong srv_encrypt_tables = 0;
48 
49 /** No of key rotation threads requested */
50 UNIV_INTERN uint srv_n_fil_crypt_threads = 0;
51 
52 /** No of key rotation threads started */
53 UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0;
54 
55 /** At this age or older a space/page will be rotated */
56 UNIV_INTERN uint srv_fil_crypt_rotate_key_age;
57 
58 /** Whether the encryption plugin does key rotation */
59 static bool srv_encrypt_rotate;
60 
61 /** Event to signal FROM the key rotation threads. */
62 static os_event_t fil_crypt_event;
63 
64 /** Event to signal TO the key rotation threads. */
65 UNIV_INTERN os_event_t fil_crypt_threads_event;
66 
67 /** Event for waking up threads throttle. */
68 static os_event_t fil_crypt_throttle_sleep_event;
69 
70 /** Mutex for key rotation threads. */
71 UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex;
72 
73 /** Variable ensuring only 1 thread at time does initial conversion */
74 static bool fil_crypt_start_converting = false;
75 
76 /** Variables for throttling */
77 UNIV_INTERN uint srv_n_fil_crypt_iops = 100;	 // 10ms per iop
78 static uint srv_alloc_time = 3;		    // allocate iops for 3s at a time
79 static uint n_fil_crypt_iops_allocated = 0;
80 
81 /** Variables for scrubbing */
82 extern uint srv_background_scrub_data_interval;
83 extern uint srv_background_scrub_data_check_interval;
84 
85 #define DEBUG_KEYROTATION_THROTTLING 0
86 
87 /** Statistics variables */
88 static fil_crypt_stat_t crypt_stat;
89 static ib_mutex_t crypt_stat_mutex;
90 
91 /** Is background scrubbing enabled, defined on btr0scrub.cc */
92 extern my_bool srv_background_scrub_data_uncompressed;
93 extern my_bool srv_background_scrub_data_compressed;
94 
/***********************************************************************
Forward declaration; the definition appears further down in this file.
Check if a key needs rotation given a key_state
@param[in]	crypt_data		Encryption information
@param[in]	key_version		Current key version
@param[in]	latest_key_version	Latest key version
@param[in]	rotate_key_age		when to rotate
@return true if key needs rotation, false if not */
static bool
fil_crypt_needs_rotation(
	const fil_space_crypt_t*	crypt_data,
	uint				key_version,
	uint				latest_key_version,
	uint				rotate_key_age)
	MY_ATTRIBUTE((warn_unused_result));
109 
/*********************************************************************
Init space crypt: create the throttle sleep event and the statistics
mutex, and zero the statistics counters. */
UNIV_INTERN
void
fil_space_crypt_init()
{
	/* Event for waking up threads throttle. */
	fil_crypt_throttle_sleep_event = os_event_create(0);

	/* Create the statistics mutex and start all counters from zero. */
	mutex_create(LATCH_ID_FIL_CRYPT_STAT_MUTEX, &crypt_stat_mutex);
	memset(&crypt_stat, 0, sizeof(crypt_stat));
}
121 
/*********************************************************************
Cleanup space crypt: destroy the throttle sleep event and free the
statistics mutex that fil_space_crypt_init() created. */
UNIV_INTERN
void
fil_space_crypt_cleanup()
{
	os_event_destroy(fil_crypt_throttle_sleep_event);
	mutex_free(&crypt_stat_mutex);
}
131 
132 /**
133 Get latest key version from encryption plugin.
134 @return key version or ENCRYPTION_KEY_VERSION_INVALID */
135 uint
136 fil_space_crypt_t::key_get_latest_version(void)
137 {
138 	uint key_version = key_found;
139 
140 	if (is_key_found()) {
141 		key_version = encryption_key_get_latest_version(key_id);
142 		/* InnoDB does dirty read of srv_fil_crypt_rotate_key_age.
143 		It doesn't matter because srv_encrypt_rotate
144 		can be set to true only once */
145 		if (!srv_encrypt_rotate
146 		    && key_version > srv_fil_crypt_rotate_key_age) {
147 			srv_encrypt_rotate = true;
148 		}
149 
150 		srv_stats.n_key_requests.inc();
151 		key_found = key_version;
152 	}
153 
154 	return key_version;
155 }
156 
157 /******************************************************************
158 Get the latest(key-version), waking the encrypt thread, if needed
159 @param[in,out]	crypt_data	Crypt data */
160 static inline
161 uint
162 fil_crypt_get_latest_key_version(
163 	fil_space_crypt_t* crypt_data)
164 {
165 	ut_ad(crypt_data != NULL);
166 
167 	uint key_version = crypt_data->key_get_latest_version();
168 
169 	if (crypt_data->is_key_found()) {
170 
171 		if (fil_crypt_needs_rotation(
172 				crypt_data,
173 				crypt_data->min_key_version,
174 				key_version,
175 				srv_fil_crypt_rotate_key_age)) {
176 			/* Below event seen as NULL-pointer at startup
177 			when new database was created and we create a
178 			checkpoint. Only seen when debugging. */
179 			if (fil_crypt_threads_inited) {
180 				os_event_set(fil_crypt_threads_event);
181 			}
182 		}
183 	}
184 
185 	return key_version;
186 }
187 
188 /******************************************************************
189 Mutex helper for crypt_data->scheme */
190 void
191 crypt_data_scheme_locker(
192 /*=====================*/
193 	st_encryption_scheme*	scheme,
194 	int			exit)
195 {
196 	fil_space_crypt_t* crypt_data =
197 		static_cast<fil_space_crypt_t*>(scheme);
198 
199 	if (exit) {
200 		mutex_exit(&crypt_data->mutex);
201 	} else {
202 		mutex_enter(&crypt_data->mutex);
203 	}
204 }
205 
206 /******************************************************************
207 Create a fil_space_crypt_t object
208 @param[in]	type		CRYPT_SCHEME_UNENCRYPTE or
209 				CRYPT_SCHEME_1
210 @param[in]	encrypt_mode	FIL_ENCRYPTION_DEFAULT or
211 				FIL_ENCRYPTION_ON or
212 				FIL_ENCRYPTION_OFF
213 @param[in]	min_key_version key_version or 0
214 @param[in]	key_id		Used key id
215 @return crypt object */
216 static
217 fil_space_crypt_t*
218 fil_space_create_crypt_data(
219 	uint			type,
220 	fil_encryption_t	encrypt_mode,
221 	uint			min_key_version,
222 	uint			key_id)
223 {
224 	fil_space_crypt_t* crypt_data = NULL;
225 	if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
226 		crypt_data = new(buf)
227 			fil_space_crypt_t(
228 				type,
229 				min_key_version,
230 				key_id,
231 				encrypt_mode);
232 	}
233 
234 	return crypt_data;
235 }
236 
/******************************************************************
Create a fil_space_crypt_t object with type 0 and min_key_version 0
@param[in]	encrypt_mode	FIL_ENCRYPTION_DEFAULT or
				FIL_ENCRYPTION_ON or
				FIL_ENCRYPTION_OFF
@param[in]	key_id		Encryption key id
@return crypt object */
UNIV_INTERN
fil_space_crypt_t*
fil_space_create_crypt_data(
	fil_encryption_t	encrypt_mode,
	uint			key_id)
{
	/* Delegate to the four-argument overload: type = 0,
	min_key_version = 0. */
	return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id));
}
253 
254 /******************************************************************
255 Merge fil_space_crypt_t object
256 @param[in,out]	dst		Destination cryp data
257 @param[in]	src		Source crypt data */
258 UNIV_INTERN
259 void
260 fil_space_merge_crypt_data(
261 	fil_space_crypt_t* dst,
262 	const fil_space_crypt_t* src)
263 {
264 	mutex_enter(&dst->mutex);
265 
266 	/* validate that they are mergeable */
267 	ut_a(src->type == CRYPT_SCHEME_UNENCRYPTED ||
268 	     src->type == CRYPT_SCHEME_1);
269 
270 	ut_a(dst->type == CRYPT_SCHEME_UNENCRYPTED ||
271 	     dst->type == CRYPT_SCHEME_1);
272 
273 	dst->encryption = src->encryption;
274 	dst->type = src->type;
275 	dst->min_key_version = src->min_key_version;
276 	dst->keyserver_requests += src->keyserver_requests;
277 
278 	mutex_exit(&dst->mutex);
279 }
280 
281 /** Initialize encryption parameters from a tablespace header page.
282 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
283 @param[in]	page		first page of the tablespace
284 @return crypt data from page 0
285 @retval	NULL	if not present or not valid */
286 fil_space_crypt_t* fil_space_read_crypt_data(ulint zip_size, const byte* page)
287 {
288 	const ulint offset = FSP_HEADER_OFFSET
289 		+ fsp_header_get_encryption_offset(zip_size);
290 
291 	if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) {
292 		/* Crypt data is not stored. */
293 		return NULL;
294 	}
295 
296 	uint8_t type = mach_read_from_1(page + offset + MAGIC_SZ + 0);
297 	uint8_t iv_length = mach_read_from_1(page + offset + MAGIC_SZ + 1);
298 	fil_space_crypt_t* crypt_data;
299 
300 	if (!(type == CRYPT_SCHEME_UNENCRYPTED ||
301 	      type == CRYPT_SCHEME_1)
302 	    || iv_length != sizeof crypt_data->iv) {
303 		ib::error() << "Found non sensible crypt scheme: "
304 			    << type << "," << iv_length
305 			    << " for space: "
306 			    << page_get_space_id(page);
307 		return NULL;
308 	}
309 
310 	uint min_key_version = mach_read_from_4
311 		(page + offset + MAGIC_SZ + 2 + iv_length);
312 
313 	uint key_id = mach_read_from_4
314 		(page + offset + MAGIC_SZ + 2 + iv_length + 4);
315 
316 	fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(
317 		page + offset + MAGIC_SZ + 2 + iv_length + 8);
318 
319 	crypt_data = fil_space_create_crypt_data(encryption, key_id);
320 	/* We need to overwrite these as above function will initialize
321 	members */
322 	crypt_data->type = type;
323 	crypt_data->min_key_version = min_key_version;
324 	crypt_data->page0_offset = offset;
325 	memcpy(crypt_data->iv, page + offset + MAGIC_SZ + 2, iv_length);
326 
327 	return crypt_data;
328 }
329 
330 /******************************************************************
331 Free a crypt data object
332 @param[in,out] crypt_data	crypt data to be freed */
333 UNIV_INTERN
334 void
335 fil_space_destroy_crypt_data(
336 	fil_space_crypt_t **crypt_data)
337 {
338 	if (crypt_data != NULL && (*crypt_data) != NULL) {
339 		fil_space_crypt_t* c;
340 		if (UNIV_LIKELY(fil_crypt_threads_inited)) {
341 			mutex_enter(&fil_crypt_threads_mutex);
342 			c = *crypt_data;
343 			*crypt_data = NULL;
344 			mutex_exit(&fil_crypt_threads_mutex);
345 		} else {
346 			ut_ad(srv_read_only_mode || !srv_was_started);
347 			c = *crypt_data;
348 			*crypt_data = NULL;
349 		}
350 		if (c) {
351 			c->~fil_space_crypt_t();
352 			ut_free(c);
353 		}
354 	}
355 }
356 
357 /** Fill crypt data information to the give page.
358 It should be called during ibd file creation.
359 @param[in]	flags	tablespace flags
360 @param[in,out]	page	first page of the tablespace */
361 void
362 fil_space_crypt_t::fill_page0(
363 	ulint	flags,
364 	byte*	page)
365 {
366 	const uint len = sizeof(iv);
367 	const ulint offset = FSP_HEADER_OFFSET
368 		+ fsp_header_get_encryption_offset(
369 			fil_space_t::zip_size(flags));
370 	page0_offset = offset;
371 
372 	memcpy(page + offset, CRYPT_MAGIC, MAGIC_SZ);
373 	mach_write_to_1(page + offset + MAGIC_SZ, type);
374 	mach_write_to_1(page + offset + MAGIC_SZ + 1, len);
375 	memcpy(page + offset + MAGIC_SZ + 2, &iv, len);
376 
377 	mach_write_to_4(page + offset + MAGIC_SZ + 2 + len,
378 			min_key_version);
379 	mach_write_to_4(page + offset + MAGIC_SZ + 2 + len + 4,
380 			key_id);
381 	mach_write_to_1(page + offset + MAGIC_SZ + 2  + len + 8,
382 			encryption);
383 }
384 
/******************************************************************
Write crypt data to a page (0)
The fields are written through the mini-transaction (mlog_write_*),
and then one MLOG_FILE_WRITE_CRYPT_DATA record carrying the same values
is appended to the mini-transaction log.
@param[in]	space	tablespace; this object must be space->crypt_data
@param[in,out]	page	first page of the tablespace
@param[in,out]	mtr	mini-transaction */
UNIV_INTERN
void
fil_space_crypt_t::write_page0(
	const fil_space_t*	space,
	byte* 			page,
	mtr_t*			mtr)
{
	ut_ad(this == space->crypt_data);
	const uint len = sizeof(iv);
	const ulint offset = FSP_HEADER_OFFSET
		+ fsp_header_get_encryption_offset(space->zip_size());
	page0_offset = offset;

	/*
	redo log this as bytewise updates to page 0
	followed by an MLOG_FILE_WRITE_CRYPT_DATA
	(that will during recovery update fil_space_t)
	*/
	mlog_write_string(page + offset, CRYPT_MAGIC, MAGIC_SZ, mtr);
	mlog_write_ulint(page + offset + MAGIC_SZ + 0, type, MLOG_1BYTE, mtr);
	mlog_write_ulint(page + offset + MAGIC_SZ + 1, len, MLOG_1BYTE, mtr);
	mlog_write_string(page + offset + MAGIC_SZ + 2, iv, len,
			  mtr);
	mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len, min_key_version,
			 MLOG_4BYTES, mtr);
	mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 4, key_id,
			 MLOG_4BYTES, mtr);
	mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 8, encryption,
		MLOG_1BYTE, mtr);

	/* Debug injection point: skip the MLOG_FILE_WRITE_CRYPT_DATA
	record below. */
	DBUG_EXECUTE_IF("ib_do_not_log_crypt_data", return;);

	/* 17 is the sum of the fixed-size fields written below
	(4 + 2 + 1 + 1 + 4 + 4 + 1). NOTE(review): 11 is presumably the
	space reserved for the initial log record header - confirm. */
	byte* log_ptr = mlog_open(mtr, 11 + 17 + len);

	if (log_ptr != NULL) {
		log_ptr = mlog_write_initial_log_record_fast(
			page,
			MLOG_FILE_WRITE_CRYPT_DATA,
			log_ptr, mtr);
		/* Field order here must match what
		fil_parse_write_crypt_data() reads. */
		mach_write_to_4(log_ptr, space->id);
		log_ptr += 4;
		mach_write_to_2(log_ptr, offset);
		log_ptr += 2;
		mach_write_to_1(log_ptr, type);
		log_ptr += 1;
		mach_write_to_1(log_ptr, len);
		log_ptr += 1;
		mach_write_to_4(log_ptr, min_key_version);
		log_ptr += 4;
		mach_write_to_4(log_ptr, key_id);
		log_ptr += 4;
		mach_write_to_1(log_ptr, encryption);
		log_ptr += 1;
		mlog_close(mtr, log_ptr);

		/* The IV follows the fixed-size part of the record. */
		mlog_catenate_string(mtr, iv, len);
	}
}
448 
/******************************************************************
Parse a MLOG_FILE_WRITE_CRYPT_DATA log entry
If the tablespace named in the record is found, its crypt data is
created from the record, or merged into the existing crypt data.
@param[in]	ptr		Log entry start
@param[in]	end_ptr		Log entry end
@param[out]	err		DB_SUCCESS, DB_CORRUPTION if the scheme
				type or IV length is not valid, or
				DB_DECRYPTION_FAILED if the tablespace
				should be encrypted but its key is not found
@return position on log buffer
@retval	NULL	if the record is incomplete or corrupted */
UNIV_INTERN
byte*
fil_parse_write_crypt_data(
	byte*			ptr,
	const byte*		end_ptr,
	dberr_t*		err)
{
	/* check that redo log entry is complete */
	uint entry_size =
		4 + // size of space_id
		2 + // size of offset
		1 + // size of type
		1 + // size of iv-len
		4 +  // size of min_key_version
		4 +  // size of key_id
		1; // fil_encryption_t

	*err = DB_SUCCESS;

	if (ptr + entry_size > end_ptr) {
		return NULL;
	}

	ulint space_id = mach_read_from_4(ptr);
	ptr += 4;
	uint offset = mach_read_from_2(ptr);
	ptr += 2;
	uint type = mach_read_from_1(ptr);
	ptr += 1;
	uint len = mach_read_from_1(ptr);
	ptr += 1;

	if ((type != CRYPT_SCHEME_1 && type != CRYPT_SCHEME_UNENCRYPTED)
	    || len != CRYPT_SCHEME_1_IV_LEN) {
		*err = DB_CORRUPTION;
		return NULL;
	}

	uint min_key_version = mach_read_from_4(ptr);
	ptr += 4;

	uint key_id = mach_read_from_4(ptr);
	ptr += 4;

	fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(ptr);
	ptr +=1;

	/* The IV of len bytes follows the fixed-size fields; it must be
	completely present as well. */
	if (ptr + len > end_ptr) {
		return NULL;
	}

	mutex_enter(&fil_system.mutex);

	fil_space_t* space = fil_space_get_by_id(space_id);

	if (!space) {
		/* Tablespace not found: skip the record. */
		mutex_exit(&fil_system.mutex);
		return ptr + len;
	}

	fil_space_crypt_t* crypt_data = fil_space_create_crypt_data(
		encryption, key_id);

	crypt_data->page0_offset = offset;
	crypt_data->min_key_version = min_key_version;
	crypt_data->type = type;
	memcpy(crypt_data->iv, ptr, len);
	ptr += len;

	if (space->crypt_data) {
		/* Merge into the existing object and discard the
		temporary one. */
		fil_space_merge_crypt_data(space->crypt_data, crypt_data);
		fil_space_destroy_crypt_data(&crypt_data);
		crypt_data = space->crypt_data;
	} else {
		space->crypt_data = crypt_data;
	}

	mutex_exit(&fil_system.mutex);

	if (crypt_data->should_encrypt() && !crypt_data->is_key_found()) {
		*err = DB_DECRYPTION_FAILED;
	}

	return ptr;
}
540 
/** Encrypt a buffer for non full checksum.
The FIL page header (plus the compression metadata for page_compressed
pages) is copied unencrypted, the key version and the post-encryption
checksum are stored in the header, and the payload is encrypted.
@param[in,out]		crypt_data		Crypt data
@param[in]		space			space_id
@param[in]		offset			Page offset
@param[in]		lsn			Log sequence number
@param[in]		src_frame		Page to encrypt
@param[in]		zip_size		ROW_FORMAT=COMPRESSED
						page size, or 0
@param[in,out]		dst_frame		Output buffer
@return dst_frame, holding the encrypted page */
static byte* fil_encrypt_buf_for_non_full_checksum(
	fil_space_crypt_t*	crypt_data,
	ulint			space,
	ulint			offset,
	lsn_t			lsn,
	const byte*		src_frame,
	ulint			zip_size,
	byte*			dst_frame)
{
	uint size = uint(zip_size ? zip_size : srv_page_size);
	uint key_version = fil_crypt_get_latest_key_version(crypt_data);
	ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);

	ulint orig_page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
	ibool page_compressed = (orig_page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
	uint header_len = FIL_PAGE_DATA;

	if (page_compressed) {
		header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
	}

	/* FIL page header is not encrypted */
	memcpy(dst_frame, src_frame, header_len);
	mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
			key_version);

	/* Calculate the start offset in a page */
	uint		unencrypted_bytes = header_len + FIL_PAGE_DATA_END;
	uint		srclen = size - unencrypted_bytes;
	const byte*	src = src_frame + header_len;
	byte*		dst = dst_frame + header_len;
	uint32		dstlen = 0;
	ib_uint32_t	checksum = 0;

	if (page_compressed) {
		/* For page_compressed pages the length to encrypt is
		read from the 2 bytes at FIL_PAGE_DATA. */
		srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
	}

	int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
					   crypt_data, key_version,
					   (uint32)space, (uint32)offset, lsn);
	ut_a(rc == MY_AES_OK);
	ut_a(dstlen == srclen);

	/* For compressed tables we do not store the FIL header because
	the whole page is not stored to the disk. In compressed tables only
	the FIL header + compressed (and now encrypted) payload aligned
	to sector boundary is written. */
	if (!page_compressed) {
		/* FIL page trailer is also not encrypted */
		memcpy(dst_frame + size - FIL_PAGE_DATA_END,
			src_frame + size - FIL_PAGE_DATA_END,
			FIL_PAGE_DATA_END);
	} else {
		/* Clean up rest of buffer */
		memset(dst_frame+header_len+srclen, 0,
		       size - (header_len + srclen));
	}

	/* The checksum is computed over the already encrypted frame. */
	checksum = fil_crypt_calculate_checksum(zip_size, dst_frame);

	/* store the post-encryption checksum after the key-version */
	mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4,
			checksum);

	ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size));

	srv_stats.pages_encrypted.inc();

	return dst_frame;
}
622 
/** Encrypt a buffer for full checksum format.
Everything before FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION is copied
unencrypted, the key version is stored at FIL_PAGE_FCRC32_KEY_VERSION,
the rest of the page except the trailing checksum is encrypted, and a
CRC-32 of the result is stored at the end of the payload.
@param[in,out]		crypt_data		Crypt data
@param[in]		space			space_id
@param[in]		offset			Page offset
@param[in]		lsn			Log sequence number
@param[in]		src_frame		Page to encrypt
@param[in,out]		dst_frame		Output buffer
@return dst_frame, holding the encrypted page */
static byte* fil_encrypt_buf_for_full_crc32(
	fil_space_crypt_t*	crypt_data,
	ulint			space,
	ulint			offset,
	lsn_t			lsn,
	const byte*		src_frame,
	byte*			dst_frame)
{
	uint key_version = fil_crypt_get_latest_key_version(crypt_data);
	ut_d(bool corrupted = false);
	/* The corruption flag is only collected in debug builds. */
	const uint size = buf_page_full_crc32_size(src_frame, NULL,
#ifdef UNIV_DEBUG
						   &corrupted
#else
						   NULL
#endif
						   );
	ut_ad(!corrupted);
	uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
			      + FIL_PAGE_FCRC32_CHECKSUM);
	const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
	byte* dst = dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
	uint dstlen = 0;

	ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);

	/* Till FIL_PAGE_LSN, page is not encrypted */
	memcpy(dst_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);

	/* Write key version to the page. */
	mach_write_to_4(dst_frame + FIL_PAGE_FCRC32_KEY_VERSION, key_version);

	int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
					   crypt_data, key_version,
					   uint(space), uint(offset), lsn);
	ut_a(rc == MY_AES_OK);
	ut_a(dstlen == srclen);

	/* The checksum covers the whole encrypted payload and is
	stored right after it. */
	const ulint payload = size - FIL_PAGE_FCRC32_CHECKSUM;
	mach_write_to_4(dst_frame + payload, ut_crc32(dst_frame, payload));
	/* Clean the rest of the buffer. FIXME: Punch holes when writing! */
	memset(dst_frame + (payload + 4), 0, srv_page_size - (payload + 4));

	srv_stats.pages_encrypted.inc();

	return dst_frame;
}
678 
679 /** Encrypt a buffer.
680 @param[in,out]		crypt_data		Crypt data
681 @param[in]		space			space_id
682 @param[in]		offset			Page offset
683 @param[in]		lsn			Log sequence number
684 @param[in]		src_frame		Page to encrypt
685 @param[in]		zip_size		ROW_FORMAT=COMPRESSED
686 						page size, or 0
687 @param[in,out]		dst_frame		Output buffer
688 @param[in]		use_full_checksum	full crc32 algo is used
689 @return encrypted buffer or NULL */
690 UNIV_INTERN
691 byte*
692 fil_encrypt_buf(
693 	fil_space_crypt_t*	crypt_data,
694 	ulint			space,
695 	ulint			offset,
696 	lsn_t			lsn,
697 	const byte*		src_frame,
698 	ulint			zip_size,
699 	byte*			dst_frame,
700 	bool			use_full_checksum)
701 {
702 	if (use_full_checksum) {
703 		return fil_encrypt_buf_for_full_crc32(
704 			crypt_data, space, offset,
705 			lsn, src_frame, dst_frame);
706 	}
707 
708 	return fil_encrypt_buf_for_non_full_checksum(
709 		crypt_data, space, offset, lsn,
710 		src_frame, zip_size, dst_frame);
711 }
712 
713 /** Check whether these page types are allowed to encrypt.
714 @param[in]	space		tablespace object
715 @param[in]	src_frame	source page
716 @return true if it is valid page type */
NEW_set_playline(XtPointer client_data)717 static bool fil_space_encrypt_valid_page_type(
718 	const fil_space_t*	space,
719 	byte*			src_frame)
720 {
721 	switch (mach_read_from_2(src_frame+FIL_PAGE_TYPE)) {
722 	case FIL_PAGE_RTREE:
723 		return space->full_crc32();
724 	case FIL_PAGE_TYPE_FSP_HDR:
725 	case FIL_PAGE_TYPE_XDES:
726 		return false;
727 	}
728 
729 	return true;
730 }
731 
732 /******************************************************************
733 Encrypt a page
734 
735 @param[in]		space		Tablespace
736 @param[in]		offset		Page offset
737 @param[in]		lsn		Log sequence number
738 @param[in]		src_frame	Page to encrypt
739 @param[in,out]		dst_frame	Output buffer
740 @return encrypted buffer or NULL */
741 UNIV_INTERN
742 byte*
743 fil_space_encrypt(
744 	const fil_space_t*	space,
745 	ulint			offset,
746 	lsn_t			lsn,
747 	byte*			src_frame,
748 	byte*			dst_frame)
749 {
750 	if (!fil_space_encrypt_valid_page_type(space, src_frame)) {
751 		return src_frame;
752 	}
753 
754 	if (!space->crypt_data || !space->crypt_data->is_encrypted()) {
755 		return (src_frame);
756 	}
757 
758 	ut_ad(space->pending_io());
759 
760 	return fil_encrypt_buf(space->crypt_data, space->id, offset, lsn,
761 			       src_frame, space->zip_size(),
762 			       dst_frame, space->full_crc32());
763 }
764 
765 /** Decrypt a page for full checksum format.
766 @param[in]	space			space id
767 @param[in]	crypt_data		crypt_data
768 @param[in]	tmp_frame		Temporary buffer
769 @param[in,out]	src_frame		Page to decrypt
770 @return DB_SUCCESS or error */
771 static dberr_t fil_space_decrypt_full_crc32(
772 	ulint			space,
773 	fil_space_crypt_t*	crypt_data,
774 	byte*			tmp_frame,
775 	byte*			src_frame)
776 {
777 	uint key_version = mach_read_from_4(
778 		src_frame + FIL_PAGE_FCRC32_KEY_VERSION);
779 	lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
780 	uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
781 
782 	ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
783 
784 	ut_ad(crypt_data);
785 	ut_ad(crypt_data->is_encrypted());
786 
787 	memcpy(tmp_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
788 
789 	/* Calculate the offset where decryption starts */
790 	const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
791 	byte* dst = tmp_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
792 	uint dstlen = 0;
793 	bool corrupted = false;
794 	uint size = buf_page_full_crc32_size(src_frame, NULL, &corrupted);
795 	if (UNIV_UNLIKELY(corrupted)) {
796 		return DB_DECRYPTION_FAILED;
797 	}
798 
799 	uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
800 			      + FIL_PAGE_FCRC32_CHECKSUM);
801 
802 	int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
803 					   crypt_data, key_version,
804 					   (uint) space, offset, lsn);
805 
806 	if (rc != MY_AES_OK || dstlen != srclen) {
807 		if (rc == -1) {
808 			return DB_DECRYPTION_FAILED;
809 		}
810 
811 		ib::fatal() << "Unable to decrypt data-block "
812 			    << " src: " << src << "srclen: "
813 			    << srclen << " buf: " << dst << "buflen: "
814 			    << dstlen << " return-code: " << rc
815 			    << " Can't continue!";
816 	}
817 
818 	/* Copy only checksum part in the trailer */
819 	memcpy(tmp_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
820 	       src_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
821 	       FIL_PAGE_FCRC32_CHECKSUM);
822 
823 	srv_stats.pages_decrypted.inc();
824 
825 	return DB_SUCCESS; /* page was decrypted */
826 }
827 
828 /** Decrypt a page for non full checksum format.
829 @param[in]	crypt_data		crypt_data
830 @param[in]	tmp_frame		Temporary buffer
831 @param[in]	physical_size		page size
832 @param[in,out]	src_frame		Page to decrypt
833 @return DB_SUCCESS or error */
834 static dberr_t fil_space_decrypt_for_non_full_checksum(
835 	fil_space_crypt_t*	crypt_data,
836 	byte*			tmp_frame,
837 	ulint			physical_size,
838 	byte*			src_frame)
839 {
840 	ulint page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
841 	uint key_version = mach_read_from_4(
842 			src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
843 	bool page_compressed = (page_type
844 				== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
845 	uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
846 	uint space = mach_read_from_4(
847 			src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
848 	ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
849 
850 	ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
851 	ut_a(crypt_data != NULL && crypt_data->is_encrypted());
852 
853 	/* read space & lsn */
854 	uint header_len = FIL_PAGE_DATA;
855 
856 	if (page_compressed) {
857 		header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
858 	}
859 
860 	/* Copy FIL page header, it is not encrypted */
861 	memcpy(tmp_frame, src_frame, header_len);
862 
863 	/* Calculate the offset where decryption starts */
864 	const byte* src = src_frame + header_len;
865 	byte* dst = tmp_frame + header_len;
866 	uint32 dstlen = 0;
867 	uint srclen = uint(physical_size) - header_len - FIL_PAGE_DATA_END;
868 
869 	if (page_compressed) {
870 		srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
871 	}
872 
873 	int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
874 					   crypt_data, key_version,
875 					   space, offset, lsn);
876 
877 	if (! ((rc == MY_AES_OK) && ((ulint) dstlen == srclen))) {
878 
879 		if (rc == -1) {
880 			return DB_DECRYPTION_FAILED;
881 		}
882 
883 		ib::fatal() << "Unable to decrypt data-block "
884 			    << " src: " << static_cast<const void*>(src)
885 			    << "srclen: "
886 			    << srclen << " buf: "
887 			    << static_cast<const void*>(dst) << "buflen: "
888 			    << dstlen << " return-code: " << rc
889 			    << " Can't continue!";
890 	}
891 
892 	/* For compressed tables we do not store the FIL header because
893 	the whole page is not stored to the disk. In compressed tables only
894 	the FIL header + compressed (and now encrypted) payload alligned
895 	to sector boundary is written. */
896 	if (!page_compressed) {
897 		/* Copy FIL trailer */
898 		memcpy(tmp_frame + physical_size - FIL_PAGE_DATA_END,
899 		       src_frame + physical_size - FIL_PAGE_DATA_END,
900 		       FIL_PAGE_DATA_END);
901 	}
902 
903 	srv_stats.pages_decrypted.inc();
904 
905 	return DB_SUCCESS; /* page was decrypted */
906 }
907 
908 /** Decrypt a page.
909 @param[in]	space_id		tablespace id
910 @param[in]	crypt_data		crypt_data
911 @param[in]	tmp_frame		Temporary buffer
912 @param[in]	physical_size		page size
913 @param[in]	fsp_flags		Tablespace flags
914 @param[in,out]	src_frame		Page to decrypt
915 @param[out]	err			DB_SUCCESS or DB_DECRYPTION_FAILED
916 @return DB_SUCCESS or error */
917 UNIV_INTERN
918 dberr_t
919 fil_space_decrypt(
920 	ulint			space_id,
921 	fil_space_crypt_t*	crypt_data,
922 	byte*			tmp_frame,
923 	ulint			physical_size,
924 	ulint			fsp_flags,
925 	byte*			src_frame)
926 {
927 	if (fil_space_t::full_crc32(fsp_flags)) {
928 		return fil_space_decrypt_full_crc32(
929 			space_id, crypt_data, tmp_frame, src_frame);
930 	}
931 
932 	return fil_space_decrypt_for_non_full_checksum(crypt_data, tmp_frame,
933 						       physical_size,
934 						       src_frame);
935 }
936 
937 /**
938 Decrypt a page.
939 @param[in]	space			Tablespace
940 @param[in]	tmp_frame		Temporary buffer used for decrypting
941 @param[in,out]	src_frame		Page to decrypt
942 @return decrypted page, or original not encrypted page if decryption is
943 not needed.*/
944 UNIV_INTERN
945 byte*
946 fil_space_decrypt(
947 	const fil_space_t* space,
948 	byte*		tmp_frame,
949 	byte*		src_frame)
950 {
951 	const ulint physical_size = space->physical_size();
952 
953 	ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
954 	ut_ad(space->pending_io());
955 
956 	if (DB_SUCCESS != fil_space_decrypt(space->id, space->crypt_data,
957 					    tmp_frame, physical_size,
958 					    space->flags, src_frame)) {
959 		return nullptr;
960 	}
961 
962 	/* Copy the decrypted page back to page buffer, not
963 	really any other options. */
964 	memcpy(src_frame, tmp_frame, physical_size);
965 
966 	return src_frame;
967 }
968 
969 /**
970 Calculate post encryption checksum
971 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
972 @param[in]	dst_frame	Block where checksum is calculated
973 @return page checksum
974 not needed. */
975 uint32_t
976 fil_crypt_calculate_checksum(ulint zip_size, const byte* dst_frame)
977 {
978 	/* For encrypted tables we use only crc32 and strict_crc32 */
979 	return zip_size
980 		? page_zip_calc_checksum(dst_frame, zip_size,
981 					 SRV_CHECKSUM_ALGORITHM_CRC32)
982 		: buf_calc_page_crc32(dst_frame);
983 }
984 
985 /***********************************************************************/
986 
987 /** A copy of global key state */
988 struct key_state_t {
989 	key_state_t() : key_id(0), key_version(0),
990 			rotate_key_age(srv_fil_crypt_rotate_key_age) {}
991 	bool operator==(const key_state_t& other) const {
992 		return key_version == other.key_version &&
993 			rotate_key_age == other.rotate_key_age;
994 	}
995 	uint key_id;
996 	uint key_version;
997 	uint rotate_key_age;
998 };
999 
1000 /***********************************************************************
1001 Copy global key state
1002 @param[in,out]	new_state	key state
1003 @param[in]	crypt_data	crypt data */
1004 static void
1005 fil_crypt_get_key_state(
1006 	key_state_t*			new_state,
1007 	fil_space_crypt_t*		crypt_data)
1008 {
1009 	if (srv_encrypt_tables) {
1010 		new_state->key_version = crypt_data->key_get_latest_version();
1011 		new_state->rotate_key_age = srv_fil_crypt_rotate_key_age;
1012 
1013 		ut_a(new_state->key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
1014 	} else {
1015 		new_state->key_version = 0;
1016 		new_state->rotate_key_age = 0;
1017 	}
1018 }
1019 
1020 /***********************************************************************
1021 Check if a key needs rotation given a key_state
1022 @param[in]	crypt_data		Encryption information
1023 @param[in]	key_version		Current key version
1024 @param[in]	latest_key_version	Latest key version
1025 @param[in]	rotate_key_age		when to rotate
1026 @return true if key needs rotation, false if not */
1027 static bool
1028 fil_crypt_needs_rotation(
1029 	const fil_space_crypt_t*	crypt_data,
1030 	uint				key_version,
1031 	uint				latest_key_version,
1032 	uint				rotate_key_age)
1033 {
1034 	if (key_version == ENCRYPTION_KEY_VERSION_INVALID) {
1035 		return false;
1036 	}
1037 
1038 	if (key_version == 0 && latest_key_version != 0) {
1039 		/* this is rotation unencrypted => encrypted
1040 		* ignore rotate_key_age */
1041 		return true;
1042 	}
1043 
1044 	if (latest_key_version == 0 && key_version != 0) {
1045 		if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT) {
1046 			/* this is rotation encrypted => unencrypted */
1047 			return true;
1048 		}
1049 		return false;
1050 	}
1051 
1052 	if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT
1053 	    && crypt_data->type == CRYPT_SCHEME_1
1054 	    && !srv_encrypt_tables) {
1055 		/* This is rotation encrypted => unencrypted */
1056 		return true;
1057 	}
1058 
1059 	if (rotate_key_age == 0) {
1060 		return false;
1061 	}
1062 
1063 	/* this is rotation encrypted => encrypted,
1064 	* only reencrypt if key is sufficiently old */
1065 	if (key_version + rotate_key_age < latest_key_version) {
1066 		return true;
1067 	}
1068 
1069 	return false;
1070 }
1071 
1072 /** Read page 0 and possible crypt data from there.
1073 @param[in,out]	space		Tablespace */
1074 static inline
1075 void
1076 fil_crypt_read_crypt_data(fil_space_t* space)
1077 {
1078 	if (space->crypt_data || space->size
1079 	    || !fil_space_get_size(space->id)) {
1080 		/* The encryption metadata has already been read, or
1081 		the tablespace is not encrypted and the file has been
1082 		opened already, or the file cannot be accessed,
1083 		likely due to a concurrent DROP
1084 		(possibly as part of TRUNCATE or ALTER TABLE).
1085 		FIXME: The file can become unaccessible any time
1086 		after this check! We should really remove this
1087 		function and instead make crypt_data an integral
1088 		part of fil_space_t. */
1089 		return;
1090 	}
1091 
1092 	const ulint zip_size = space->zip_size();
1093 	mtr_t	mtr;
1094 	mtr.start();
1095 	if (buf_block_t* block = buf_page_get(page_id_t(space->id, 0),
1096 					      zip_size, RW_S_LATCH, &mtr)) {
1097 		mutex_enter(&fil_system.mutex);
1098 		if (!space->crypt_data) {
1099 			space->crypt_data = fil_space_read_crypt_data(
1100 				zip_size, block->frame);
1101 		}
1102 		mutex_exit(&fil_system.mutex);
1103 	}
1104 	mtr.commit();
1105 }
1106 
/** Start encrypting a space.
Creates crypt data for a tablespace that has none, attaches it to the
tablespace, writes it to page 0, flushes up to the LSN of that write and
finally clears the "starting" state. While this runs,
fil_crypt_start_converting is set and crypt_data->rotate_state.starting
is true.
@param[in,out]		space		Tablespace
@return true if a recheck of tablespace is needed by encryption thread. */
static bool fil_crypt_start_encrypting_space(fil_space_t* space)
{
	bool recheck = false;

	mutex_enter(&fil_crypt_threads_mutex);

	fil_space_crypt_t *crypt_data = space->crypt_data;

	/* If space is not encrypted and encryption is not enabled, then
	do not continue encrypting the space. */
	if (!crypt_data && !srv_encrypt_tables) {
		mutex_exit(&fil_crypt_threads_mutex);
		return false;
	}

	if (crypt_data != NULL || fil_crypt_start_converting) {
		/* someone beat us to it */
		if (fil_crypt_start_converting) {
			/* another conversion is in progress; ask the
			caller to look at this tablespace again later */
			recheck = true;
		}

		mutex_exit(&fil_crypt_threads_mutex);
		return recheck;
	}

	/* NOTE: we need to write and flush page 0 before publishing
	* the crypt data. This so that after restart there is no
	* risk of finding encrypted pages without having
	* crypt data in page 0 */

	/* 1 - create crypt data */
	crypt_data = fil_space_create_crypt_data(
		FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);

	if (crypt_data == NULL) {
		mutex_exit(&fil_crypt_threads_mutex);
		return false;
	}

	/* Mark the crypt data as "starting" with this thread as the
	only active one; fil_crypt_space_needs_rotation() refuses to
	rotate the space while rotate_state.starting is set. */
	crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
	crypt_data->min_key_version = 0; // all pages are unencrypted
	crypt_data->rotate_state.start_time = time(0);
	crypt_data->rotate_state.starting = true;
	crypt_data->rotate_state.active_threads = 1;

	/* space->crypt_data is assigned under fil_system.mutex */
	mutex_enter(&fil_system.mutex);
	space->crypt_data = crypt_data;
	mutex_exit(&fil_system.mutex);

	fil_crypt_start_converting = true;
	mutex_exit(&fil_crypt_threads_mutex);

	do
	{
		mtr_t mtr;
		mtr.start();
		mtr.set_named_space(space);

		/* 2 - get page 0 */
		dberr_t err = DB_SUCCESS;
		buf_block_t* block = buf_page_get_gen(
			page_id_t(space->id, 0), space->zip_size(),
			RW_X_LATCH, NULL, BUF_GET,
			__FILE__, __LINE__,
			&mtr, &err);


		/* 3 - write crypt data to page 0 */
		/* NOTE(review): neither block nor err is checked before
		block is used here - confirm that buf_page_get_gen()
		cannot return NULL for page 0 in this mode. */
		byte* frame = buf_block_get_frame(block);
		crypt_data->type = CRYPT_SCHEME_1;
		crypt_data->write_page0(space, frame, &mtr);

		mtr.commit();

		/* record lsn of update */
		lsn_t end_lsn = mtr.commit_lsn();

		/* 4 - sync tablespace before publishing crypt data */

		bool success = false;

		/* repeat the flush-list batch up to end_lsn until
		buf_flush_lists() reports success */
		do {
			ulint n_pages = 0;
			success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
			buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
		} while (!success);

		/* 5 - publish crypt data */
		mutex_enter(&fil_crypt_threads_mutex);
		mutex_enter(&crypt_data->mutex);
		crypt_data->type = CRYPT_SCHEME_1;
		ut_a(crypt_data->rotate_state.active_threads == 1);
		crypt_data->rotate_state.active_threads = 0;
		crypt_data->rotate_state.starting = false;

		fil_crypt_start_converting = false;
		mutex_exit(&crypt_data->mutex);
		mutex_exit(&fil_crypt_threads_mutex);

		return recheck;
	} while (0);

	/* NOTE(review): the loop body above always returns and contains
	no break, so the cleanup below is unreachable as written. It
	looks like an error path for the steps above - confirm before
	relying on it (note that it does not reset
	rotate_state.starting). */
	mutex_enter(&crypt_data->mutex);
	ut_a(crypt_data->rotate_state.active_threads == 1);
	crypt_data->rotate_state.active_threads = 0;
	mutex_exit(&crypt_data->mutex);

	mutex_enter(&fil_crypt_threads_mutex);
	fil_crypt_start_converting = false;
	mutex_exit(&fil_crypt_threads_mutex);

	return recheck;
}
1223 
1224 /** State of a rotation thread */
1225 struct rotate_thread_t {
1226 	explicit rotate_thread_t(uint no) {
1227 		memset(this, 0, sizeof(* this));
1228 		thread_no = no;
1229 		first = true;
1230 		estimated_max_iops = 20;
1231 	}
1232 
1233 	uint thread_no;
1234 	bool first;		    /*!< is position before first space */
1235 	fil_space_t* space;	    /*!< current space or NULL */
1236 	ulint offset;		    /*!< current offset */
1237 	ulint batch;		    /*!< #pages to rotate */
1238 	uint  min_key_version_found;/*!< min key version found but not rotated */
1239 	lsn_t end_lsn;		    /*!< max lsn when rotating this space */
1240 
1241 	uint estimated_max_iops;   /*!< estimation of max iops */
1242 	uint allocated_iops;	   /*!< allocated iops */
1243 	ulint cnt_waited;	   /*!< #times waited during this slot */
1244 	uintmax_t sum_waited_us;   /*!< wait time during this slot */
1245 
1246 	fil_crypt_stat_t crypt_stat; // statistics
1247 
1248 	btr_scrub_t scrub_data;      /* thread local data used by btr_scrub-functions
1249 				     * when iterating pages of tablespace */
1250 
1251 	/** @return whether this thread should terminate */
1252 	bool should_shutdown() const {
1253 		switch (srv_shutdown_state) {
1254 		case SRV_SHUTDOWN_NONE:
1255 			return thread_no >= srv_n_fil_crypt_threads;
1256 		case SRV_SHUTDOWN_EXIT_THREADS:
1257 			/* srv_init_abort() must have been invoked */
1258 		case SRV_SHUTDOWN_CLEANUP:
1259 		case SRV_SHUTDOWN_INITIATED:
1260 			return true;
1261 		case SRV_SHUTDOWN_FLUSH_PHASE:
1262 		case SRV_SHUTDOWN_LAST_PHASE:
1263 			break;
1264 		}
1265 		ut_ad(0);
1266 		return true;
1267 	}
1268 };
1269 
1270 /** Avoid the removal of the tablespace from
1271 default_encrypt_list only when
1272 1) Another active encryption thread working on tablespace
1273 2) Eligible for tablespace key rotation
1274 3) Tablespace is in flushing phase
1275 @return true if tablespace should be removed from
1276 default encrypt */
1277 static bool fil_crypt_must_remove(const fil_space_t &space)
1278 {
1279   ut_ad(space.purpose == FIL_TYPE_TABLESPACE);
1280   fil_space_crypt_t *crypt_data = space.crypt_data;
1281   ut_ad(mutex_own(&fil_system.mutex));
1282   const ulong encrypt_tables= srv_encrypt_tables;
1283   if (!crypt_data)
1284     return !encrypt_tables;
1285   if (!crypt_data->is_key_found())
1286     return true;
1287 
1288   mutex_enter(&crypt_data->mutex);
1289   const bool remove= (space.is_stopping() || crypt_data->not_encrypted()) &&
1290     (!crypt_data->rotate_state.flushing &&
1291      !encrypt_tables == !!crypt_data->min_key_version &&
1292      !crypt_data->rotate_state.active_threads);
1293   mutex_exit(&crypt_data->mutex);
1294   return remove;
1295 }
1296 
1297 /***********************************************************************
1298 Check if space needs rotation given a key_state
1299 @param[in,out]		state		Key rotation state
1300 @param[in,out]		key_state	Key state
1301 @param[in,out]		recheck		needs recheck ?
1302 @return true if space needs key rotation */
1303 static
1304 bool
1305 fil_crypt_space_needs_rotation(
1306 	rotate_thread_t*	state,
1307 	key_state_t*		key_state,
1308 	bool*			recheck)
1309 {
1310 	fil_space_t* space = state->space;
1311 
1312 	/* Make sure that tablespace is normal tablespace */
1313 	if (space->purpose != FIL_TYPE_TABLESPACE) {
1314 		return false;
1315 	}
1316 
1317 	ut_ad(space->referenced());
1318 
1319 	fil_space_crypt_t *crypt_data = space->crypt_data;
1320 
1321 	if (crypt_data == NULL) {
1322 		/**
1323 		* space has no crypt data
1324 		*   start encrypting it...
1325 		*/
1326 		*recheck = fil_crypt_start_encrypting_space(space);
1327 		crypt_data = space->crypt_data;
1328 
1329 		if (crypt_data == NULL) {
1330 			return false;
1331 		}
1332 
1333 		crypt_data->key_get_latest_version();
1334 	}
1335 
1336 	/* If used key_id is not found from encryption plugin we can't
1337 	continue to rotate the tablespace */
1338 	if (!crypt_data->is_key_found()) {
1339 		return false;
1340 	}
1341 
1342 	mutex_enter(&crypt_data->mutex);
1343 
1344 	do {
1345 		/* prevent threads from starting to rotate space */
1346 		if (crypt_data->rotate_state.starting) {
1347 			/* recheck this space later */
1348 			*recheck = true;
1349 			break;
1350 		}
1351 
1352 		/* prevent threads from starting to rotate space */
1353 		if (space->is_stopping()) {
1354 			break;
1355 		}
1356 
1357 		if (crypt_data->rotate_state.flushing) {
1358 			break;
1359 		}
1360 
1361 		/* No need to rotate space if encryption is disabled */
1362 		if (crypt_data->not_encrypted()) {
1363 			break;
1364 		}
1365 
1366 		if (crypt_data->key_id != key_state->key_id) {
1367 			key_state->key_id= crypt_data->key_id;
1368 			fil_crypt_get_key_state(key_state, crypt_data);
1369 		}
1370 
1371 		bool need_key_rotation = fil_crypt_needs_rotation(
1372 			crypt_data,
1373 			crypt_data->min_key_version,
1374 			key_state->key_version,
1375 			key_state->rotate_key_age);
1376 
1377 		crypt_data->rotate_state.scrubbing.is_active =
1378 			btr_scrub_start_space(*space, &state->scrub_data);
1379 
1380 		time_t diff = time(0) - crypt_data->rotate_state.scrubbing.
1381 			last_scrub_completed;
1382 
1383 		bool need_scrubbing =
1384 			(srv_background_scrub_data_uncompressed ||
1385 			 srv_background_scrub_data_compressed) &&
1386 			crypt_data->rotate_state.scrubbing.is_active
1387 			&& diff >= 0
1388 			&& ulint(diff) >= srv_background_scrub_data_interval;
1389 
1390 		if (need_key_rotation == false && need_scrubbing == false) {
1391 			break;
1392 		}
1393 
1394 		mutex_exit(&crypt_data->mutex);
1395 
1396 		return true;
1397 	} while (0);
1398 
1399 	mutex_exit(&crypt_data->mutex);
1400 
1401 
1402 	return false;
1403 }
1404 
1405 /***********************************************************************
1406 Update global statistics with thread statistics
1407 @param[in,out]	state		key rotation statistics */
1408 static void
1409 fil_crypt_update_total_stat(
1410 	rotate_thread_t *state)
1411 {
1412 	mutex_enter(&crypt_stat_mutex);
1413 	crypt_stat.pages_read_from_cache +=
1414 		state->crypt_stat.pages_read_from_cache;
1415 	crypt_stat.pages_read_from_disk +=
1416 		state->crypt_stat.pages_read_from_disk;
1417 	crypt_stat.pages_modified += state->crypt_stat.pages_modified;
1418 	crypt_stat.pages_flushed += state->crypt_stat.pages_flushed;
1419 	// remote old estimate
1420 	crypt_stat.estimated_iops -= state->crypt_stat.estimated_iops;
1421 	// add new estimate
1422 	crypt_stat.estimated_iops += state->estimated_max_iops;
1423 	mutex_exit(&crypt_stat_mutex);
1424 
1425 	// make new estimate "current" estimate
1426 	memset(&state->crypt_stat, 0, sizeof(state->crypt_stat));
1427 	// record our old (current) estimate
1428 	state->crypt_stat.estimated_iops = state->estimated_max_iops;
1429 }
1430 
1431 /***********************************************************************
1432 Allocate iops to thread from global setting,
1433 used before starting to rotate a space.
1434 @param[in,out]		state		Rotation state
1435 @return true if allocation succeeded, false if failed */
1436 static
1437 bool
1438 fil_crypt_alloc_iops(
1439 	rotate_thread_t *state)
1440 {
1441 	ut_ad(state->allocated_iops == 0);
1442 
1443 	/* We have not yet selected the space to rotate, thus
1444 	state might not contain space and we can't check
1445 	its status yet. */
1446 
1447 	uint max_iops = state->estimated_max_iops;
1448 	mutex_enter(&fil_crypt_threads_mutex);
1449 
1450 	if (n_fil_crypt_iops_allocated >= srv_n_fil_crypt_iops) {
1451 		/* this can happen when user decreases srv_fil_crypt_iops */
1452 		mutex_exit(&fil_crypt_threads_mutex);
1453 		return false;
1454 	}
1455 
1456 	uint alloc = srv_n_fil_crypt_iops - n_fil_crypt_iops_allocated;
1457 
1458 	if (alloc > max_iops) {
1459 		alloc = max_iops;
1460 	}
1461 
1462 	n_fil_crypt_iops_allocated += alloc;
1463 	mutex_exit(&fil_crypt_threads_mutex);
1464 
1465 	state->allocated_iops = alloc;
1466 
1467 	return alloc > 0;
1468 }
1469 
1470 /***********************************************************************
1471 Reallocate iops to thread,
1472 used when inside a space
1473 @param[in,out]		state		Rotation state */
1474 static
1475 void
1476 fil_crypt_realloc_iops(
1477 	rotate_thread_t *state)
1478 {
1479 	ut_a(state->allocated_iops > 0);
1480 
1481 	if (10 * state->cnt_waited > state->batch) {
1482 		/* if we waited more than 10% re-estimate max_iops */
1483 		ulint avg_wait_time_us =
1484 			ulint(state->sum_waited_us / state->cnt_waited);
1485 
1486 		if (avg_wait_time_us == 0) {
1487 			avg_wait_time_us = 1; // prevent division by zero
1488 		}
1489 
1490 		DBUG_PRINT("ib_crypt",
1491 			("thr_no: %u - update estimated_max_iops from %u to "
1492 			 ULINTPF ".",
1493 			state->thread_no,
1494 			state->estimated_max_iops,
1495 			1000000 / avg_wait_time_us));
1496 
1497 		state->estimated_max_iops = uint(1000000 / avg_wait_time_us);
1498 		state->cnt_waited = 0;
1499 		state->sum_waited_us = 0;
1500 	} else {
1501 		DBUG_PRINT("ib_crypt",
1502 			   ("thr_no: %u only waited " ULINTPF
1503 			    "%% skip re-estimate.",
1504 			    state->thread_no,
1505 			    (100 * state->cnt_waited)
1506 			    / (state->batch ? state->batch : 1)));
1507 	}
1508 
1509 	if (state->estimated_max_iops <= state->allocated_iops) {
1510 		/* return extra iops */
1511 		uint extra = state->allocated_iops - state->estimated_max_iops;
1512 
1513 		if (extra > 0) {
1514 			mutex_enter(&fil_crypt_threads_mutex);
1515 			if (n_fil_crypt_iops_allocated < extra) {
1516 				/* unknown bug!
1517 				* crash in debug
1518 				* keep n_fil_crypt_iops_allocated unchanged
1519 				* in release */
1520 				ut_ad(0);
1521 				extra = 0;
1522 			}
1523 			n_fil_crypt_iops_allocated -= extra;
1524 			state->allocated_iops -= extra;
1525 
1526 			if (state->allocated_iops == 0) {
1527 				/* no matter how slow io system seems to be
1528 				* never decrease allocated_iops to 0... */
1529 				state->allocated_iops ++;
1530 				n_fil_crypt_iops_allocated ++;
1531 			}
1532 
1533 			os_event_set(fil_crypt_threads_event);
1534 			mutex_exit(&fil_crypt_threads_mutex);
1535 		}
1536 	} else {
1537 		/* see if there are more to get */
1538 		mutex_enter(&fil_crypt_threads_mutex);
1539 		if (n_fil_crypt_iops_allocated < srv_n_fil_crypt_iops) {
1540 			/* there are extra iops free */
1541 			uint extra = srv_n_fil_crypt_iops -
1542 				n_fil_crypt_iops_allocated;
1543 			if (state->allocated_iops + extra >
1544 			    state->estimated_max_iops) {
1545 				/* but don't alloc more than our max */
1546 				extra = state->estimated_max_iops -
1547 					state->allocated_iops;
1548 			}
1549 			n_fil_crypt_iops_allocated += extra;
1550 			state->allocated_iops += extra;
1551 
1552 			DBUG_PRINT("ib_crypt",
1553 				("thr_no: %u increased iops from %u to %u.",
1554 				state->thread_no,
1555 				state->allocated_iops - extra,
1556 				state->allocated_iops));
1557 
1558 		}
1559 		mutex_exit(&fil_crypt_threads_mutex);
1560 	}
1561 
1562 	fil_crypt_update_total_stat(state);
1563 }
1564 
1565 /***********************************************************************
1566 Return allocated iops to global
1567 @param[in,out]		state		Rotation state */
1568 static
1569 void
1570 fil_crypt_return_iops(
1571 	rotate_thread_t *state)
1572 {
1573 	if (state->allocated_iops > 0) {
1574 		uint iops = state->allocated_iops;
1575 		mutex_enter(&fil_crypt_threads_mutex);
1576 		if (n_fil_crypt_iops_allocated < iops) {
1577 			/* unknown bug!
1578 			* crash in debug
1579 			* keep n_fil_crypt_iops_allocated unchanged
1580 			* in release */
1581 			ut_ad(0);
1582 			iops = 0;
1583 		}
1584 
1585 		n_fil_crypt_iops_allocated -= iops;
1586 		state->allocated_iops = 0;
1587 		os_event_set(fil_crypt_threads_event);
1588 		mutex_exit(&fil_crypt_threads_mutex);
1589 	}
1590 
1591 	fil_crypt_update_total_stat(state);
1592 }
1593 
1594 bool fil_crypt_must_default_encrypt()
1595 {
1596   return !srv_fil_crypt_rotate_key_age || !srv_encrypt_rotate;
1597 }
1598 
/** Return the next tablespace from default_encrypt_tables.
The caller must hold fil_system_t::mutex. The reference held on the
previous tablespace is released here, and that tablespace may be removed
from default_encrypt_tables.
@param space   previous tablespace (NULL to start from the start)
@param recheck whether the removal condition needs to be rechecked after
the encryption parameters were changed
@param encrypt expected state of innodb_encrypt_tables
(not referenced in the body of this function)
@return the next tablespace to process (n_pending_ops incremented)
@retval NULL if this was the last */
inline fil_space_t *fil_system_t::default_encrypt_next(
  fil_space_t *space, bool recheck, bool encrypt)
{
  ut_ad(mutex_own(&mutex));

  /* Resume from the previous tablespace if it is still in the list;
  otherwise start from the beginning of the list. */
  sized_ilist<fil_space_t, rotation_list_tag_t>::iterator it=
    space && space->is_in_default_encrypt
    ? space
    : default_encrypt_tables.begin();
  const sized_ilist<fil_space_t, rotation_list_tag_t>::iterator end=
    default_encrypt_tables.end();

  if (space)
  {
    /* Drop the reference on the previous tablespace.
    NOTE(review): "released" presumably means that no other references
    remain after release() - confirm against fil_space_t::release(). */
    const bool released= !space->release();

    if (space->is_in_default_encrypt)
    {
      /* Advance past the previous tablespace, skipping tablespaces
      that have no files attached or are being stopped. This is done
      before the previous tablespace may be removed from the list
      below. */
      while (++it != end &&
             (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));

      /* If one of the encryption threads already started
      the encryption of the table then don't remove the
      unencrypted spaces from default encrypt list.

      If there is a change in innodb_encrypt_tables variables
      value then don't remove the last processed tablespace
      from the default encrypt list. */
      if (released && !recheck && fil_crypt_must_remove(*space))
      {
        ut_a(!default_encrypt_tables.empty());
        default_encrypt_tables.remove(*space);
        space->is_in_default_encrypt= false;
      }
    }
  }
  else while (it != end &&
	      (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()))
  {
    /* Find the next suitable default encrypt table if
    beginning of default_encrypt_tables list has been scheduled
    to be deleted */
    it++;
  }

  /* Return the first candidate on which a reference can be acquired,
  skipping unsuitable tablespaces after each failed attempt. */
  while (it != end)
  {
    space= &*it;
    if (space->acquire())
      return space;
    while (++it != end && (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));
  }

  return NULL;
}
1661 
/** Return the next tablespace.
Runs under fil_system.mutex, which is acquired and released here.
Depending on fil_crypt_must_default_encrypt(), the next tablespace is
taken either from the default encrypt list or from
fil_system.space_list. The reference on the previous tablespace is
released.
@param space    previous tablespace (NULL to start from the beginning)
@param recheck  whether the removal condition needs to be rechecked after
the encryption parameters were changed
@param encrypt  expected state of innodb_encrypt_tables
(only passed on to fil_system_t::default_encrypt_next())
@return pointer to the next tablespace (with n_pending_ops incremented)
@retval NULL if this was the last */
static fil_space_t *fil_space_next(fil_space_t *space, bool recheck,
                                   bool encrypt)
{
  mutex_enter(&fil_system.mutex);

  if (fil_crypt_must_default_encrypt())
    space= fil_system.default_encrypt_next(space, recheck, encrypt);
  else if (!space)
  {
    space= UT_LIST_GET_FIRST(fil_system.space_list);
    /* We can trust that space is not NULL because at least the
    system tablespace is always present and loaded first. */
    if (!space->acquire())
      /* No reference could be acquired on the first tablespace:
      continue with its successor. Note that this jumps into the
      else branch below. */
      goto next;
  }
  else
  {
    /* Move on to the next fil_space_t */
    space->release();
next:
    space= UT_LIST_GET_NEXT(space_list, space);

    /* Skip abnormal tablespaces or those that are being created by
    fil_ibd_create(), or being dropped. */
    while (space &&
           (UT_LIST_GET_LEN(space->chain) == 0 ||
            space->is_stopping() || space->purpose != FIL_TYPE_TABLESPACE))
      space= UT_LIST_GET_NEXT(space_list, space);

    /* If no reference can be acquired, try the next tablespace. */
    if (space && !space->acquire())
      goto next;
  }

  mutex_exit(&fil_system.mutex);
  return space;
}
1705 
1706 /** Search for a space needing rotation
1707 @param[in,out]	key_state	Key state
1708 @param[in,out]	state		Rotation state
1709 @param[in,out]	recheck		recheck of the tablespace is needed or
1710 				still encryption thread does write page 0 */
1711 static bool fil_crypt_find_space_to_rotate(
1712 	key_state_t*		key_state,
1713 	rotate_thread_t*	state,
1714 	bool*			recheck)
1715 {
1716 	/* we need iops to start rotating */
1717 	while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) {
1718 		if (state->space && state->space->is_stopping()) {
1719 			state->space->release();
1720 			state->space = NULL;
1721 		}
1722 
1723 		os_event_reset(fil_crypt_threads_event);
1724 		os_event_wait_time(fil_crypt_threads_event, 100000);
1725 	}
1726 
1727 	if (state->should_shutdown()) {
1728 		if (state->space) {
1729 			state->space->release();
1730 			state->space = NULL;
1731 		}
1732 		return false;
1733 	}
1734 
1735 	if (state->first) {
1736 		state->first = false;
1737 		if (state->space) {
1738 			state->space->release();
1739 		}
1740 		state->space = NULL;
1741 	}
1742 
1743 	state->space = fil_space_next(state->space, *recheck,
1744 				      key_state->key_version != 0);
1745 
1746 	while (!state->should_shutdown() && state->space) {
1747 		/* If there is no crypt data and we have not yet read
1748 		page 0 for this tablespace, we need to read it before
1749 		we can continue. */
1750 		if (!state->space->crypt_data) {
1751 			fil_crypt_read_crypt_data(state->space);
1752 		}
1753 
1754 		if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
1755 			ut_ad(key_state->key_id);
1756 			/* init state->min_key_version_found before
1757 			* starting on a space */
1758 			state->min_key_version_found = key_state->key_version;
1759 			return true;
1760 		}
1761 
1762 		state->space = fil_space_next(state->space, *recheck,
1763 					      key_state->key_version != 0);
1764 	}
1765 
1766 	if (state->space) {
1767 		state->space->release();
1768 		state->space = NULL;
1769 	}
1770 
1771 	/* no work to do; release our allocation of I/O capacity */
1772 	fil_crypt_return_iops(state);
1773 
1774 	return false;
1775 
1776 }
1777 
1778 /***********************************************************************
1779 Start rotating a space
1780 @param[in]	key_state		Key state
1781 @param[in,out]	state			Rotation state */
1782 static
1783 void
1784 fil_crypt_start_rotate_space(
1785 	const key_state_t*	key_state,
1786 	rotate_thread_t*	state)
1787 {
1788 	fil_space_crypt_t *crypt_data = state->space->crypt_data;
1789 
1790 	ut_ad(crypt_data);
1791 	mutex_enter(&crypt_data->mutex);
1792 	ut_ad(key_state->key_id == crypt_data->key_id);
1793 
1794 	if (crypt_data->rotate_state.active_threads == 0) {
1795 		/* only first thread needs to init */
1796 		crypt_data->rotate_state.next_offset = 1; // skip page 0
1797 		/* no need to rotate beyond current max
1798 		* if space extends, it will be encrypted with newer version */
1799 		/* FIXME: max_offset could be removed and instead
1800 		space->size consulted.*/
1801 		crypt_data->rotate_state.max_offset = state->space->size;
1802 		crypt_data->rotate_state.end_lsn = 0;
1803 		crypt_data->rotate_state.min_key_version_found =
1804 			key_state->key_version;
1805 
1806 		crypt_data->rotate_state.start_time = time(0);
1807 
1808 		if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
1809 			crypt_data->is_encrypted() &&
1810 			key_state->key_version != 0) {
1811 			/* this is rotation unencrypted => encrypted */
1812 			crypt_data->type = CRYPT_SCHEME_1;
1813 		}
1814 	}
1815 
1816 	/* count active threads in space */
1817 	crypt_data->rotate_state.active_threads++;
1818 
1819 	/* Initialize thread local state */
1820 	state->end_lsn = crypt_data->rotate_state.end_lsn;
1821 	state->min_key_version_found =
1822 		crypt_data->rotate_state.min_key_version_found;
1823 
1824 	mutex_exit(&crypt_data->mutex);
1825 }
1826 
1827 /***********************************************************************
1828 Search for batch of pages needing rotation
1829 @param[in]	key_state		Key state
1830 @param[in,out]	state			Rotation state
1831 @return true if page needing key rotation found, false if not found */
1832 static
1833 bool
1834 fil_crypt_find_page_to_rotate(
1835 	const key_state_t*	key_state,
1836 	rotate_thread_t*	state)
1837 {
1838 	ulint batch = srv_alloc_time * state->allocated_iops;
1839 	fil_space_t* space = state->space;
1840 
1841 	ut_ad(!space || space->referenced());
1842 
1843 	/* If space is marked to be dropped stop rotation. */
1844 	if (!space || space->is_stopping()) {
1845 		return false;
1846 	}
1847 
1848 	fil_space_crypt_t *crypt_data = space->crypt_data;
1849 
1850 	mutex_enter(&crypt_data->mutex);
1851 	ut_ad(key_state->key_id == crypt_data->key_id);
1852 
1853 	bool found = crypt_data->rotate_state.max_offset >=
1854 		crypt_data->rotate_state.next_offset;
1855 
1856 	if (found) {
1857 		state->offset = crypt_data->rotate_state.next_offset;
1858 		ulint remaining = crypt_data->rotate_state.max_offset -
1859 			crypt_data->rotate_state.next_offset;
1860 
1861 		if (batch <= remaining) {
1862 			state->batch = batch;
1863 		} else {
1864 			state->batch = remaining;
1865 		}
1866 	}
1867 
1868 	crypt_data->rotate_state.next_offset += batch;
1869 	mutex_exit(&crypt_data->mutex);
1870 	return found;
1871 }
1872 
/* Convenience wrapper around fil_crypt_get_page_throttle_func() that
passes the file name and line number of the call site. */
#define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \
	fil_crypt_get_page_throttle_func(state, offset, mtr, \
					 sleeptime_ms, __FILE__, __LINE__)
1876 
1877 /***********************************************************************
1878 Get a page and compute sleep time
1879 @param[in,out]		state		Rotation state
1880 @param[in]		offset		Page offset
1881 @param[in,out]		mtr		Minitransaction
1882 @param[out]		sleeptime_ms	Sleep time
1883 @param[in]		file		File where called
1884 @param[in]		line		Line where called
1885 @return page or NULL*/
1886 static
1887 buf_block_t*
1888 fil_crypt_get_page_throttle_func(
1889 	rotate_thread_t*	state,
1890 	ulint 			offset,
1891 	mtr_t*			mtr,
1892 	ulint*			sleeptime_ms,
1893 	const char*		file,
1894 	unsigned		line)
1895 {
1896 	fil_space_t* space = state->space;
1897 	const ulint zip_size = space->zip_size();
1898 	const page_id_t page_id(space->id, offset);
1899 	ut_ad(space->referenced());
1900 
1901 	/* Before reading from tablespace we need to make sure that
1902 	the tablespace is not about to be dropped. */
1903 	if (space->is_stopping()) {
1904 		return NULL;
1905 	}
1906 
1907 	dberr_t err = DB_SUCCESS;
1908 	buf_block_t* block = buf_page_get_gen(page_id, zip_size, RW_X_LATCH,
1909 					      NULL,
1910 					      BUF_PEEK_IF_IN_POOL, file, line,
1911 					      mtr, &err);
1912 	if (block != NULL) {
1913 		/* page was in buffer pool */
1914 		state->crypt_stat.pages_read_from_cache++;
1915 		return block;
1916 	}
1917 
1918 	if (space->is_stopping()) {
1919 		return NULL;
1920 	}
1921 
1922 	state->crypt_stat.pages_read_from_disk++;
1923 
1924 	const ulonglong start = my_interval_timer();
1925 	block = buf_page_get_gen(page_id, zip_size,
1926 				 RW_X_LATCH,
1927 				 NULL, BUF_GET_POSSIBLY_FREED,
1928 				file, line, mtr, &err);
1929 	const ulonglong end = my_interval_timer();
1930 
1931 	state->cnt_waited++;
1932 
1933 	if (end > start) {
1934 		state->sum_waited_us += (end - start) / 1000;
1935 	}
1936 
1937 	/* average page load */
1938 	ulint add_sleeptime_ms = 0;
1939 	ulint avg_wait_time_us =ulint(state->sum_waited_us / state->cnt_waited);
1940 	ulint alloc_wait_us = 1000000 / state->allocated_iops;
1941 
1942 	if (avg_wait_time_us < alloc_wait_us) {
1943 		/* we reading faster than we allocated */
1944 		add_sleeptime_ms = (alloc_wait_us - avg_wait_time_us) / 1000;
1945 	} else {
1946 		/* if page load time is longer than we want, skip sleeping */
1947 	}
1948 
1949 	*sleeptime_ms += add_sleeptime_ms;
1950 
1951 	return block;
1952 }
1953 
1954 
1955 /***********************************************************************
1956 Get block and allocation status
1957 
1958 note: innodb locks fil_space_latch and then block when allocating page
1959 but locks block and then fil_space_latch when freeing page.
1960 
1961 @param[in,out]		state		Rotation state
1962 @param[in]		offset		Page offset
1963 @param[in,out]		mtr		Minitransaction
1964 @param[out]		allocation_status Allocation status
1965 @param[out]		sleeptime_ms	Sleep time
1966 @return block or NULL
1967 */
1968 static
1969 buf_block_t*
1970 btr_scrub_get_block_and_allocation_status(
1971 	rotate_thread_t*	state,
1972 	ulint 			offset,
1973 	mtr_t*			mtr,
1974 	btr_scrub_page_allocation_status_t *allocation_status,
1975 	ulint*			sleeptime_ms)
1976 {
1977 	mtr_t local_mtr;
1978 	buf_block_t *block = NULL;
1979 	fil_space_t* space = state->space;
1980 
1981 	ut_ad(space->referenced());
1982 
1983 	mtr_start(&local_mtr);
1984 
1985 	*allocation_status = fseg_page_is_free(space, (uint32_t)offset) ?
1986 		BTR_SCRUB_PAGE_FREE :
1987 		BTR_SCRUB_PAGE_ALLOCATED;
1988 
1989 	if (*allocation_status == BTR_SCRUB_PAGE_FREE) {
1990 		/* this is easy case, we lock fil_space_latch first and
1991 		then block */
1992 		block = fil_crypt_get_page_throttle(state,
1993 						    offset, mtr,
1994 						    sleeptime_ms);
1995 		mtr_commit(&local_mtr);
1996 	} else {
1997 		/* page is allocated according to xdes */
1998 
1999 		/* release fil_space_latch *before* fetching block */
2000 		mtr_commit(&local_mtr);
2001 
2002 		/* NOTE: when we have locked dict_index_get_lock(),
2003 		* it's safe to release fil_space_latch and then fetch block
2004 		* as dict_index_get_lock() is needed to make tree modifications
2005 		* such as free-ing a page
2006 		*/
2007 
2008 		block = fil_crypt_get_page_throttle(state,
2009 						    offset, mtr,
2010 						    sleeptime_ms);
2011 	}
2012 
2013 	return block;
2014 }
2015 
2016 
2017 /***********************************************************************
2018 Rotate one page
2019 @param[in,out]		key_state		Key state
2020 @param[in,out]		state			Rotation state */
2021 static
2022 void
2023 fil_crypt_rotate_page(
2024 	const key_state_t*	key_state,
2025 	rotate_thread_t*	state)
2026 {
2027 	fil_space_t*space = state->space;
2028 	ulint space_id = space->id;
2029 	ulint offset = state->offset;
2030 	ulint sleeptime_ms = 0;
2031 	fil_space_crypt_t *crypt_data = space->crypt_data;
2032 
2033 	ut_ad(space->referenced());
2034 	ut_ad(offset > 0);
2035 
2036 	/* In fil_crypt_thread where key rotation is done we have
2037 	acquired space and checked that this space is not yet
2038 	marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate().
2039 	Check here also to give DROP TABLE or similar a change. */
2040 	if (space->is_stopping()) {
2041 		return;
2042 	}
2043 
2044 	if (space_id == TRX_SYS_SPACE && offset == TRX_SYS_PAGE_NO) {
2045 		/* don't encrypt this as it contains address to dblwr buffer */
2046 		return;
2047 	}
2048 
2049 	mtr_t mtr;
2050 	mtr.start();
2051 	if (buf_block_t* block = fil_crypt_get_page_throttle(state,
2052 							     offset, &mtr,
2053 							     &sleeptime_ms)) {
2054 		bool modified = false;
2055 		int needs_scrubbing = BTR_SCRUB_SKIP_PAGE;
2056 		lsn_t block_lsn = block->page.newest_modification;
2057 		byte* frame = buf_block_get_frame(block);
2058 		uint kv = buf_page_get_key_version(frame, space->flags);
2059 
2060 		if (space->is_stopping()) {
2061 			/* The tablespace is closing (in DROP TABLE or
2062 			TRUNCATE TABLE or similar): avoid further access */
2063 		} else if (!kv && !*reinterpret_cast<uint16_t*>
2064 			   (&frame[FIL_PAGE_TYPE])) {
2065 			/* It looks like this page is not
2066 			allocated. Because key rotation is accessing
2067 			pages in a pattern that is unlike the normal
2068 			B-tree and undo log access pattern, we cannot
2069 			invoke fseg_page_is_free() here, because that
2070 			could result in a deadlock. If we invoked
2071 			fseg_page_is_free() and released the
2072 			tablespace latch before acquiring block->lock,
2073 			then the fseg_page_is_free() information
2074 			could be stale already. */
2075 
2076 			/* If the data file was originally created
2077 			before MariaDB 10.0 or MySQL 5.6, some
2078 			allocated data pages could carry 0 in
2079 			FIL_PAGE_TYPE. The FIL_PAGE_TYPE on those
2080 			pages will be updated in
2081 			buf_flush_init_for_writing() when the page
2082 			is modified the next time.
2083 
2084 			Also, when the doublewrite buffer pages are
2085 			allocated on bootstrap in a non-debug build,
2086 			some dummy pages will be allocated, with 0 in
2087 			the FIL_PAGE_TYPE. Those pages should be
2088 			skipped from key rotation forever. */
2089 		} else if (fil_crypt_needs_rotation(
2090 				crypt_data,
2091 				kv,
2092 				key_state->key_version,
2093 				key_state->rotate_key_age)) {
2094 
2095 			mtr.set_named_space(space);
2096 			modified = true;
2097 
2098 			/* force rotation by dummy updating page */
2099 			mlog_write_ulint(frame + FIL_PAGE_SPACE_ID,
2100 					 space_id, MLOG_4BYTES, &mtr);
2101 
2102 			/* statistics */
2103 			state->crypt_stat.pages_modified++;
2104 		} else {
2105 			if (crypt_data->is_encrypted()) {
2106 				if (kv < state->min_key_version_found) {
2107 					state->min_key_version_found = kv;
2108 				}
2109 			}
2110 
2111 			needs_scrubbing = btr_page_needs_scrubbing(
2112 				&state->scrub_data, block,
2113 				BTR_SCRUB_PAGE_ALLOCATION_UNKNOWN);
2114 		}
2115 
2116 		mtr.commit();
2117 		lsn_t end_lsn = mtr.commit_lsn();
2118 
2119 		if (needs_scrubbing == BTR_SCRUB_PAGE) {
2120 			mtr.start();
2121 			/*
2122 			* refetch page and allocation status
2123 			*/
2124 			btr_scrub_page_allocation_status_t allocated;
2125 
2126 			block = btr_scrub_get_block_and_allocation_status(
2127 				state, offset, &mtr,
2128 				&allocated,
2129 				&sleeptime_ms);
2130 
2131 			if (block) {
2132 				mtr.set_named_space(space);
2133 
2134 				/* get required table/index and index-locks */
2135 				needs_scrubbing = btr_scrub_recheck_page(
2136 					&state->scrub_data, block, allocated, &mtr);
2137 
2138 				if (needs_scrubbing == BTR_SCRUB_PAGE) {
2139 					/* we need to refetch it once more now that we have
2140 					* index locked */
2141 					block = btr_scrub_get_block_and_allocation_status(
2142 						state, offset, &mtr,
2143 						&allocated,
2144 						&sleeptime_ms);
2145 
2146 					needs_scrubbing = btr_scrub_page(&state->scrub_data,
2147 						block, allocated,
2148 						&mtr);
2149 				}
2150 
2151 				/* NOTE: mtr is committed inside btr_scrub_recheck_page()
2152 				* and/or btr_scrub_page. This is to make sure that
2153 				* locks & pages are latched in corrected order,
2154 				* the mtr is in some circumstances restarted.
2155 				* (mtr_commit() + mtr_start())
2156 				*/
2157 			}
2158 		}
2159 
2160 		if (needs_scrubbing != BTR_SCRUB_PAGE) {
2161 			/* if page didn't need scrubbing it might be that cleanups
2162 			are needed. do those outside of any mtr to prevent deadlocks.
2163 
2164 			the information what kinds of cleanups that are needed are
2165 			encoded inside the needs_scrubbing, but this is opaque to
2166 			this function (except the value BTR_SCRUB_PAGE) */
2167 			btr_scrub_skip_page(&state->scrub_data, needs_scrubbing);
2168 		}
2169 
2170 		if (needs_scrubbing == BTR_SCRUB_TURNED_OFF) {
2171 			/* if we just detected that scrubbing was turned off
2172 			* update global state to reflect this */
2173 			ut_ad(crypt_data);
2174 			mutex_enter(&crypt_data->mutex);
2175 			crypt_data->rotate_state.scrubbing.is_active = false;
2176 			mutex_exit(&crypt_data->mutex);
2177 		}
2178 
2179 		if (modified) {
2180 			/* if we modified page, we take lsn from mtr */
2181 			ut_a(end_lsn > state->end_lsn);
2182 			ut_a(end_lsn > block_lsn);
2183 			state->end_lsn = end_lsn;
2184 		} else {
2185 			/* if we did not modify page, check for max lsn */
2186 			if (block_lsn > state->end_lsn) {
2187 				state->end_lsn = block_lsn;
2188 			}
2189 		}
2190 	} else {
2191 		/* If block read failed mtr memo and log should be empty. */
2192 		ut_ad(!mtr.has_modifications());
2193 		ut_ad(!mtr.is_dirty());
2194 		ut_ad(mtr.get_memo()->size() == 0);
2195 		ut_ad(mtr.get_log()->size() == 0);
2196 		mtr.commit();
2197 	}
2198 
2199 	if (sleeptime_ms) {
2200 		os_event_reset(fil_crypt_throttle_sleep_event);
2201 		os_event_wait_time(fil_crypt_throttle_sleep_event,
2202 				   1000 * sleeptime_ms);
2203 	}
2204 }
2205 
2206 /***********************************************************************
2207 Rotate a batch of pages
2208 @param[in,out]		key_state		Key state
2209 @param[in,out]		state			Rotation state */
2210 static
2211 void
2212 fil_crypt_rotate_pages(
2213 	const key_state_t*	key_state,
2214 	rotate_thread_t*	state)
2215 {
2216 	ulint space = state->space->id;
2217 	ulint end = std::min(state->offset + state->batch,
2218 			     state->space->free_limit);
2219 
2220 	ut_ad(state->space->referenced());
2221 
2222 	for (; state->offset < end; state->offset++) {
2223 
2224 		/* we can't rotate pages in dblwr buffer as
2225 		* it's not possible to read those due to lots of asserts
2226 		* in buffer pool.
2227 		*
2228 		* However since these are only (short-lived) copies of
2229 		* real pages, they will be updated anyway when the
2230 		* real page is updated
2231 		*/
2232 		if (space == TRX_SYS_SPACE &&
2233 		    buf_dblwr_page_inside(state->offset)) {
2234 			continue;
2235 		}
2236 
2237 		/* If space is marked as stopping, stop rotating
2238 		pages. */
2239 		if (state->space->is_stopping()) {
2240 			break;
2241 		}
2242 
2243 		fil_crypt_rotate_page(key_state, state);
2244 	}
2245 }
2246 
2247 /***********************************************************************
2248 Flush rotated pages and then update page 0
2249 
2250 @param[in,out]		state	rotation state */
2251 static
2252 void
2253 fil_crypt_flush_space(
2254 	rotate_thread_t*	state)
2255 {
2256 	fil_space_t* space = state->space;
2257 	fil_space_crypt_t *crypt_data = space->crypt_data;
2258 
2259 	ut_ad(space->referenced());
2260 
2261 	/* flush tablespace pages so that there are no pages left with old key */
2262 	lsn_t end_lsn = crypt_data->rotate_state.end_lsn;
2263 
2264 	if (end_lsn > 0 && !space->is_stopping()) {
2265 		bool success = false;
2266 		ulint n_pages = 0;
2267 		ulint sum_pages = 0;
2268 		const ulonglong start = my_interval_timer();
2269 
2270 		do {
2271 			success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
2272 			buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
2273 			sum_pages += n_pages;
2274 		} while (!success && !space->is_stopping());
2275 
2276 		const ulonglong end = my_interval_timer();
2277 
2278 		if (sum_pages && end > start) {
2279 			state->cnt_waited += sum_pages;
2280 			state->sum_waited_us += (end - start) / 1000;
2281 
2282 			/* statistics */
2283 			state->crypt_stat.pages_flushed += sum_pages;
2284 		}
2285 	}
2286 
2287 	if (crypt_data->min_key_version == 0) {
2288 		crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
2289 	}
2290 
2291 	if (space->is_stopping()) {
2292 		return;
2293 	}
2294 
2295 	/* update page 0 */
2296 	mtr_t mtr;
2297 	mtr.start();
2298 
2299 	dberr_t err;
2300 
2301 	if (buf_block_t* block = buf_page_get_gen(
2302 		    page_id_t(space->id, 0), space->zip_size(),
2303 		    RW_X_LATCH, NULL, BUF_GET,
2304 		    __FILE__, __LINE__, &mtr, &err)) {
2305 		mtr.set_named_space(space);
2306 		crypt_data->write_page0(space, block->frame, &mtr);
2307 	}
2308 
2309 	mtr.commit();
2310 }
2311 
2312 /***********************************************************************
2313 Complete rotating a space
2314 @param[in,out]		state			Rotation state */
2315 static void fil_crypt_complete_rotate_space(rotate_thread_t* state)
2316 {
2317 	fil_space_crypt_t *crypt_data = state->space->crypt_data;
2318 
2319 	ut_ad(crypt_data);
2320 	ut_ad(state->space->referenced());
2321 
2322 	/* Space might already be dropped */
2323 	if (!state->space->is_stopping()) {
2324 		mutex_enter(&crypt_data->mutex);
2325 
2326 		/**
2327 		* Update crypt data state with state from thread
2328 		*/
2329 		if (state->min_key_version_found <
2330 			crypt_data->rotate_state.min_key_version_found) {
2331 			crypt_data->rotate_state.min_key_version_found =
2332 				state->min_key_version_found;
2333 		}
2334 
2335 		if (state->end_lsn > crypt_data->rotate_state.end_lsn) {
2336 			crypt_data->rotate_state.end_lsn = state->end_lsn;
2337 		}
2338 
2339 		ut_a(crypt_data->rotate_state.active_threads > 0);
2340 		crypt_data->rotate_state.active_threads--;
2341 		bool last = crypt_data->rotate_state.active_threads == 0;
2342 
2343 		/**
2344 		* check if space is fully done
2345 		* this as when threads shutdown, it could be that we "complete"
2346 		* iterating before we have scanned the full space.
2347 		*/
2348 		bool done = crypt_data->rotate_state.next_offset >=
2349 			crypt_data->rotate_state.max_offset;
2350 
2351 		/**
2352 		* we should flush space if we're last thread AND
2353 		* the iteration is done
2354 		*/
2355 		bool should_flush = last && done;
2356 
2357 		if (should_flush) {
2358 			/* we're the last active thread */
2359 			crypt_data->rotate_state.flushing = true;
2360 			crypt_data->min_key_version =
2361 				crypt_data->rotate_state.min_key_version_found;
2362 		}
2363 
2364 		/* inform scrubbing */
2365 		crypt_data->rotate_state.scrubbing.is_active = false;
2366 		mutex_exit(&crypt_data->mutex);
2367 
2368 		/* all threads must call btr_scrub_complete_space wo/ mutex held */
2369 		if (state->scrub_data.scrubbing) {
2370 			btr_scrub_complete_space(&state->scrub_data);
2371 			if (should_flush) {
2372 				/* only last thread updates last_scrub_completed */
2373 				ut_ad(crypt_data);
2374 				mutex_enter(&crypt_data->mutex);
2375 				crypt_data->rotate_state.scrubbing.
2376 					last_scrub_completed = time(0);
2377 				mutex_exit(&crypt_data->mutex);
2378 			}
2379 		}
2380 
2381 		if (should_flush) {
2382 			fil_crypt_flush_space(state);
2383 
2384 			mutex_enter(&crypt_data->mutex);
2385 			crypt_data->rotate_state.flushing = false;
2386 			mutex_exit(&crypt_data->mutex);
2387 		}
2388 	} else {
2389 		mutex_enter(&crypt_data->mutex);
2390 		ut_a(crypt_data->rotate_state.active_threads > 0);
2391 		crypt_data->rotate_state.active_threads--;
2392 		mutex_exit(&crypt_data->mutex);
2393 	}
2394 }
2395 
2396 /*********************************************************************//**
2397 A thread which monitors global key state and rotates tablespaces accordingly
2398 @return a dummy parameter */
2399 extern "C" UNIV_INTERN
2400 os_thread_ret_t
2401 DECLARE_THREAD(fil_crypt_thread)(void*)
2402 {
2403 	mutex_enter(&fil_crypt_threads_mutex);
2404 	uint thread_no = srv_n_fil_crypt_threads_started;
2405 	srv_n_fil_crypt_threads_started++;
2406 	os_event_set(fil_crypt_event); /* signal that we started */
2407 	mutex_exit(&fil_crypt_threads_mutex);
2408 
2409 	/* state of this thread */
2410 	rotate_thread_t thr(thread_no);
2411 
2412 	/* if we find a space that is starting, skip over it and recheck it later */
2413 	bool recheck = false;
2414 
2415 	while (!thr.should_shutdown()) {
2416 
2417 		key_state_t new_state;
2418 
2419 		time_t wait_start = time(0);
2420 
2421 		while (!thr.should_shutdown()) {
2422 
2423 			/* wait for key state changes
2424 			* i.e either new key version of change or
2425 			* new rotate_key_age */
2426 			os_event_reset(fil_crypt_threads_event);
2427 
2428 			if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) {
2429 				break;
2430 			}
2431 
2432 			if (recheck) {
2433 				/* check recheck here, after sleep, so
2434 				* that we don't busy loop while when one thread is starting
2435 				* a space*/
2436 				break;
2437 			}
2438 
2439 			time_t waited = time(0) - wait_start;
2440 
2441 			/* Break if we have waited the background scrub
2442 			internal and background scrubbing is enabled */
2443 			if (waited >= 0
2444 			    && ulint(waited) >= srv_background_scrub_data_check_interval
2445 			    && (srv_background_scrub_data_uncompressed
2446 			        || srv_background_scrub_data_compressed)) {
2447 				break;
2448 			}
2449 		}
2450 
2451 		recheck = false;
2452 		thr.first = true;      // restart from first tablespace
2453 
2454 		/* iterate all spaces searching for those needing rotation */
2455 		while (!thr.should_shutdown() &&
2456 		       fil_crypt_find_space_to_rotate(&new_state, &thr, &recheck)) {
2457 
2458 			/* we found a space to rotate */
2459 			fil_crypt_start_rotate_space(&new_state, &thr);
2460 
2461 			/* iterate all pages (cooperativly with other threads) */
2462 			while (!thr.should_shutdown() &&
2463 			       fil_crypt_find_page_to_rotate(&new_state, &thr)) {
2464 
2465 				if (!thr.space->is_stopping()) {
2466 					/* rotate a (set) of pages */
2467 					fil_crypt_rotate_pages(&new_state, &thr);
2468 				}
2469 
2470 				/* If space is marked as stopping, release
2471 				space and stop rotation. */
2472 				if (thr.space->is_stopping()) {
2473 					fil_crypt_complete_rotate_space(&thr);
2474 					thr.space->release();
2475 					thr.space = NULL;
2476 					break;
2477 				}
2478 
2479 				/* realloc iops */
2480 				fil_crypt_realloc_iops(&thr);
2481 			}
2482 
2483 			/* complete rotation */
2484 			if (thr.space) {
2485 				fil_crypt_complete_rotate_space(&thr);
2486 			}
2487 
2488 			/* force key state refresh */
2489 			new_state.key_id = 0;
2490 
2491 			/* return iops */
2492 			fil_crypt_return_iops(&thr);
2493 		}
2494 	}
2495 
2496 	/* return iops if shutting down */
2497 	fil_crypt_return_iops(&thr);
2498 
2499 	/* release current space if shutting down */
2500 	if (thr.space) {
2501 		thr.space->release();
2502 		thr.space = NULL;
2503 	}
2504 
2505 	mutex_enter(&fil_crypt_threads_mutex);
2506 	srv_n_fil_crypt_threads_started--;
2507 	os_event_set(fil_crypt_event); /* signal that we stopped */
2508 	mutex_exit(&fil_crypt_threads_mutex);
2509 
2510 	/* We count the number of threads in os_thread_exit(). A created
2511 	thread should always use that to exit and not use return() to exit. */
2512 
2513 	os_thread_exit();
2514 
2515 	OS_THREAD_DUMMY_RETURN;
2516 }
2517 
2518 /*********************************************************************
2519 Adjust thread count for key rotation
2520 @param[in]	enw_cnt		Number of threads to be used */
2521 UNIV_INTERN
2522 void
2523 fil_crypt_set_thread_cnt(
2524 	const uint	new_cnt)
2525 {
2526 	if (!fil_crypt_threads_inited) {
2527 		fil_crypt_threads_init();
2528 	}
2529 
2530 	mutex_enter(&fil_crypt_threads_mutex);
2531 
2532 	if (new_cnt > srv_n_fil_crypt_threads) {
2533 		uint add = new_cnt - srv_n_fil_crypt_threads;
2534 		srv_n_fil_crypt_threads = new_cnt;
2535 		for (uint i = 0; i < add; i++) {
2536 			os_thread_id_t rotation_thread_id;
2537 			os_thread_create(fil_crypt_thread, NULL, &rotation_thread_id);
2538 			ib::info() << "Creating #"
2539 				   << i+1 << " encryption thread id "
2540 				   << os_thread_pf(rotation_thread_id)
2541 				   << " total threads " << new_cnt << ".";
2542 		}
2543 	} else if (new_cnt < srv_n_fil_crypt_threads) {
2544 		srv_n_fil_crypt_threads = new_cnt;
2545 		os_event_set(fil_crypt_threads_event);
2546 	}
2547 
2548 	mutex_exit(&fil_crypt_threads_mutex);
2549 
2550 	while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) {
2551 		os_event_reset(fil_crypt_event);
2552 		os_event_wait_time(fil_crypt_event, 100000);
2553 	}
2554 
2555 	/* Send a message to encryption threads that there could be
2556 	something to do. */
2557 	if (srv_n_fil_crypt_threads) {
2558 		os_event_set(fil_crypt_threads_event);
2559 	}
2560 }
2561 
2562 /** Initialize the tablespace default_encrypt_tables
2563 if innodb_encryption_rotate_key_age=0. */
2564 static void fil_crypt_default_encrypt_tables_fill()
2565 {
2566 	ut_ad(mutex_own(&fil_system.mutex));
2567 
2568 	for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list);
2569 	     space != NULL;
2570 	     space = UT_LIST_GET_NEXT(space_list, space)) {
2571 		if (space->purpose != FIL_TYPE_TABLESPACE
2572 		    || space->is_in_default_encrypt
2573 		    || UT_LIST_GET_LEN(space->chain) == 0
2574 		    || !space->acquire()) {
2575 			continue;
2576 		}
2577 
2578 		/* Ensure that crypt_data has been initialized. */
2579 		if (!space->size) {
2580 			ut_d(const fil_space_t* s=)
2581 			        fil_system.read_page0(space->id);
2582 			ut_ad(!s || s == space);
2583 			if (!space->size) {
2584 				/* Page 0 was not loaded.
2585 				Skip this tablespace. */
2586 				goto next;
2587 			}
2588 		}
2589 
2590 		/* Skip ENCRYPTION!=DEFAULT tablespaces. */
2591 		if (space->crypt_data
2592 		    && !space->crypt_data->is_default_encryption()) {
2593 			goto next;
2594 		}
2595 
2596 		if (srv_encrypt_tables) {
2597 			/* Skip encrypted tablespaces if
2598 			innodb_encrypt_tables!=OFF */
2599 			if (space->crypt_data
2600 			    && space->crypt_data->min_key_version) {
2601 				goto next;
2602 			}
2603 		} else {
2604 			/* Skip unencrypted tablespaces if
2605 			innodb_encrypt_tables=OFF */
2606 			if (!space->crypt_data
2607 			    || !space->crypt_data->min_key_version) {
2608 				goto next;
2609 			}
2610 		}
2611 
2612 		fil_system.default_encrypt_tables.push_back(*space);
2613 		space->is_in_default_encrypt = true;
2614 next:
2615 		space->release();
2616 	}
2617 }
2618 
2619 /*********************************************************************
2620 Adjust max key age
2621 @param[in]	val		New max key age */
2622 UNIV_INTERN
2623 void
2624 fil_crypt_set_rotate_key_age(
2625 	uint	val)
2626 {
2627 	mutex_enter(&fil_system.mutex);
2628 	srv_fil_crypt_rotate_key_age = val;
2629 	if (val == 0) {
2630 		fil_crypt_default_encrypt_tables_fill();
2631 	}
2632 	mutex_exit(&fil_system.mutex);
2633 	os_event_set(fil_crypt_threads_event);
2634 }
2635 
2636 /*********************************************************************
2637 Adjust rotation iops
2638 @param[in]	val		New max roation iops */
2639 UNIV_INTERN
2640 void
2641 fil_crypt_set_rotation_iops(
2642 	uint val)
2643 {
2644 	srv_n_fil_crypt_iops = val;
2645 	os_event_set(fil_crypt_threads_event);
2646 }
2647 
2648 /*********************************************************************
2649 Adjust encrypt tables
2650 @param[in]	val		New setting for innodb-encrypt-tables */
2651 UNIV_INTERN
2652 void
2653 fil_crypt_set_encrypt_tables(
2654 	uint val)
2655 {
2656 	if (!fil_crypt_threads_inited) {
2657 		return;
2658 	}
2659 
2660 	mutex_enter(&fil_system.mutex);
2661 
2662 	srv_encrypt_tables = val;
2663 
2664 	if (fil_crypt_must_default_encrypt()) {
2665 		fil_crypt_default_encrypt_tables_fill();
2666 	}
2667 
2668 	mutex_exit(&fil_system.mutex);
2669 
2670 	os_event_set(fil_crypt_threads_event);
2671 }
2672 
2673 /*********************************************************************
2674 Init threads for key rotation */
2675 UNIV_INTERN
2676 void
2677 fil_crypt_threads_init()
2678 {
2679 	if (!fil_crypt_threads_inited) {
2680 		fil_crypt_event = os_event_create(0);
2681 		fil_crypt_threads_event = os_event_create(0);
2682 		mutex_create(LATCH_ID_FIL_CRYPT_THREADS_MUTEX,
2683 		     &fil_crypt_threads_mutex);
2684 
2685 		uint cnt = srv_n_fil_crypt_threads;
2686 		srv_n_fil_crypt_threads = 0;
2687 		fil_crypt_threads_inited = true;
2688 		fil_crypt_set_thread_cnt(cnt);
2689 	}
2690 }
2691 
2692 /*********************************************************************
2693 Clean up key rotation threads resources */
2694 UNIV_INTERN
2695 void
2696 fil_crypt_threads_cleanup()
2697 {
2698 	if (!fil_crypt_threads_inited) {
2699 		return;
2700 	}
2701 	ut_a(!srv_n_fil_crypt_threads_started);
2702 	os_event_destroy(fil_crypt_event);
2703 	os_event_destroy(fil_crypt_threads_event);
2704 	mutex_free(&fil_crypt_threads_mutex);
2705 	fil_crypt_threads_inited = false;
2706 }
2707 
2708 /*********************************************************************
2709 Wait for crypt threads to stop accessing space
2710 @param[in]	space		Tablespace */
2711 UNIV_INTERN
2712 void
2713 fil_space_crypt_close_tablespace(
2714 	const fil_space_t*	space)
2715 {
2716 	fil_space_crypt_t* crypt_data = space->crypt_data;
2717 
2718 	if (!crypt_data || srv_n_fil_crypt_threads == 0
2719 	    || !fil_crypt_threads_inited) {
2720 		return;
2721 	}
2722 
2723 	mutex_enter(&fil_crypt_threads_mutex);
2724 
2725 	time_t start = time(0);
2726 	time_t last = start;
2727 
2728 	mutex_enter(&crypt_data->mutex);
2729 	mutex_exit(&fil_crypt_threads_mutex);
2730 
2731 	ulint cnt = crypt_data->rotate_state.active_threads;
2732 	bool flushing = crypt_data->rotate_state.flushing;
2733 
2734 	while (cnt > 0 || flushing) {
2735 		mutex_exit(&crypt_data->mutex);
2736 		/* release dict mutex so that scrub threads can release their
2737 		* table references */
2738 		dict_mutex_exit_for_mysql();
2739 
2740 		/* wakeup throttle (all) sleepers */
2741 		os_event_set(fil_crypt_throttle_sleep_event);
2742 		os_event_set(fil_crypt_threads_event);
2743 
2744 		os_thread_sleep(20000);
2745 		dict_mutex_enter_for_mysql();
2746 		mutex_enter(&crypt_data->mutex);
2747 		cnt = crypt_data->rotate_state.active_threads;
2748 		flushing = crypt_data->rotate_state.flushing;
2749 
2750 		time_t now = time(0);
2751 
2752 		if (now >= last + 30) {
2753 			ib::warn() << "Waited "
2754 				   << now - start
2755 				   << " seconds to drop space: "
2756 				   << space->name << " ("
2757 				   << space->id << ") active threads "
2758 				   << cnt << "flushing="
2759 				   << flushing << ".";
2760 			last = now;
2761 		}
2762 	}
2763 
2764 	mutex_exit(&crypt_data->mutex);
2765 }
2766 
2767 /*********************************************************************
2768 Get crypt status for a space (used by information_schema)
2769 @param[in]	space		Tablespace
2770 @param[out]	status		Crypt status */
2771 UNIV_INTERN
2772 void
2773 fil_space_crypt_get_status(
2774 	const fil_space_t*			space,
2775 	struct fil_space_crypt_status_t*	status)
2776 {
2777 	memset(status, 0, sizeof(*status));
2778 
2779 	ut_ad(space->referenced());
2780 
2781 	/* If there is no crypt data and we have not yet read
2782 	page 0 for this tablespace, we need to read it before
2783 	we can continue. */
2784 	if (!space->crypt_data) {
2785 		fil_crypt_read_crypt_data(const_cast<fil_space_t*>(space));
2786 	}
2787 
2788 	status->space = ULINT_UNDEFINED;
2789 
2790 	if (fil_space_crypt_t* crypt_data = space->crypt_data) {
2791 		status->space = space->id;
2792 		mutex_enter(&crypt_data->mutex);
2793 		status->scheme = crypt_data->type;
2794 		status->keyserver_requests = crypt_data->keyserver_requests;
2795 		status->min_key_version = crypt_data->min_key_version;
2796 		status->key_id = crypt_data->key_id;
2797 
2798 		if (crypt_data->rotate_state.active_threads > 0 ||
2799 		    crypt_data->rotate_state.flushing) {
2800 			status->rotating = true;
2801 			status->flushing =
2802 				crypt_data->rotate_state.flushing;
2803 			status->rotate_next_page_number =
2804 				crypt_data->rotate_state.next_offset;
2805 			status->rotate_max_page_number =
2806 				crypt_data->rotate_state.max_offset;
2807 		}
2808 
2809 		mutex_exit(&crypt_data->mutex);
2810 
2811 		if (srv_encrypt_tables || crypt_data->min_key_version) {
2812 			status->current_key_version =
2813 				fil_crypt_get_latest_key_version(crypt_data);
2814 		}
2815 	}
2816 }
2817 
2818 /*********************************************************************
2819 Return crypt statistics
2820 @param[out]	stat		Crypt statistics */
2821 UNIV_INTERN
2822 void
2823 fil_crypt_total_stat(
2824 	fil_crypt_stat_t *stat)
2825 {
2826 	mutex_enter(&crypt_stat_mutex);
2827 	*stat = crypt_stat;
2828 	mutex_exit(&crypt_stat_mutex);
2829 }
2830 
2831 /*********************************************************************
2832 Get scrub status for a space (used by information_schema)
2833 
2834 @param[in]	space		Tablespace
2835 @param[out]	status		Scrub status */
2836 UNIV_INTERN
2837 void
2838 fil_space_get_scrub_status(
2839 	const fil_space_t*			space,
2840 	struct fil_space_scrub_status_t*	status)
2841 {
2842 	memset(status, 0, sizeof(*status));
2843 
2844 	ut_ad(space->referenced());
2845 	fil_space_crypt_t* crypt_data = space->crypt_data;
2846 
2847 	status->space = space->id;
2848 
2849 	if (crypt_data != NULL) {
2850 		status->compressed = FSP_FLAGS_GET_ZIP_SSIZE(space->flags) > 0;
2851 		mutex_enter(&crypt_data->mutex);
2852 		status->last_scrub_completed =
2853 			crypt_data->rotate_state.scrubbing.last_scrub_completed;
2854 		if (crypt_data->rotate_state.active_threads > 0 &&
2855 		    crypt_data->rotate_state.scrubbing.is_active) {
2856 			status->scrubbing = true;
2857 			status->current_scrub_started =
2858 				crypt_data->rotate_state.start_time;
2859 			status->current_scrub_active_threads =
2860 				crypt_data->rotate_state.active_threads;
2861 			status->current_scrub_page_number =
2862 				crypt_data->rotate_state.next_offset;
2863 			status->current_scrub_max_page_number =
2864 				crypt_data->rotate_state.max_offset;
2865 		}
2866 
2867 		mutex_exit(&crypt_data->mutex);
2868 	}
2869 }
2870 #endif /* UNIV_INNOCHECKSUM */
2871 
2872 /**
2873 Verify that post encryption checksum match calculated checksum.
2874 This function should be called only if tablespace contains crypt_data
2875 metadata (this is strong indication that tablespace is encrypted).
2876 Function also verifies that traditional checksum does not match
2877 calculated checksum as if it does page could be valid unencrypted,
2878 encrypted, or corrupted.
2879 
2880 @param[in,out]	page		page frame (checksum is temporarily modified)
2881 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
2882 @return true if page is encrypted AND OK, false otherwise */
2883 bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size)
2884 {
2885 	if (ENCRYPTION_KEY_NOT_ENCRYPTED == mach_read_from_4(
2886 			page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)) {
2887 		return false;
2888 	}
2889 
2890 	/* Compressed and encrypted pages do not have checksum. Assume not
2891 	corrupted. Page verification happens after decompression in
2892 	buf_page_io_complete() using buf_page_is_corrupted(). */
2893 	if (mach_read_from_2(page + FIL_PAGE_TYPE)
2894 	    == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
2895 		return true;
2896 	}
2897 
2898 	/* Read stored post encryption checksum. */
2899 	const ib_uint32_t checksum = mach_read_from_4(
2900 		page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
2901 
2902 	/* If stored checksum matches one of the calculated checksums
2903 	page is not corrupted. */
2904 
2905 	switch (srv_checksum_algorithm_t(srv_checksum_algorithm)) {
2906 	case SRV_CHECKSUM_ALGORITHM_STRICT_FULL_CRC32:
2907 	case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
2908 		if (zip_size) {
2909 			return checksum == page_zip_calc_checksum(
2910 				page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32);
2911 		}
2912 
2913 		return checksum == buf_calc_page_crc32(page);
2914 	case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
2915 		/* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2916 		due to MDEV-12114, fil_crypt_calculate_checksum()
2917 		is only using CRC32 for the encrypted pages.
2918 		Due to this, we must treat "strict_none" as "none". */
2919 	case SRV_CHECKSUM_ALGORITHM_NONE:
2920 		return true;
2921 	case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
2922 		/* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2923 		due to MDEV-12114, fil_crypt_calculate_checksum()
2924 		is only using CRC32 for the encrypted pages.
2925 		Due to this, we must treat "strict_innodb" as "innodb". */
2926 	case SRV_CHECKSUM_ALGORITHM_INNODB:
2927 	case SRV_CHECKSUM_ALGORITHM_CRC32:
2928 	case SRV_CHECKSUM_ALGORITHM_FULL_CRC32:
2929 		if (checksum == BUF_NO_CHECKSUM_MAGIC) {
2930 			return true;
2931 		}
2932 		if (zip_size) {
2933 			return checksum == page_zip_calc_checksum(
2934 				page, zip_size,
2935 				SRV_CHECKSUM_ALGORITHM_CRC32)
2936 				|| checksum == page_zip_calc_checksum(
2937 					page, zip_size,
2938 					SRV_CHECKSUM_ALGORITHM_INNODB);
2939 		}
2940 
2941 		return checksum == buf_calc_page_crc32(page)
2942 			|| checksum == buf_calc_page_new_checksum(page);
2943 	}
2944 
2945 	ut_ad(!"unhandled innodb_checksum_algorithm");
2946 	return false;
2947 }
2948