1 /*****************************************************************************
2 Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
3 Copyright (c) 2014, 2021, MariaDB Corporation.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
16 
17 *****************************************************************************/
18 /**************************************************//**
19 @file fil0crypt.cc
20 Innodb file space encrypt/decrypt
21 
22 Created            Jonas Oreland Google
23 Modified           Jan Lindström jan.lindstrom@mariadb.com
24 *******************************************************/
25 
26 #include "fil0fil.h"
27 #include "mtr0types.h"
28 #include "mach0data.h"
29 #include "page0size.h"
30 #include "page0zip.h"
31 #ifndef UNIV_INNOCHECKSUM
32 #include "fil0crypt.h"
33 #include "srv0srv.h"
34 #include "srv0start.h"
35 #include "log0recv.h"
36 #include "mtr0mtr.h"
37 #include "mtr0log.h"
38 #include "ut0ut.h"
39 #include "btr0scrub.h"
40 #include "fsp0fsp.h"
41 #include "fil0pagecompress.h"
42 #include <my_crypt.h>
43 
44 static bool fil_crypt_threads_inited = false;
45 
46 /** Is encryption enabled/disabled */
47 UNIV_INTERN ulong srv_encrypt_tables = 0;
48 
49 /** No of key rotation threads requested */
50 UNIV_INTERN uint srv_n_fil_crypt_threads = 0;
51 
52 /** No of key rotation threads started */
53 UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0;
54 
55 /** At this age or older a space/page will be rotated */
56 UNIV_INTERN uint srv_fil_crypt_rotate_key_age;
57 
58 /** Whether the encryption plugin does key rotation */
59 static bool srv_encrypt_rotate;
60 
61 /** Event to signal FROM the key rotation threads. */
62 static os_event_t fil_crypt_event;
63 
64 /** Event to signal TO the key rotation threads. */
65 UNIV_INTERN os_event_t fil_crypt_threads_event;
66 
67 /** Event for waking up threads throttle. */
68 static os_event_t fil_crypt_throttle_sleep_event;
69 
70 /** Mutex for key rotation threads. */
71 UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex;
72 
/** Flag ensuring that only one thread at a time does the initial conversion */
74 static bool fil_crypt_start_converting = false;
75 
76 /** Variables for throttling */
77 UNIV_INTERN uint srv_n_fil_crypt_iops = 100;	 // 10ms per iop
78 static uint srv_alloc_time = 3;		    // allocate iops for 3s at a time
79 static uint n_fil_crypt_iops_allocated = 0;
80 
81 /** Variables for scrubbing */
82 extern uint srv_background_scrub_data_interval;
83 extern uint srv_background_scrub_data_check_interval;
84 
85 #define DEBUG_KEYROTATION_THROTTLING 0
86 
87 /** Statistics variables */
88 static fil_crypt_stat_t crypt_stat;
89 static ib_mutex_t crypt_stat_mutex;
90 
91 /** Is background scrubbing enabled, defined on btr0scrub.cc */
92 extern my_bool srv_background_scrub_data_uncompressed;
93 extern my_bool srv_background_scrub_data_compressed;
94 
95 /***********************************************************************
96 Check if a key needs rotation given a key_state
97 @param[in]	crypt_data		Encryption information
98 @param[in]	key_version		Current key version
99 @param[in]	latest_key_version	Latest key version
100 @param[in]	rotate_key_age		when to rotate
101 @return true if key needs rotation, false if not */
102 static bool
103 fil_crypt_needs_rotation(
104 	const fil_space_crypt_t*	crypt_data,
105 	uint				key_version,
106 	uint				latest_key_version,
107 	uint				rotate_key_age)
108 	MY_ATTRIBUTE((warn_unused_result));
109 
110 /*********************************************************************
111 Init space crypt */
112 UNIV_INTERN
113 void
fil_space_crypt_init()114 fil_space_crypt_init()
115 {
116 	fil_crypt_throttle_sleep_event = os_event_create(0);
117 
118 	mutex_create(LATCH_ID_FIL_CRYPT_STAT_MUTEX, &crypt_stat_mutex);
119 	memset(&crypt_stat, 0, sizeof(crypt_stat));
120 }
121 
122 /*********************************************************************
123 Cleanup space crypt */
124 UNIV_INTERN
125 void
fil_space_crypt_cleanup()126 fil_space_crypt_cleanup()
127 {
128 	os_event_destroy(fil_crypt_throttle_sleep_event);
129 	mutex_free(&crypt_stat_mutex);
130 }
131 
132 /**
133 Get latest key version from encryption plugin.
134 @return key version or ENCRYPTION_KEY_VERSION_INVALID */
135 uint
key_get_latest_version(void)136 fil_space_crypt_t::key_get_latest_version(void)
137 {
138 	uint key_version = key_found;
139 
140 	if (is_key_found()) {
141 		key_version = encryption_key_get_latest_version(key_id);
142 		/* InnoDB does dirty read of srv_fil_crypt_rotate_key_age.
143 		It doesn't matter because srv_encrypt_rotate
144 		can be set to true only once */
145 		if (!srv_encrypt_rotate
146 		    && key_version > srv_fil_crypt_rotate_key_age) {
147 			srv_encrypt_rotate = true;
148 		}
149 
150 		srv_stats.n_key_requests.inc();
151 		key_found = key_version;
152 	}
153 
154 	return key_version;
155 }
156 
157 /******************************************************************
158 Get the latest(key-version), waking the encrypt thread, if needed
159 @param[in,out]	crypt_data	Crypt data */
160 static inline
161 uint
fil_crypt_get_latest_key_version(fil_space_crypt_t * crypt_data)162 fil_crypt_get_latest_key_version(
163 	fil_space_crypt_t* crypt_data)
164 {
165 	ut_ad(crypt_data != NULL);
166 
167 	uint key_version = crypt_data->key_get_latest_version();
168 
169 	if (crypt_data->is_key_found()) {
170 
171 		if (fil_crypt_needs_rotation(
172 				crypt_data,
173 				crypt_data->min_key_version,
174 				key_version,
175 				srv_fil_crypt_rotate_key_age)) {
176 			/* Below event seen as NULL-pointer at startup
177 			when new database was created and we create a
178 			checkpoint. Only seen when debugging. */
179 			if (fil_crypt_threads_inited) {
180 				os_event_set(fil_crypt_threads_event);
181 			}
182 		}
183 	}
184 
185 	return key_version;
186 }
187 
188 /******************************************************************
189 Mutex helper for crypt_data->scheme */
190 void
crypt_data_scheme_locker(st_encryption_scheme * scheme,int exit)191 crypt_data_scheme_locker(
192 /*=====================*/
193 	st_encryption_scheme*	scheme,
194 	int			exit)
195 {
196 	fil_space_crypt_t* crypt_data =
197 		static_cast<fil_space_crypt_t*>(scheme);
198 
199 	if (exit) {
200 		mutex_exit(&crypt_data->mutex);
201 	} else {
202 		mutex_enter(&crypt_data->mutex);
203 	}
204 }
205 
206 /******************************************************************
207 Create a fil_space_crypt_t object
208 @param[in]	type		CRYPT_SCHEME_UNENCRYPTE or
209 				CRYPT_SCHEME_1
210 @param[in]	encrypt_mode	FIL_ENCRYPTION_DEFAULT or
211 				FIL_ENCRYPTION_ON or
212 				FIL_ENCRYPTION_OFF
213 @param[in]	min_key_version key_version or 0
214 @param[in]	key_id		Used key id
215 @return crypt object */
216 static
217 fil_space_crypt_t*
fil_space_create_crypt_data(uint type,fil_encryption_t encrypt_mode,uint min_key_version,uint key_id)218 fil_space_create_crypt_data(
219 	uint			type,
220 	fil_encryption_t	encrypt_mode,
221 	uint			min_key_version,
222 	uint			key_id)
223 {
224 	fil_space_crypt_t* crypt_data = NULL;
225 	if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
226 		crypt_data = new(buf)
227 			fil_space_crypt_t(
228 				type,
229 				min_key_version,
230 				key_id,
231 				encrypt_mode);
232 	}
233 
234 	return crypt_data;
235 }
236 
237 /******************************************************************
238 Create a fil_space_crypt_t object
239 @param[in]	encrypt_mode	FIL_ENCRYPTION_DEFAULT or
240 				FIL_ENCRYPTION_ON or
241 				FIL_ENCRYPTION_OFF
242 
243 @param[in]	key_id		Encryption key id
244 @return crypt object */
245 UNIV_INTERN
246 fil_space_crypt_t*
fil_space_create_crypt_data(fil_encryption_t encrypt_mode,uint key_id)247 fil_space_create_crypt_data(
248 	fil_encryption_t	encrypt_mode,
249 	uint			key_id)
250 {
251 	return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id));
252 }
253 
254 /******************************************************************
255 Merge fil_space_crypt_t object
256 @param[in,out]	dst		Destination cryp data
257 @param[in]	src		Source crypt data */
258 UNIV_INTERN
259 void
fil_space_merge_crypt_data(fil_space_crypt_t * dst,const fil_space_crypt_t * src)260 fil_space_merge_crypt_data(
261 	fil_space_crypt_t* dst,
262 	const fil_space_crypt_t* src)
263 {
264 	mutex_enter(&dst->mutex);
265 
266 	/* validate that they are mergeable */
267 	ut_a(src->type == CRYPT_SCHEME_UNENCRYPTED ||
268 	     src->type == CRYPT_SCHEME_1);
269 
270 	ut_a(dst->type == CRYPT_SCHEME_UNENCRYPTED ||
271 	     dst->type == CRYPT_SCHEME_1);
272 
273 	dst->encryption = src->encryption;
274 	dst->type = src->type;
275 	dst->min_key_version = src->min_key_version;
276 	dst->keyserver_requests += src->keyserver_requests;
277 
278 	mutex_exit(&dst->mutex);
279 }
280 
281 /** Initialize encryption parameters from a tablespace header page.
282 @param[in]	page_size	page size of the tablespace
283 @param[in]	page		first page of the tablespace
284 @return crypt data from page 0
285 @retval	NULL	if not present or not valid */
286 UNIV_INTERN
287 fil_space_crypt_t*
fil_space_read_crypt_data(const page_size_t & page_size,const byte * page)288 fil_space_read_crypt_data(const page_size_t& page_size, const byte* page)
289 {
290 	const ulint offset = FSP_HEADER_OFFSET
291 		+ fsp_header_get_encryption_offset(page_size);
292 
293 	if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) {
294 		/* Crypt data is not stored. */
295 		return NULL;
296 	}
297 
298 	uint8_t type = mach_read_from_1(page + offset + MAGIC_SZ + 0);
299 	uint8_t iv_length = mach_read_from_1(page + offset + MAGIC_SZ + 1);
300 	fil_space_crypt_t* crypt_data;
301 
302 	if (!(type == CRYPT_SCHEME_UNENCRYPTED ||
303 	      type == CRYPT_SCHEME_1)
304 	    || iv_length != sizeof crypt_data->iv) {
305 		ib::error() << "Found non sensible crypt scheme: "
306 			    << type << "," << iv_length
307 			    << " for space: "
308 			    << page_get_space_id(page);
309 		return NULL;
310 	}
311 
312 	uint min_key_version = mach_read_from_4
313 		(page + offset + MAGIC_SZ + 2 + iv_length);
314 
315 	uint key_id = mach_read_from_4
316 		(page + offset + MAGIC_SZ + 2 + iv_length + 4);
317 
318 	fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(
319 		page + offset + MAGIC_SZ + 2 + iv_length + 8);
320 
321 	crypt_data = fil_space_create_crypt_data(encryption, key_id);
322 	/* We need to overwrite these as above function will initialize
323 	members */
324 	crypt_data->type = type;
325 	crypt_data->min_key_version = min_key_version;
326 	crypt_data->page0_offset = offset;
327 	memcpy(crypt_data->iv, page + offset + MAGIC_SZ + 2, iv_length);
328 
329 	return crypt_data;
330 }
331 
332 /******************************************************************
333 Free a crypt data object
334 @param[in,out] crypt_data	crypt data to be freed */
335 UNIV_INTERN
336 void
fil_space_destroy_crypt_data(fil_space_crypt_t ** crypt_data)337 fil_space_destroy_crypt_data(
338 	fil_space_crypt_t **crypt_data)
339 {
340 	if (crypt_data != NULL && (*crypt_data) != NULL) {
341 		fil_space_crypt_t* c;
342 		if (UNIV_LIKELY(fil_crypt_threads_inited)) {
343 			mutex_enter(&fil_crypt_threads_mutex);
344 			c = *crypt_data;
345 			*crypt_data = NULL;
346 			mutex_exit(&fil_crypt_threads_mutex);
347 		} else {
348 			ut_ad(srv_read_only_mode || !srv_was_started);
349 			c = *crypt_data;
350 			*crypt_data = NULL;
351 		}
352 		if (c) {
353 			c->~fil_space_crypt_t();
354 			ut_free(c);
355 		}
356 	}
357 }
358 
359 /** Fill crypt data information to the give page.
360 It should be called during ibd file creation.
361 @param[in]	flags	tablespace flags
362 @param[in,out]	page	first page of the tablespace */
363 void
fill_page0(ulint flags,byte * page)364 fil_space_crypt_t::fill_page0(
365 	ulint	flags,
366 	byte*	page)
367 {
368 	const uint len = sizeof(iv);
369 	const ulint offset = FSP_HEADER_OFFSET
370 		+ fsp_header_get_encryption_offset(page_size_t(flags));
371 	page0_offset = offset;
372 
373 	memcpy(page + offset, CRYPT_MAGIC, MAGIC_SZ);
374 	mach_write_to_1(page + offset + MAGIC_SZ, type);
375 	mach_write_to_1(page + offset + MAGIC_SZ + 1, len);
376 	memcpy(page + offset + MAGIC_SZ + 2, &iv, len);
377 
378 	mach_write_to_4(page + offset + MAGIC_SZ + 2 + len,
379 			min_key_version);
380 	mach_write_to_4(page + offset + MAGIC_SZ + 2 + len + 4,
381 			key_id);
382 	mach_write_to_1(page + offset + MAGIC_SZ + 2  + len + 8,
383 			encryption);
384 }
385 
386 /******************************************************************
387 Write crypt data to a page (0)
388 @param[in]	space	tablespace
389 @param[in,out]	page0	first page of the tablespace
390 @param[in,out]	mtr	mini-transaction */
391 UNIV_INTERN
392 void
write_page0(const fil_space_t * space,byte * page,mtr_t * mtr)393 fil_space_crypt_t::write_page0(
394 	const fil_space_t*	space,
395 	byte* 			page,
396 	mtr_t*			mtr)
397 {
398 	ut_ad(this == space->crypt_data);
399 	const uint len = sizeof(iv);
400 	const ulint offset = FSP_HEADER_OFFSET
401 		+ fsp_header_get_encryption_offset(page_size_t(space->flags));
402 	page0_offset = offset;
403 
404 	/*
405 	redo log this as bytewise updates to page 0
406 	followed by an MLOG_FILE_WRITE_CRYPT_DATA
407 	(that will during recovery update fil_space_t)
408 	*/
409 	mlog_write_string(page + offset, CRYPT_MAGIC, MAGIC_SZ, mtr);
410 	mlog_write_ulint(page + offset + MAGIC_SZ + 0, type, MLOG_1BYTE, mtr);
411 	mlog_write_ulint(page + offset + MAGIC_SZ + 1, len, MLOG_1BYTE, mtr);
412 	mlog_write_string(page + offset + MAGIC_SZ + 2, iv, len,
413 			  mtr);
414 	mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len, min_key_version,
415 			 MLOG_4BYTES, mtr);
416 	mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 4, key_id,
417 			 MLOG_4BYTES, mtr);
418 	mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 8, encryption,
419 		MLOG_1BYTE, mtr);
420 
421 	DBUG_EXECUTE_IF("ib_do_not_log_crypt_data", return;);
422 
423 	byte* log_ptr = mlog_open(mtr, 11 + 17 + len);
424 
425 	if (log_ptr != NULL) {
426 		log_ptr = mlog_write_initial_log_record_fast(
427 			page,
428 			MLOG_FILE_WRITE_CRYPT_DATA,
429 			log_ptr, mtr);
430 		mach_write_to_4(log_ptr, space->id);
431 		log_ptr += 4;
432 		mach_write_to_2(log_ptr, offset);
433 		log_ptr += 2;
434 		mach_write_to_1(log_ptr, type);
435 		log_ptr += 1;
436 		mach_write_to_1(log_ptr, len);
437 		log_ptr += 1;
438 		mach_write_to_4(log_ptr, min_key_version);
439 		log_ptr += 4;
440 		mach_write_to_4(log_ptr, key_id);
441 		log_ptr += 4;
442 		mach_write_to_1(log_ptr, encryption);
443 		log_ptr += 1;
444 		mlog_close(mtr, log_ptr);
445 
446 		mlog_catenate_string(mtr, iv, len);
447 	}
448 }
449 
450 /******************************************************************
451 Parse a MLOG_FILE_WRITE_CRYPT_DATA log entry
452 @param[in]	ptr		Log entry start
453 @param[in]	end_ptr		Log entry end
454 @param[in]	block		buffer block
455 @return position on log buffer */
456 UNIV_INTERN
457 byte*
fil_parse_write_crypt_data(byte * ptr,const byte * end_ptr,dberr_t * err)458 fil_parse_write_crypt_data(
459 	byte*			ptr,
460 	const byte*		end_ptr,
461 	dberr_t*		err)
462 {
463 	/* check that redo log entry is complete */
464 	uint entry_size =
465 		4 + // size of space_id
466 		2 + // size of offset
467 		1 + // size of type
468 		1 + // size of iv-len
469 		4 +  // size of min_key_version
470 		4 +  // size of key_id
471 		1; // fil_encryption_t
472 
473 	*err = DB_SUCCESS;
474 
475 	if (ptr + entry_size > end_ptr) {
476 		return NULL;
477 	}
478 
479 	ulint space_id = mach_read_from_4(ptr);
480 	ptr += 4;
481 	uint offset = mach_read_from_2(ptr);
482 	ptr += 2;
483 	uint type = mach_read_from_1(ptr);
484 	ptr += 1;
485 	uint len = mach_read_from_1(ptr);
486 	ptr += 1;
487 
488 	if ((type != CRYPT_SCHEME_1 && type != CRYPT_SCHEME_UNENCRYPTED)
489 	    || len != CRYPT_SCHEME_1_IV_LEN) {
490 		*err = DB_CORRUPTION;
491 		return NULL;
492 	}
493 
494 	uint min_key_version = mach_read_from_4(ptr);
495 	ptr += 4;
496 
497 	uint key_id = mach_read_from_4(ptr);
498 	ptr += 4;
499 
500 	fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(ptr);
501 	ptr +=1;
502 
503 	if (ptr + len > end_ptr) {
504 		return NULL;
505 	}
506 
507 	mutex_enter(&fil_system.mutex);
508 
509 	fil_space_t* space = fil_space_get_by_id(space_id);
510 
511 	if (!space) {
512 		mutex_exit(&fil_system.mutex);
513 		return ptr + len;
514 	}
515 
516 	fil_space_crypt_t* crypt_data = fil_space_create_crypt_data(
517 		encryption, key_id);
518 
519 	crypt_data->page0_offset = offset;
520 	crypt_data->min_key_version = min_key_version;
521 	crypt_data->type = type;
522 	memcpy(crypt_data->iv, ptr, len);
523 	ptr += len;
524 
525 	if (space->crypt_data) {
526 		fil_space_merge_crypt_data(space->crypt_data, crypt_data);
527 		fil_space_destroy_crypt_data(&crypt_data);
528 		crypt_data = space->crypt_data;
529 	} else {
530 		space->crypt_data = crypt_data;
531 	}
532 
533 	mutex_exit(&fil_system.mutex);
534 
535 	if (crypt_data->should_encrypt() && !crypt_data->is_key_found()) {
536 		*err = DB_DECRYPTION_FAILED;
537 	}
538 
539 	return ptr;
540 }
541 
542 /** Encrypt a buffer.
543 @param[in,out]		crypt_data	Crypt data
544 @param[in]		space		space_id
545 @param[in]		offset		Page offset
546 @param[in]		lsn		Log sequence number
547 @param[in]		src_frame	Page to encrypt
548 @param[in]		page_size	Page size
549 @param[in,out]		dst_frame	Output buffer
550 @return encrypted buffer or NULL */
551 UNIV_INTERN
552 byte*
fil_encrypt_buf(fil_space_crypt_t * crypt_data,ulint space,ulint offset,lsn_t lsn,const byte * src_frame,const page_size_t & page_size,byte * dst_frame)553 fil_encrypt_buf(
554 	fil_space_crypt_t*	crypt_data,
555 	ulint			space,
556 	ulint			offset,
557 	lsn_t			lsn,
558 	const byte*		src_frame,
559 	const page_size_t&	page_size,
560 	byte*			dst_frame)
561 {
562 	uint size = uint(page_size.physical());
563 	uint key_version = fil_crypt_get_latest_key_version(crypt_data);
564 
565 	ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
566 
567 	ulint orig_page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
568 	ibool page_compressed = (orig_page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
569 	uint header_len = FIL_PAGE_DATA;
570 
571 	if (page_compressed) {
572 		header_len += (FIL_PAGE_COMPRESSED_SIZE + FIL_PAGE_COMPRESSION_METHOD_SIZE);
573 	}
574 
575 	/* FIL page header is not encrypted */
576 	memcpy(dst_frame, src_frame, header_len);
577 
578 	/* Store key version */
579 	mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, key_version);
580 
581 	/* Calculate the start offset in a page */
582 	uint unencrypted_bytes = header_len + FIL_PAGE_DATA_END;
583 	uint srclen = size - unencrypted_bytes;
584 	const byte* src = src_frame + header_len;
585 	byte* dst = dst_frame + header_len;
586 	uint32 dstlen = 0;
587 
588 	if (page_compressed) {
589 		srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
590 	}
591 
592 	int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
593 					   crypt_data, key_version,
594 					   (uint32)space, (uint32)offset, lsn);
595 	ut_a(rc == MY_AES_OK);
596 	ut_a(dstlen == srclen);
597 
598 	/* For compressed tables we do not store the FIL header because
599 	the whole page is not stored to the disk. In compressed tables only
600 	the FIL header + compressed (and now encrypted) payload alligned
601 	to sector boundary is written. */
602 	if (!page_compressed) {
603 		/* FIL page trailer is also not encrypted */
604 		memcpy(dst_frame + page_size.physical() - FIL_PAGE_DATA_END,
605 			src_frame + page_size.physical() - FIL_PAGE_DATA_END,
606 			FIL_PAGE_DATA_END);
607 	} else {
608 		/* Clean up rest of buffer */
609 		memset(dst_frame+header_len+srclen, 0,
610 		       page_size.physical() - (header_len + srclen));
611 	}
612 
613 	/* handle post encryption checksum */
614 	ib_uint32_t checksum = 0;
615 
616 	checksum = fil_crypt_calculate_checksum(page_size, dst_frame);
617 
618 	// store the post-encryption checksum after the key-version
619 	mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4, checksum);
620 
621 	ut_ad(fil_space_verify_crypt_checksum(dst_frame, page_size));
622 
623 	srv_stats.pages_encrypted.inc();
624 
625 	return dst_frame;
626 }
627 
628 /******************************************************************
629 Encrypt a page
630 
631 @param[in]		space		Tablespace
632 @param[in]		offset		Page offset
633 @param[in]		lsn		Log sequence number
634 @param[in]		src_frame	Page to encrypt
635 @param[in,out]		dst_frame	Output buffer
636 @return encrypted buffer or NULL */
637 UNIV_INTERN
638 byte*
fil_space_encrypt(const fil_space_t * space,ulint offset,lsn_t lsn,byte * src_frame,byte * dst_frame)639 fil_space_encrypt(
640 	const fil_space_t*	space,
641 	ulint			offset,
642 	lsn_t			lsn,
643 	byte*			src_frame,
644 	byte*			dst_frame)
645 {
646 	switch (mach_read_from_2(src_frame+FIL_PAGE_TYPE)) {
647 	case FIL_PAGE_TYPE_FSP_HDR:
648 	case FIL_PAGE_TYPE_XDES:
649 	case FIL_PAGE_RTREE:
650 		/* File space header, extent descriptor or spatial index
651 		are not encrypted. */
652 		return src_frame;
653 	}
654 
655 	if (!space->crypt_data || !space->crypt_data->is_encrypted()) {
656 		return (src_frame);
657 	}
658 
659 	ut_ad(space->pending_io());
660 	return fil_encrypt_buf(space->crypt_data, space->id, offset, lsn,
661 			       src_frame, page_size_t(space->flags),
662 			       dst_frame);
663 }
664 
665 /** Decrypt a page.
666 @param[in]	crypt_data		crypt_data
667 @param[in]	tmp_frame		Temporary buffer
668 @param[in]	page_size		Page size
669 @param[in,out]	src_frame		Page to decrypt
670 @return DB_SUCCESS or error */
671 UNIV_INTERN
672 dberr_t
fil_space_decrypt(fil_space_crypt_t * crypt_data,byte * tmp_frame,const page_size_t & page_size,byte * src_frame)673 fil_space_decrypt(
674 	fil_space_crypt_t*	crypt_data,
675 	byte*			tmp_frame,
676 	const page_size_t&	page_size,
677 	byte*			src_frame)
678 {
679 	ulint page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
680 	uint key_version = mach_read_from_4(src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
681 	bool page_compressed = (page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
682 	uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
683 	uint space = mach_read_from_4(src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
684 	ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
685 
686 	ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
687 	ut_a(crypt_data != NULL && crypt_data->is_encrypted());
688 
689 	/* read space & lsn */
690 	uint header_len = FIL_PAGE_DATA;
691 
692 	if (page_compressed) {
693 		header_len += (FIL_PAGE_COMPRESSED_SIZE + FIL_PAGE_COMPRESSION_METHOD_SIZE);
694 	}
695 
696 	/* Copy FIL page header, it is not encrypted */
697 	memcpy(tmp_frame, src_frame, header_len);
698 
699 	/* Calculate the offset where decryption starts */
700 	const byte* src = src_frame + header_len;
701 	byte* dst = tmp_frame + header_len;
702 	uint32 dstlen = 0;
703 	uint srclen = uint(page_size.physical())
704 		- header_len - FIL_PAGE_DATA_END;
705 
706 	if (page_compressed) {
707 		srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
708 	}
709 
710 	int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
711 					   crypt_data, key_version,
712 					   space, offset, lsn);
713 
714 	if (! ((rc == MY_AES_OK) && ((ulint) dstlen == srclen))) {
715 
716 		if (rc == -1) {
717 			return DB_DECRYPTION_FAILED;
718 		}
719 
720 		ib::fatal() << "Unable to decrypt data-block "
721 			    << " src: " << static_cast<const void*>(src)
722 			    << "srclen: "
723 			    << srclen << " buf: "
724 			    << static_cast<const void*>(dst) << "buflen: "
725 			    << dstlen << " return-code: " << rc
726 			    << " Can't continue!";
727 	}
728 
729 	/* For compressed tables we do not store the FIL header because
730 	the whole page is not stored to the disk. In compressed tables only
731 	the FIL header + compressed (and now encrypted) payload alligned
732 	to sector boundary is written. */
733 	if (!page_compressed) {
734 		/* Copy FIL trailer */
735 		memcpy(tmp_frame + page_size.physical() - FIL_PAGE_DATA_END,
736 		       src_frame + page_size.physical() - FIL_PAGE_DATA_END,
737 		       FIL_PAGE_DATA_END);
738 	}
739 
740 	srv_stats.pages_decrypted.inc();
741 
742 	return DB_SUCCESS; /* page was decrypted */
743 }
744 
745 /**
746 Decrypt a page.
747 @param[in]	space			Tablespace
748 @param[in]	tmp_frame		Temporary buffer used for decrypting
749 @param[in,out]	src_frame		Page to decrypt
750 @return decrypted page, or original not encrypted page if decryption is
751 not needed.*/
752 UNIV_INTERN
753 byte*
fil_space_decrypt(const fil_space_t * space,byte * tmp_frame,byte * src_frame)754 fil_space_decrypt(
755 	const fil_space_t* space,
756 	byte*		tmp_frame,
757 	byte*		src_frame)
758 {
759 	const page_size_t page_size(space->flags);
760 
761 	ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
762 	ut_ad(space->pending_io());
763 
764 	if (DB_SUCCESS != fil_space_decrypt(space->crypt_data, tmp_frame,
765 					    page_size, src_frame)) {
766 		return NULL;
767 	}
768 
769 	/* Copy the decrypted page back to page buffer, not
770 	really any other options. */
771 	memcpy(src_frame, tmp_frame, page_size.physical());
772 
773 	return src_frame;
774 }
775 
776 /******************************************************************
777 Calculate post encryption checksum
778 @param[in]	page_size	page size
779 @param[in]	dst_frame	Block where checksum is calculated
780 @return page checksum
781 not needed. */
782 UNIV_INTERN
783 uint32_t
fil_crypt_calculate_checksum(const page_size_t & page_size,const byte * dst_frame)784 fil_crypt_calculate_checksum(
785 	const page_size_t&	page_size,
786 	const byte*		dst_frame)
787 {
788 	/* For encrypted tables we use only crc32 and strict_crc32 */
789 	return page_size.is_compressed()
790 		? page_zip_calc_checksum(dst_frame, page_size.physical(),
791 					 SRV_CHECKSUM_ALGORITHM_CRC32)
792 		: buf_calc_page_crc32(dst_frame);
793 }
794 
795 /***********************************************************************/
796 
797 /** A copy of global key state */
798 struct key_state_t {
key_state_tkey_state_t799 	key_state_t() : key_id(0), key_version(0),
800 			rotate_key_age(srv_fil_crypt_rotate_key_age) {}
operator ==key_state_t801 	bool operator==(const key_state_t& other) const {
802 		return key_version == other.key_version &&
803 			rotate_key_age == other.rotate_key_age;
804 	}
805 	uint key_id;
806 	uint key_version;
807 	uint rotate_key_age;
808 };
809 
810 /***********************************************************************
811 Copy global key state
812 @param[in,out]	new_state	key state
813 @param[in]	crypt_data	crypt data */
814 static void
fil_crypt_get_key_state(key_state_t * new_state,fil_space_crypt_t * crypt_data)815 fil_crypt_get_key_state(
816 	key_state_t*			new_state,
817 	fil_space_crypt_t*		crypt_data)
818 {
819 	if (srv_encrypt_tables) {
820 		new_state->key_version = crypt_data->key_get_latest_version();
821 		new_state->rotate_key_age = srv_fil_crypt_rotate_key_age;
822 
823 		ut_a(new_state->key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
824 	} else {
825 		new_state->key_version = 0;
826 		new_state->rotate_key_age = 0;
827 	}
828 }
829 
830 /***********************************************************************
831 Check if a key needs rotation given a key_state
832 @param[in]	crypt_data		Encryption information
833 @param[in]	key_version		Current key version
834 @param[in]	latest_key_version	Latest key version
835 @param[in]	rotate_key_age		when to rotate
836 @return true if key needs rotation, false if not */
837 static bool
fil_crypt_needs_rotation(const fil_space_crypt_t * crypt_data,uint key_version,uint latest_key_version,uint rotate_key_age)838 fil_crypt_needs_rotation(
839 	const fil_space_crypt_t*	crypt_data,
840 	uint				key_version,
841 	uint				latest_key_version,
842 	uint				rotate_key_age)
843 {
844 	if (key_version == ENCRYPTION_KEY_VERSION_INVALID) {
845 		return false;
846 	}
847 
848 	if (key_version == 0 && latest_key_version != 0) {
849 		/* this is rotation unencrypted => encrypted
850 		* ignore rotate_key_age */
851 		return true;
852 	}
853 
854 	if (latest_key_version == 0 && key_version != 0) {
855 		if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT) {
856 			/* this is rotation encrypted => unencrypted */
857 			return true;
858 		}
859 		return false;
860 	}
861 
862 	if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT
863 	    && crypt_data->type == CRYPT_SCHEME_1
864 	    && !srv_encrypt_tables) {
865 		/* This is rotation encrypted => unencrypted */
866 		return true;
867 	}
868 
869 	if (rotate_key_age == 0) {
870 		return false;
871 	}
872 
873 	/* this is rotation encrypted => encrypted,
874 	* only reencrypt if key is sufficiently old */
875 	if (key_version + rotate_key_age < latest_key_version) {
876 		return true;
877 	}
878 
879 	return false;
880 }
881 
882 /** Read page 0 and possible crypt data from there.
883 @param[in,out]	space		Tablespace */
884 static inline
885 void
fil_crypt_read_crypt_data(fil_space_t * space)886 fil_crypt_read_crypt_data(fil_space_t* space)
887 {
888 	if (space->crypt_data || space->size
889 	    || !fil_space_get_size(space->id)) {
890 		/* The encryption metadata has already been read, or
891 		the tablespace is not encrypted and the file has been
892 		opened already, or the file cannot be accessed,
893 		likely due to a concurrent DROP
894 		(possibly as part of TRUNCATE or ALTER TABLE).
895 		FIXME: The file can become unaccessible any time
896 		after this check! We should really remove this
897 		function and instead make crypt_data an integral
898 		part of fil_space_t. */
899 		return;
900 	}
901 
902 	const page_size_t page_size(space->flags);
903 	mtr_t	mtr;
904 	mtr.start();
905 	if (buf_block_t* block = buf_page_get(page_id_t(space->id, 0),
906 					      page_size, RW_S_LATCH, &mtr)) {
907 		mutex_enter(&fil_system.mutex);
908 		if (!space->crypt_data) {
909 			space->crypt_data = fil_space_read_crypt_data(
910 				page_size, block->frame);
911 		}
912 		mutex_exit(&fil_system.mutex);
913 	}
914 	mtr.commit();
915 }
916 
917 /** Start encrypting a space
918 @param[in,out]		space		Tablespace
919 @return true if a recheck of tablespace is needed by encryption thread. */
fil_crypt_start_encrypting_space(fil_space_t * space)920 static bool fil_crypt_start_encrypting_space(fil_space_t* space)
921 {
922 	bool recheck = false;
923 
924 	mutex_enter(&fil_crypt_threads_mutex);
925 
926 	fil_space_crypt_t *crypt_data = space->crypt_data;
927 
928 	/* If space is not encrypted and encryption is not enabled, then
929 	do not continue encrypting the space. */
930 	if (!crypt_data && !srv_encrypt_tables) {
931 		mutex_exit(&fil_crypt_threads_mutex);
932 		return false;
933 	}
934 
935 	if (crypt_data != NULL || fil_crypt_start_converting) {
936 		/* someone beat us to it */
937 		if (fil_crypt_start_converting) {
938 			recheck = true;
939 		}
940 
941 		mutex_exit(&fil_crypt_threads_mutex);
942 		return recheck;
943 	}
944 
945 	/* NOTE: we need to write and flush page 0 before publishing
946 	* the crypt data. This so that after restart there is no
947 	* risk of finding encrypted pages without having
948 	* crypt data in page 0 */
949 
950 	/* 1 - create crypt data */
951 	crypt_data = fil_space_create_crypt_data(
952 		FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);
953 
954 	if (crypt_data == NULL) {
955 		mutex_exit(&fil_crypt_threads_mutex);
956 		return false;
957 	}
958 
959 	crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
960 	crypt_data->min_key_version = 0; // all pages are unencrypted
961 	crypt_data->rotate_state.start_time = time(0);
962 	crypt_data->rotate_state.starting = true;
963 	crypt_data->rotate_state.active_threads = 1;
964 
965 	mutex_enter(&fil_system.mutex);
966 	space->crypt_data = crypt_data;
967 	mutex_exit(&fil_system.mutex);
968 
969 	fil_crypt_start_converting = true;
970 	mutex_exit(&fil_crypt_threads_mutex);
971 
972 	do
973 	{
974 		mtr_t mtr;
975 		mtr.start();
976 		mtr.set_named_space(space);
977 
978 		/* 2 - get page 0 */
979 		dberr_t err = DB_SUCCESS;
980 		buf_block_t* block = buf_page_get_gen(
981 			page_id_t(space->id, 0), page_size_t(space->flags),
982 			RW_X_LATCH, NULL, BUF_GET,
983 			__FILE__, __LINE__,
984 			&mtr, &err);
985 
986 
987 		/* 3 - write crypt data to page 0 */
988 		byte* frame = buf_block_get_frame(block);
989 		crypt_data->type = CRYPT_SCHEME_1;
990 		crypt_data->write_page0(space, frame, &mtr);
991 
992 		mtr.commit();
993 
994 		/* record lsn of update */
995 		lsn_t end_lsn = mtr.commit_lsn();
996 
997 		/* 4 - sync tablespace before publishing crypt data */
998 
999 		bool success = false;
1000 
1001 		do {
1002 			ulint n_pages = 0;
1003 			success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
1004 			buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
1005 		} while (!success);
1006 
1007 		/* 5 - publish crypt data */
1008 		mutex_enter(&fil_crypt_threads_mutex);
1009 		mutex_enter(&crypt_data->mutex);
1010 		crypt_data->type = CRYPT_SCHEME_1;
1011 		ut_a(crypt_data->rotate_state.active_threads == 1);
1012 		crypt_data->rotate_state.active_threads = 0;
1013 		crypt_data->rotate_state.starting = false;
1014 
1015 		fil_crypt_start_converting = false;
1016 		mutex_exit(&crypt_data->mutex);
1017 		mutex_exit(&fil_crypt_threads_mutex);
1018 
1019 		return recheck;
1020 	} while (0);
1021 
1022 	mutex_enter(&crypt_data->mutex);
1023 	ut_a(crypt_data->rotate_state.active_threads == 1);
1024 	crypt_data->rotate_state.active_threads = 0;
1025 	mutex_exit(&crypt_data->mutex);
1026 
1027 	mutex_enter(&fil_crypt_threads_mutex);
1028 	fil_crypt_start_converting = false;
1029 	mutex_exit(&fil_crypt_threads_mutex);
1030 
1031 	return recheck;
1032 }
1033 
1034 /** State of a rotation thread */
1035 struct rotate_thread_t {
rotate_thread_trotate_thread_t1036 	explicit rotate_thread_t(uint no) {
1037 		memset(this, 0, sizeof(* this));
1038 		thread_no = no;
1039 		first = true;
1040 		estimated_max_iops = 20;
1041 	}
1042 
1043 	uint thread_no;
1044 	bool first;		    /*!< is position before first space */
1045 	fil_space_t* space;	    /*!< current space or NULL */
1046 	ulint offset;		    /*!< current offset */
1047 	ulint batch;		    /*!< #pages to rotate */
1048 	uint  min_key_version_found;/*!< min key version found but not rotated */
1049 	lsn_t end_lsn;		    /*!< max lsn when rotating this space */
1050 
1051 	uint estimated_max_iops;   /*!< estimation of max iops */
1052 	uint allocated_iops;	   /*!< allocated iops */
1053 	ulint cnt_waited;	   /*!< #times waited during this slot */
1054 	uintmax_t sum_waited_us;   /*!< wait time during this slot */
1055 
1056 	fil_crypt_stat_t crypt_stat; // statistics
1057 
1058 	btr_scrub_t scrub_data;      /* thread local data used by btr_scrub-functions
1059 				     * when iterating pages of tablespace */
1060 
1061 	/** @return whether this thread should terminate */
should_shutdownrotate_thread_t1062 	bool should_shutdown() const {
1063 		switch (srv_shutdown_state) {
1064 		case SRV_SHUTDOWN_NONE:
1065 			return thread_no >= srv_n_fil_crypt_threads;
1066 		case SRV_SHUTDOWN_EXIT_THREADS:
1067 			/* srv_init_abort() must have been invoked */
1068 		case SRV_SHUTDOWN_CLEANUP:
1069 		case SRV_SHUTDOWN_INITIATED:
1070 			return true;
1071 		case SRV_SHUTDOWN_FLUSH_PHASE:
1072 		case SRV_SHUTDOWN_LAST_PHASE:
1073 			break;
1074 		}
1075 		ut_ad(0);
1076 		return true;
1077 	}
1078 };
1079 
1080 /** Avoid the removal of the tablespace from
1081 default_encrypt_list only when
1082 1) Another active encryption thread working on tablespace
1083 2) Eligible for tablespace key rotation
1084 3) Tablespace is in flushing phase
1085 @return true if tablespace should be removed from
1086 default encrypt */
fil_crypt_must_remove(const fil_space_t & space)1087 static bool fil_crypt_must_remove(const fil_space_t &space)
1088 {
1089   ut_ad(space.purpose == FIL_TYPE_TABLESPACE);
1090   fil_space_crypt_t *crypt_data = space.crypt_data;
1091   ut_ad(mutex_own(&fil_system.mutex));
1092   const ulong encrypt_tables= srv_encrypt_tables;
1093   if (!crypt_data)
1094     return !encrypt_tables;
1095   if (!crypt_data->is_key_found())
1096     return true;
1097 
1098   mutex_enter(&crypt_data->mutex);
1099   const bool remove= (space.is_stopping() || crypt_data->not_encrypted()) &&
1100     (!crypt_data->rotate_state.flushing &&
1101      !encrypt_tables == !!crypt_data->min_key_version &&
1102      !crypt_data->rotate_state.active_threads);
1103   mutex_exit(&crypt_data->mutex);
1104   return remove;
1105 }
1106 
1107 /***********************************************************************
1108 Check if space needs rotation given a key_state
1109 @param[in,out]		state		Key rotation state
1110 @param[in,out]		key_state	Key state
1111 @param[in,out]		recheck		needs recheck ?
1112 @return true if space needs key rotation */
1113 static
1114 bool
fil_crypt_space_needs_rotation(rotate_thread_t * state,key_state_t * key_state,bool * recheck)1115 fil_crypt_space_needs_rotation(
1116 	rotate_thread_t*	state,
1117 	key_state_t*		key_state,
1118 	bool*			recheck)
1119 {
1120 	fil_space_t* space = state->space;
1121 
1122 	/* Make sure that tablespace is normal tablespace */
1123 	if (space->purpose != FIL_TYPE_TABLESPACE) {
1124 		return false;
1125 	}
1126 
1127 	ut_ad(space->referenced());
1128 
1129 	fil_space_crypt_t *crypt_data = space->crypt_data;
1130 
1131 	if (crypt_data == NULL) {
1132 		/**
1133 		* space has no crypt data
1134 		*   start encrypting it...
1135 		*/
1136 		*recheck = fil_crypt_start_encrypting_space(space);
1137 		crypt_data = space->crypt_data;
1138 
1139 		if (crypt_data == NULL) {
1140 			return false;
1141 		}
1142 
1143 		crypt_data->key_get_latest_version();
1144 	}
1145 
1146 	/* If used key_id is not found from encryption plugin we can't
1147 	continue to rotate the tablespace */
1148 	if (!crypt_data->is_key_found()) {
1149 		return false;
1150 	}
1151 
1152 	mutex_enter(&crypt_data->mutex);
1153 
1154 	do {
1155 		/* prevent threads from starting to rotate space */
1156 		if (crypt_data->rotate_state.starting) {
1157 			/* recheck this space later */
1158 			*recheck = true;
1159 			break;
1160 		}
1161 
1162 		/* prevent threads from starting to rotate space */
1163 		if (space->is_stopping()) {
1164 			break;
1165 		}
1166 
1167 		if (crypt_data->rotate_state.flushing) {
1168 			break;
1169 		}
1170 
1171 		/* No need to rotate space if encryption is disabled */
1172 		if (crypt_data->not_encrypted()) {
1173 			break;
1174 		}
1175 
1176 		if (crypt_data->key_id != key_state->key_id) {
1177 			key_state->key_id= crypt_data->key_id;
1178 			fil_crypt_get_key_state(key_state, crypt_data);
1179 		}
1180 
1181 		bool need_key_rotation = fil_crypt_needs_rotation(
1182 			crypt_data,
1183 			crypt_data->min_key_version,
1184 			key_state->key_version,
1185 			key_state->rotate_key_age);
1186 
1187 		crypt_data->rotate_state.scrubbing.is_active =
1188 			btr_scrub_start_space(*space, &state->scrub_data);
1189 
1190 		time_t diff = time(0) - crypt_data->rotate_state.scrubbing.
1191 			last_scrub_completed;
1192 
1193 		bool need_scrubbing =
1194 			(srv_background_scrub_data_uncompressed ||
1195 			 srv_background_scrub_data_compressed) &&
1196 			crypt_data->rotate_state.scrubbing.is_active
1197 			&& diff >= 0
1198 			&& ulint(diff) >= srv_background_scrub_data_interval;
1199 
1200 		if (need_key_rotation == false && need_scrubbing == false) {
1201 			break;
1202 		}
1203 
1204 		mutex_exit(&crypt_data->mutex);
1205 
1206 		return true;
1207 	} while (0);
1208 
1209 	mutex_exit(&crypt_data->mutex);
1210 
1211 
1212 	return false;
1213 }
1214 
1215 /***********************************************************************
1216 Update global statistics with thread statistics
1217 @param[in,out]	state		key rotation statistics */
1218 static void
fil_crypt_update_total_stat(rotate_thread_t * state)1219 fil_crypt_update_total_stat(
1220 	rotate_thread_t *state)
1221 {
1222 	mutex_enter(&crypt_stat_mutex);
1223 	crypt_stat.pages_read_from_cache +=
1224 		state->crypt_stat.pages_read_from_cache;
1225 	crypt_stat.pages_read_from_disk +=
1226 		state->crypt_stat.pages_read_from_disk;
1227 	crypt_stat.pages_modified += state->crypt_stat.pages_modified;
1228 	crypt_stat.pages_flushed += state->crypt_stat.pages_flushed;
1229 	// remote old estimate
1230 	crypt_stat.estimated_iops -= state->crypt_stat.estimated_iops;
1231 	// add new estimate
1232 	crypt_stat.estimated_iops += state->estimated_max_iops;
1233 	mutex_exit(&crypt_stat_mutex);
1234 
1235 	// make new estimate "current" estimate
1236 	memset(&state->crypt_stat, 0, sizeof(state->crypt_stat));
1237 	// record our old (current) estimate
1238 	state->crypt_stat.estimated_iops = state->estimated_max_iops;
1239 }
1240 
1241 /***********************************************************************
1242 Allocate iops to thread from global setting,
1243 used before starting to rotate a space.
1244 @param[in,out]		state		Rotation state
1245 @return true if allocation succeeded, false if failed */
1246 static
1247 bool
fil_crypt_alloc_iops(rotate_thread_t * state)1248 fil_crypt_alloc_iops(
1249 	rotate_thread_t *state)
1250 {
1251 	ut_ad(state->allocated_iops == 0);
1252 
1253 	/* We have not yet selected the space to rotate, thus
1254 	state might not contain space and we can't check
1255 	its status yet. */
1256 
1257 	uint max_iops = state->estimated_max_iops;
1258 	mutex_enter(&fil_crypt_threads_mutex);
1259 
1260 	if (n_fil_crypt_iops_allocated >= srv_n_fil_crypt_iops) {
1261 		/* this can happen when user decreases srv_fil_crypt_iops */
1262 		mutex_exit(&fil_crypt_threads_mutex);
1263 		return false;
1264 	}
1265 
1266 	uint alloc = srv_n_fil_crypt_iops - n_fil_crypt_iops_allocated;
1267 
1268 	if (alloc > max_iops) {
1269 		alloc = max_iops;
1270 	}
1271 
1272 	n_fil_crypt_iops_allocated += alloc;
1273 	mutex_exit(&fil_crypt_threads_mutex);
1274 
1275 	state->allocated_iops = alloc;
1276 
1277 	return alloc > 0;
1278 }
1279 
1280 /***********************************************************************
1281 Reallocate iops to thread,
1282 used when inside a space
1283 @param[in,out]		state		Rotation state */
1284 static
1285 void
fil_crypt_realloc_iops(rotate_thread_t * state)1286 fil_crypt_realloc_iops(
1287 	rotate_thread_t *state)
1288 {
1289 	ut_a(state->allocated_iops > 0);
1290 
1291 	if (10 * state->cnt_waited > state->batch) {
1292 		/* if we waited more than 10% re-estimate max_iops */
1293 		ulint avg_wait_time_us =
1294 			ulint(state->sum_waited_us / state->cnt_waited);
1295 
1296 		if (avg_wait_time_us == 0) {
1297 			avg_wait_time_us = 1; // prevent division by zero
1298 		}
1299 
1300 		DBUG_PRINT("ib_crypt",
1301 			("thr_no: %u - update estimated_max_iops from %u to "
1302 			 ULINTPF ".",
1303 			state->thread_no,
1304 			state->estimated_max_iops,
1305 			1000000 / avg_wait_time_us));
1306 
1307 		state->estimated_max_iops = uint(1000000 / avg_wait_time_us);
1308 		state->cnt_waited = 0;
1309 		state->sum_waited_us = 0;
1310 	} else {
1311 		DBUG_PRINT("ib_crypt",
1312 			   ("thr_no: %u only waited " ULINTPF
1313 			    "%% skip re-estimate.",
1314 			    state->thread_no,
1315 			    (100 * state->cnt_waited)
1316 			    / (state->batch ? state->batch : 1)));
1317 	}
1318 
1319 	if (state->estimated_max_iops <= state->allocated_iops) {
1320 		/* return extra iops */
1321 		uint extra = state->allocated_iops - state->estimated_max_iops;
1322 
1323 		if (extra > 0) {
1324 			mutex_enter(&fil_crypt_threads_mutex);
1325 			if (n_fil_crypt_iops_allocated < extra) {
1326 				/* unknown bug!
1327 				* crash in debug
1328 				* keep n_fil_crypt_iops_allocated unchanged
1329 				* in release */
1330 				ut_ad(0);
1331 				extra = 0;
1332 			}
1333 			n_fil_crypt_iops_allocated -= extra;
1334 			state->allocated_iops -= extra;
1335 
1336 			if (state->allocated_iops == 0) {
1337 				/* no matter how slow io system seems to be
1338 				* never decrease allocated_iops to 0... */
1339 				state->allocated_iops ++;
1340 				n_fil_crypt_iops_allocated ++;
1341 			}
1342 
1343 			os_event_set(fil_crypt_threads_event);
1344 			mutex_exit(&fil_crypt_threads_mutex);
1345 		}
1346 	} else {
1347 		/* see if there are more to get */
1348 		mutex_enter(&fil_crypt_threads_mutex);
1349 		if (n_fil_crypt_iops_allocated < srv_n_fil_crypt_iops) {
1350 			/* there are extra iops free */
1351 			uint extra = srv_n_fil_crypt_iops -
1352 				n_fil_crypt_iops_allocated;
1353 			if (state->allocated_iops + extra >
1354 			    state->estimated_max_iops) {
1355 				/* but don't alloc more than our max */
1356 				extra = state->estimated_max_iops -
1357 					state->allocated_iops;
1358 			}
1359 			n_fil_crypt_iops_allocated += extra;
1360 			state->allocated_iops += extra;
1361 
1362 			DBUG_PRINT("ib_crypt",
1363 				("thr_no: %u increased iops from %u to %u.",
1364 				state->thread_no,
1365 				state->allocated_iops - extra,
1366 				state->allocated_iops));
1367 
1368 		}
1369 		mutex_exit(&fil_crypt_threads_mutex);
1370 	}
1371 
1372 	fil_crypt_update_total_stat(state);
1373 }
1374 
1375 /***********************************************************************
1376 Return allocated iops to global
1377 @param[in,out]		state		Rotation state */
1378 static
1379 void
fil_crypt_return_iops(rotate_thread_t * state)1380 fil_crypt_return_iops(
1381 	rotate_thread_t *state)
1382 {
1383 	if (state->allocated_iops > 0) {
1384 		uint iops = state->allocated_iops;
1385 		mutex_enter(&fil_crypt_threads_mutex);
1386 		if (n_fil_crypt_iops_allocated < iops) {
1387 			/* unknown bug!
1388 			* crash in debug
1389 			* keep n_fil_crypt_iops_allocated unchanged
1390 			* in release */
1391 			ut_ad(0);
1392 			iops = 0;
1393 		}
1394 
1395 		n_fil_crypt_iops_allocated -= iops;
1396 		state->allocated_iops = 0;
1397 		os_event_set(fil_crypt_threads_event);
1398 		mutex_exit(&fil_crypt_threads_mutex);
1399 	}
1400 
1401 	fil_crypt_update_total_stat(state);
1402 }
1403 
fil_crypt_must_default_encrypt()1404 bool fil_crypt_must_default_encrypt()
1405 {
1406   return !srv_fil_crypt_rotate_key_age || !srv_encrypt_rotate;
1407 }
1408 
1409 /** Return the next tablespace from default_encrypt_tables.
1410 @param space   previous tablespace (NULL to start from the start)
1411 @param recheck whether the removal condition needs to be rechecked after
1412 the encryption parameters were changed
1413 @param encrypt expected state of innodb_encrypt_tables
1414 @return the next tablespace to process (n_pending_ops incremented)
1415 @retval NULL if this was the last */
default_encrypt_next(fil_space_t * space,bool recheck,bool encrypt)1416 inline fil_space_t *fil_system_t::default_encrypt_next(
1417   fil_space_t *space, bool recheck, bool encrypt)
1418 {
1419   ut_ad(mutex_own(&mutex));
1420 
1421   sized_ilist<fil_space_t, rotation_list_tag_t>::iterator it=
1422     space && space->is_in_default_encrypt
1423     ? space
1424     : default_encrypt_tables.begin();
1425   const sized_ilist<fil_space_t, rotation_list_tag_t>::iterator end=
1426     default_encrypt_tables.end();
1427 
1428   if (space)
1429   {
1430     const bool released= !space->release();
1431 
1432     if (space->is_in_default_encrypt)
1433     {
1434       while (++it != end &&
1435              (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));
1436 
1437       /* If one of the encryption threads already started
1438       the encryption of the table then don't remove the
1439       unencrypted spaces from default encrypt list.
1440 
1441       If there is a change in innodb_encrypt_tables variables
1442       value then don't remove the last processed tablespace
1443       from the default encrypt list. */
1444       if (released && !recheck && fil_crypt_must_remove(*space))
1445       {
1446         ut_a(!default_encrypt_tables.empty());
1447         default_encrypt_tables.remove(*space);
1448         space->is_in_default_encrypt= false;
1449       }
1450     }
1451   }
1452   else while (it != end &&
1453 	      (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()))
1454   {
1455     /* Find the next suitable default encrypt table if
1456     beginning of default_encrypt_tables list has been scheduled
1457     to be deleted */
1458     it++;
1459   }
1460 
1461   while (it != end)
1462   {
1463     space= &*it;
1464     if (space->acquire())
1465       return space;
1466     while (++it != end && (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));
1467   }
1468 
1469   return NULL;
1470 }
1471 
1472 /** Return the next tablespace.
1473 @param space    previous tablespace (NULL to start from the beginning)
1474 @param recheck  whether the removal condition needs to be rechecked after
1475 the encryption parameters were changed
1476 @param encrypt  expected state of innodb_encrypt_tables
1477 @return pointer to the next tablespace (with n_pending_ops incremented)
1478 @retval NULL if this was the last */
fil_space_next(fil_space_t * space,bool recheck,bool encrypt)1479 static fil_space_t *fil_space_next(fil_space_t *space, bool recheck,
1480                                    bool encrypt)
1481 {
1482   mutex_enter(&fil_system.mutex);
1483 
1484   if (fil_crypt_must_default_encrypt())
1485     space= fil_system.default_encrypt_next(space, recheck, encrypt);
1486   else if (!space)
1487   {
1488     space= UT_LIST_GET_FIRST(fil_system.space_list);
1489     /* We can trust that space is not NULL because at least the
1490     system tablespace is always present and loaded first. */
1491     if (!space->acquire())
1492       goto next;
1493   }
1494   else
1495   {
1496     /* Move on to the next fil_space_t */
1497     space->release();
1498 next:
1499     space= UT_LIST_GET_NEXT(space_list, space);
1500 
1501     /* Skip abnormal tablespaces or those that are being created by
1502     fil_ibd_create(), or being dropped. */
1503     while (space &&
1504            (UT_LIST_GET_LEN(space->chain) == 0 ||
1505             space->is_stopping() || space->purpose != FIL_TYPE_TABLESPACE))
1506       space= UT_LIST_GET_NEXT(space_list, space);
1507 
1508     if (space && !space->acquire())
1509       goto next;
1510   }
1511 
1512   mutex_exit(&fil_system.mutex);
1513   return space;
1514 }
1515 
1516 /** Search for a space needing rotation
1517 @param[in,out]	key_state	Key state
1518 @param[in,out]	state		Rotation state
1519 @param[in,out]	recheck		recheck of the tablespace is needed or
1520 				still encryption thread does write page 0 */
fil_crypt_find_space_to_rotate(key_state_t * key_state,rotate_thread_t * state,bool * recheck)1521 static bool fil_crypt_find_space_to_rotate(
1522 	key_state_t*		key_state,
1523 	rotate_thread_t*	state,
1524 	bool*			recheck)
1525 {
1526 	/* we need iops to start rotating */
1527 	while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) {
1528 		if (state->space && state->space->is_stopping()) {
1529 			state->space->release();
1530 			state->space = NULL;
1531 		}
1532 
1533 		os_event_reset(fil_crypt_threads_event);
1534 		os_event_wait_time(fil_crypt_threads_event, 100000);
1535 	}
1536 
1537 	if (state->should_shutdown()) {
1538 		if (state->space) {
1539 			state->space->release();
1540 			state->space = NULL;
1541 		}
1542 		return false;
1543 	}
1544 
1545 	if (state->first) {
1546 		state->first = false;
1547 		if (state->space) {
1548 			state->space->release();
1549 		}
1550 		state->space = NULL;
1551 	}
1552 
1553 	state->space = fil_space_next(state->space, *recheck,
1554 				      key_state->key_version != 0);
1555 
1556 	while (!state->should_shutdown() && state->space) {
1557 		/* If there is no crypt data and we have not yet read
1558 		page 0 for this tablespace, we need to read it before
1559 		we can continue. */
1560 		if (!state->space->crypt_data) {
1561 			fil_crypt_read_crypt_data(state->space);
1562 		}
1563 
1564 		if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
1565 			ut_ad(key_state->key_id);
1566 			/* init state->min_key_version_found before
1567 			* starting on a space */
1568 			state->min_key_version_found = key_state->key_version;
1569 			return true;
1570 		}
1571 
1572 		state->space = fil_space_next(state->space, *recheck,
1573 					      key_state->key_version != 0);
1574 	}
1575 
1576 	if (state->space) {
1577 		state->space->release();
1578 		state->space = NULL;
1579 	}
1580 
1581 	/* no work to do; release our allocation of I/O capacity */
1582 	fil_crypt_return_iops(state);
1583 
1584 	return false;
1585 
1586 }
1587 
1588 /***********************************************************************
1589 Start rotating a space
1590 @param[in]	key_state		Key state
1591 @param[in,out]	state			Rotation state */
1592 static
1593 void
fil_crypt_start_rotate_space(const key_state_t * key_state,rotate_thread_t * state)1594 fil_crypt_start_rotate_space(
1595 	const key_state_t*	key_state,
1596 	rotate_thread_t*	state)
1597 {
1598 	fil_space_crypt_t *crypt_data = state->space->crypt_data;
1599 
1600 	ut_ad(crypt_data);
1601 	mutex_enter(&crypt_data->mutex);
1602 	ut_ad(key_state->key_id == crypt_data->key_id);
1603 
1604 	if (crypt_data->rotate_state.active_threads == 0) {
1605 		/* only first thread needs to init */
1606 		crypt_data->rotate_state.next_offset = 1; // skip page 0
1607 		/* no need to rotate beyond current max
1608 		* if space extends, it will be encrypted with newer version */
1609 		/* FIXME: max_offset could be removed and instead
1610 		space->size consulted.*/
1611 		crypt_data->rotate_state.max_offset = state->space->size;
1612 		crypt_data->rotate_state.end_lsn = 0;
1613 		crypt_data->rotate_state.min_key_version_found =
1614 			key_state->key_version;
1615 
1616 		crypt_data->rotate_state.start_time = time(0);
1617 
1618 		if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
1619 			crypt_data->is_encrypted() &&
1620 			key_state->key_version != 0) {
1621 			/* this is rotation unencrypted => encrypted */
1622 			crypt_data->type = CRYPT_SCHEME_1;
1623 		}
1624 	}
1625 
1626 	/* count active threads in space */
1627 	crypt_data->rotate_state.active_threads++;
1628 
1629 	/* Initialize thread local state */
1630 	state->end_lsn = crypt_data->rotate_state.end_lsn;
1631 	state->min_key_version_found =
1632 		crypt_data->rotate_state.min_key_version_found;
1633 
1634 	mutex_exit(&crypt_data->mutex);
1635 }
1636 
1637 /***********************************************************************
1638 Search for batch of pages needing rotation
1639 @param[in]	key_state		Key state
1640 @param[in,out]	state			Rotation state
1641 @return true if page needing key rotation found, false if not found */
1642 static
1643 bool
fil_crypt_find_page_to_rotate(const key_state_t * key_state,rotate_thread_t * state)1644 fil_crypt_find_page_to_rotate(
1645 	const key_state_t*	key_state,
1646 	rotate_thread_t*	state)
1647 {
1648 	ulint batch = srv_alloc_time * state->allocated_iops;
1649 	fil_space_t* space = state->space;
1650 
1651 	ut_ad(!space || space->referenced());
1652 
1653 	/* If space is marked to be dropped stop rotation. */
1654 	if (!space || space->is_stopping()) {
1655 		return false;
1656 	}
1657 
1658 	fil_space_crypt_t *crypt_data = space->crypt_data;
1659 
1660 	mutex_enter(&crypt_data->mutex);
1661 	ut_ad(key_state->key_id == crypt_data->key_id);
1662 
1663 	bool found = crypt_data->rotate_state.max_offset >=
1664 		crypt_data->rotate_state.next_offset;
1665 
1666 	if (found) {
1667 		state->offset = crypt_data->rotate_state.next_offset;
1668 		ulint remaining = crypt_data->rotate_state.max_offset -
1669 			crypt_data->rotate_state.next_offset;
1670 
1671 		if (batch <= remaining) {
1672 			state->batch = batch;
1673 		} else {
1674 			state->batch = remaining;
1675 		}
1676 	}
1677 
1678 	crypt_data->rotate_state.next_offset += batch;
1679 	mutex_exit(&crypt_data->mutex);
1680 	return found;
1681 }
1682 
/** Call fil_crypt_get_page_throttle_func() with the source file and
line of the call site appended to the arguments. */
#define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms)	\
	fil_crypt_get_page_throttle_func(				\
		state, offset, mtr, sleeptime_ms, __FILE__, __LINE__)
1686 
1687 /***********************************************************************
1688 Get a page and compute sleep time
1689 @param[in,out]		state		Rotation state
1690 @param[in]		offset		Page offset
1691 @param[in,out]		mtr		Minitransaction
1692 @param[out]		sleeptime_ms	Sleep time
1693 @param[in]		file		File where called
1694 @param[in]		line		Line where called
1695 @return page or NULL*/
1696 static
1697 buf_block_t*
fil_crypt_get_page_throttle_func(rotate_thread_t * state,ulint offset,mtr_t * mtr,ulint * sleeptime_ms,const char * file,unsigned line)1698 fil_crypt_get_page_throttle_func(
1699 	rotate_thread_t*	state,
1700 	ulint 			offset,
1701 	mtr_t*			mtr,
1702 	ulint*			sleeptime_ms,
1703 	const char*		file,
1704 	unsigned		line)
1705 {
1706 	fil_space_t* space = state->space;
1707 	const page_size_t page_size = page_size_t(space->flags);
1708 	const page_id_t page_id(space->id, offset);
1709 	ut_ad(space->referenced());
1710 
1711 	/* Before reading from tablespace we need to make sure that
1712 	the tablespace is not about to be dropped. */
1713 	if (space->is_stopping()) {
1714 		return NULL;
1715 	}
1716 
1717 	dberr_t err = DB_SUCCESS;
1718 	buf_block_t* block = buf_page_get_gen(page_id, page_size, RW_X_LATCH,
1719 					      NULL,
1720 					      BUF_PEEK_IF_IN_POOL, file, line,
1721 					      mtr, &err);
1722 	if (block != NULL) {
1723 		/* page was in buffer pool */
1724 		state->crypt_stat.pages_read_from_cache++;
1725 		return block;
1726 	}
1727 
1728 	if (space->is_stopping()) {
1729 		return NULL;
1730 	}
1731 
1732 	state->crypt_stat.pages_read_from_disk++;
1733 
1734 	const ulonglong start = my_interval_timer();
1735 	block = buf_page_get_gen(page_id, page_size,
1736 				 RW_X_LATCH,
1737 				 NULL, BUF_GET_POSSIBLY_FREED,
1738 				file, line, mtr, &err);
1739 	const ulonglong end = my_interval_timer();
1740 
1741 	state->cnt_waited++;
1742 
1743 	if (end > start) {
1744 		state->sum_waited_us += (end - start) / 1000;
1745 	}
1746 
1747 	/* average page load */
1748 	ulint add_sleeptime_ms = 0;
1749 	ulint avg_wait_time_us =ulint(state->sum_waited_us / state->cnt_waited);
1750 	ulint alloc_wait_us = 1000000 / state->allocated_iops;
1751 
1752 	if (avg_wait_time_us < alloc_wait_us) {
1753 		/* we reading faster than we allocated */
1754 		add_sleeptime_ms = (alloc_wait_us - avg_wait_time_us) / 1000;
1755 	} else {
1756 		/* if page load time is longer than we want, skip sleeping */
1757 	}
1758 
1759 	*sleeptime_ms += add_sleeptime_ms;
1760 
1761 	return block;
1762 }
1763 
1764 
1765 /***********************************************************************
1766 Get block and allocation status
1767 
1768 note: innodb locks fil_space_latch and then block when allocating page
1769 but locks block and then fil_space_latch when freeing page.
1770 
1771 @param[in,out]		state		Rotation state
1772 @param[in]		offset		Page offset
1773 @param[in,out]		mtr		Minitransaction
1774 @param[out]		allocation_status Allocation status
1775 @param[out]		sleeptime_ms	Sleep time
1776 @return block or NULL
1777 */
1778 static
1779 buf_block_t*
btr_scrub_get_block_and_allocation_status(rotate_thread_t * state,ulint offset,mtr_t * mtr,btr_scrub_page_allocation_status_t * allocation_status,ulint * sleeptime_ms)1780 btr_scrub_get_block_and_allocation_status(
1781 	rotate_thread_t*	state,
1782 	ulint 			offset,
1783 	mtr_t*			mtr,
1784 	btr_scrub_page_allocation_status_t *allocation_status,
1785 	ulint*			sleeptime_ms)
1786 {
1787 	mtr_t local_mtr;
1788 	buf_block_t *block = NULL;
1789 	fil_space_t* space = state->space;
1790 
1791 	ut_ad(space->referenced());
1792 
1793 	mtr_start(&local_mtr);
1794 
1795 	*allocation_status = fseg_page_is_free(space, (uint32_t)offset) ?
1796 		BTR_SCRUB_PAGE_FREE :
1797 		BTR_SCRUB_PAGE_ALLOCATED;
1798 
1799 	if (*allocation_status == BTR_SCRUB_PAGE_FREE) {
1800 		/* this is easy case, we lock fil_space_latch first and
1801 		then block */
1802 		block = fil_crypt_get_page_throttle(state,
1803 						    offset, mtr,
1804 						    sleeptime_ms);
1805 		mtr_commit(&local_mtr);
1806 	} else {
1807 		/* page is allocated according to xdes */
1808 
1809 		/* release fil_space_latch *before* fetching block */
1810 		mtr_commit(&local_mtr);
1811 
1812 		/* NOTE: when we have locked dict_index_get_lock(),
1813 		* it's safe to release fil_space_latch and then fetch block
1814 		* as dict_index_get_lock() is needed to make tree modifications
1815 		* such as free-ing a page
1816 		*/
1817 
1818 		block = fil_crypt_get_page_throttle(state,
1819 						    offset, mtr,
1820 						    sleeptime_ms);
1821 	}
1822 
1823 	return block;
1824 }
1825 
1826 
1827 /***********************************************************************
1828 Rotate one page
1829 @param[in,out]		key_state		Key state
1830 @param[in,out]		state			Rotation state */
1831 static
1832 void
fil_crypt_rotate_page(const key_state_t * key_state,rotate_thread_t * state)1833 fil_crypt_rotate_page(
1834 	const key_state_t*	key_state,
1835 	rotate_thread_t*	state)
1836 {
1837 	fil_space_t*space = state->space;
1838 	ulint space_id = space->id;
1839 	ulint offset = state->offset;
1840 	ulint sleeptime_ms = 0;
1841 	fil_space_crypt_t *crypt_data = space->crypt_data;
1842 
1843 	ut_ad(space->referenced());
1844 	ut_ad(offset > 0);
1845 
1846 	/* In fil_crypt_thread where key rotation is done we have
1847 	acquired space and checked that this space is not yet
1848 	marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate().
1849 	Check here also to give DROP TABLE or similar a change. */
1850 	if (space->is_stopping()) {
1851 		return;
1852 	}
1853 
1854 	if (space_id == TRX_SYS_SPACE && offset == TRX_SYS_PAGE_NO) {
1855 		/* don't encrypt this as it contains address to dblwr buffer */
1856 		return;
1857 	}
1858 
1859 	mtr_t mtr;
1860 	mtr.start();
1861 	if (buf_block_t* block = fil_crypt_get_page_throttle(state,
1862 							     offset, &mtr,
1863 							     &sleeptime_ms)) {
1864 		bool modified = false;
1865 		int needs_scrubbing = BTR_SCRUB_SKIP_PAGE;
1866 		lsn_t block_lsn = block->page.newest_modification;
1867 		byte* frame = buf_block_get_frame(block);
1868 		uint kv =  mach_read_from_4(frame+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
1869 
1870 		if (space->is_stopping()) {
1871 			/* The tablespace is closing (in DROP TABLE or
1872 			TRUNCATE TABLE or similar): avoid further access */
1873 		} else if (!kv && !*reinterpret_cast<uint16_t*>
1874 			   (&frame[FIL_PAGE_TYPE])) {
1875 			/* It looks like this page is not
1876 			allocated. Because key rotation is accessing
1877 			pages in a pattern that is unlike the normal
1878 			B-tree and undo log access pattern, we cannot
1879 			invoke fseg_page_is_free() here, because that
1880 			could result in a deadlock. If we invoked
1881 			fseg_page_is_free() and released the
1882 			tablespace latch before acquiring block->lock,
1883 			then the fseg_page_is_free() information
1884 			could be stale already. */
1885 
1886 			/* If the data file was originally created
1887 			before MariaDB 10.0 or MySQL 5.6, some
1888 			allocated data pages could carry 0 in
1889 			FIL_PAGE_TYPE. The FIL_PAGE_TYPE on those
1890 			pages will be updated in
1891 			buf_flush_init_for_writing() when the page
1892 			is modified the next time.
1893 
1894 			Also, when the doublewrite buffer pages are
1895 			allocated on bootstrap in a non-debug build,
1896 			some dummy pages will be allocated, with 0 in
1897 			the FIL_PAGE_TYPE. Those pages should be
1898 			skipped from key rotation forever. */
1899 		} else if (fil_crypt_needs_rotation(
1900 				crypt_data,
1901 				kv,
1902 				key_state->key_version,
1903 				key_state->rotate_key_age)) {
1904 
1905 			mtr.set_named_space(space);
1906 			modified = true;
1907 
1908 			/* force rotation by dummy updating page */
1909 			mlog_write_ulint(frame + FIL_PAGE_SPACE_ID,
1910 					 space_id, MLOG_4BYTES, &mtr);
1911 
1912 			/* statistics */
1913 			state->crypt_stat.pages_modified++;
1914 		} else {
1915 			if (crypt_data->is_encrypted()) {
1916 				if (kv < state->min_key_version_found) {
1917 					state->min_key_version_found = kv;
1918 				}
1919 			}
1920 
1921 			needs_scrubbing = btr_page_needs_scrubbing(
1922 				&state->scrub_data, block,
1923 				BTR_SCRUB_PAGE_ALLOCATION_UNKNOWN);
1924 		}
1925 
1926 		mtr.commit();
1927 		lsn_t end_lsn = mtr.commit_lsn();
1928 
1929 		if (needs_scrubbing == BTR_SCRUB_PAGE) {
1930 			mtr.start();
1931 			/*
1932 			* refetch page and allocation status
1933 			*/
1934 			btr_scrub_page_allocation_status_t allocated;
1935 
1936 			block = btr_scrub_get_block_and_allocation_status(
1937 				state, offset, &mtr,
1938 				&allocated,
1939 				&sleeptime_ms);
1940 
1941 			if (block) {
1942 				mtr.set_named_space(space);
1943 
1944 				/* get required table/index and index-locks */
1945 				needs_scrubbing = btr_scrub_recheck_page(
1946 					&state->scrub_data, block, allocated, &mtr);
1947 
1948 				if (needs_scrubbing == BTR_SCRUB_PAGE) {
1949 					/* we need to refetch it once more now that we have
1950 					* index locked */
1951 					block = btr_scrub_get_block_and_allocation_status(
1952 						state, offset, &mtr,
1953 						&allocated,
1954 						&sleeptime_ms);
1955 
1956 					needs_scrubbing = btr_scrub_page(&state->scrub_data,
1957 						block, allocated,
1958 						&mtr);
1959 				}
1960 
1961 				/* NOTE: mtr is committed inside btr_scrub_recheck_page()
1962 				* and/or btr_scrub_page. This is to make sure that
1963 				* locks & pages are latched in corrected order,
1964 				* the mtr is in some circumstances restarted.
1965 				* (mtr_commit() + mtr_start())
1966 				*/
1967 			}
1968 		}
1969 
1970 		if (needs_scrubbing != BTR_SCRUB_PAGE) {
1971 			/* if page didn't need scrubbing it might be that cleanups
1972 			are needed. do those outside of any mtr to prevent deadlocks.
1973 
1974 			the information what kinds of cleanups that are needed are
1975 			encoded inside the needs_scrubbing, but this is opaque to
1976 			this function (except the value BTR_SCRUB_PAGE) */
1977 			btr_scrub_skip_page(&state->scrub_data, needs_scrubbing);
1978 		}
1979 
1980 		if (needs_scrubbing == BTR_SCRUB_TURNED_OFF) {
1981 			/* if we just detected that scrubbing was turned off
1982 			* update global state to reflect this */
1983 			ut_ad(crypt_data);
1984 			mutex_enter(&crypt_data->mutex);
1985 			crypt_data->rotate_state.scrubbing.is_active = false;
1986 			mutex_exit(&crypt_data->mutex);
1987 		}
1988 
1989 		if (modified) {
1990 			/* if we modified page, we take lsn from mtr */
1991 			ut_a(end_lsn > state->end_lsn);
1992 			ut_a(end_lsn > block_lsn);
1993 			state->end_lsn = end_lsn;
1994 		} else {
1995 			/* if we did not modify page, check for max lsn */
1996 			if (block_lsn > state->end_lsn) {
1997 				state->end_lsn = block_lsn;
1998 			}
1999 		}
2000 	} else {
2001 		/* If block read failed mtr memo and log should be empty. */
2002 		ut_ad(!mtr.has_modifications());
2003 		ut_ad(!mtr.is_dirty());
2004 		ut_ad(mtr.get_memo()->size() == 0);
2005 		ut_ad(mtr.get_log()->size() == 0);
2006 		mtr.commit();
2007 	}
2008 
2009 	if (sleeptime_ms) {
2010 		os_event_reset(fil_crypt_throttle_sleep_event);
2011 		os_event_wait_time(fil_crypt_throttle_sleep_event,
2012 				   1000 * sleeptime_ms);
2013 	}
2014 }
2015 
2016 /***********************************************************************
2017 Rotate a batch of pages
2018 @param[in,out]		key_state		Key state
2019 @param[in,out]		state			Rotation state */
2020 static
2021 void
fil_crypt_rotate_pages(const key_state_t * key_state,rotate_thread_t * state)2022 fil_crypt_rotate_pages(
2023 	const key_state_t*	key_state,
2024 	rotate_thread_t*	state)
2025 {
2026 	ulint space = state->space->id;
2027 	ulint end = std::min(state->offset + state->batch,
2028 			     state->space->free_limit);
2029 
2030 	ut_ad(state->space->referenced());
2031 
2032 	for (; state->offset < end; state->offset++) {
2033 
2034 		/* we can't rotate pages in dblwr buffer as
2035 		* it's not possible to read those due to lots of asserts
2036 		* in buffer pool.
2037 		*
2038 		* However since these are only (short-lived) copies of
2039 		* real pages, they will be updated anyway when the
2040 		* real page is updated
2041 		*/
2042 		if (space == TRX_SYS_SPACE &&
2043 		    buf_dblwr_page_inside(state->offset)) {
2044 			continue;
2045 		}
2046 
2047 		/* If space is marked as stopping, stop rotating
2048 		pages. */
2049 		if (state->space->is_stopping()) {
2050 			break;
2051 		}
2052 
2053 		fil_crypt_rotate_page(key_state, state);
2054 	}
2055 }
2056 
2057 /***********************************************************************
2058 Flush rotated pages and then update page 0
2059 
2060 @param[in,out]		state	rotation state */
2061 static
2062 void
fil_crypt_flush_space(rotate_thread_t * state)2063 fil_crypt_flush_space(
2064 	rotate_thread_t*	state)
2065 {
2066 	fil_space_t* space = state->space;
2067 	fil_space_crypt_t *crypt_data = space->crypt_data;
2068 
2069 	ut_ad(space->referenced());
2070 
2071 	/* flush tablespace pages so that there are no pages left with old key */
2072 	lsn_t end_lsn = crypt_data->rotate_state.end_lsn;
2073 
2074 	if (end_lsn > 0 && !space->is_stopping()) {
2075 		bool success = false;
2076 		ulint n_pages = 0;
2077 		ulint sum_pages = 0;
2078 		const ulonglong start = my_interval_timer();
2079 
2080 		do {
2081 			success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
2082 			buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
2083 			sum_pages += n_pages;
2084 		} while (!success && !space->is_stopping());
2085 
2086 		const ulonglong end = my_interval_timer();
2087 
2088 		if (sum_pages && end > start) {
2089 			state->cnt_waited += sum_pages;
2090 			state->sum_waited_us += (end - start) / 1000;
2091 
2092 			/* statistics */
2093 			state->crypt_stat.pages_flushed += sum_pages;
2094 		}
2095 	}
2096 
2097 	if (crypt_data->min_key_version == 0) {
2098 		crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
2099 	}
2100 
2101 	if (space->is_stopping()) {
2102 		return;
2103 	}
2104 
2105 	/* update page 0 */
2106 	mtr_t mtr;
2107 	mtr.start();
2108 
2109 	dberr_t err;
2110 
2111 	if (buf_block_t* block = buf_page_get_gen(
2112 		    page_id_t(space->id, 0), page_size_t(space->flags),
2113 		    RW_X_LATCH, NULL, BUF_GET,
2114 		    __FILE__, __LINE__, &mtr, &err)) {
2115 		mtr.set_named_space(space);
2116 		crypt_data->write_page0(space, block->frame, &mtr);
2117 	}
2118 
2119 	mtr.commit();
2120 }
2121 
2122 /***********************************************************************
2123 Complete rotating a space
2124 @param[in,out]		state			Rotation state */
fil_crypt_complete_rotate_space(rotate_thread_t * state)2125 static void fil_crypt_complete_rotate_space(rotate_thread_t* state)
2126 {
2127 	fil_space_crypt_t *crypt_data = state->space->crypt_data;
2128 
2129 	ut_ad(crypt_data);
2130 	ut_ad(state->space->referenced());
2131 
2132 	/* Space might already be dropped */
2133 	if (!state->space->is_stopping()) {
2134 		mutex_enter(&crypt_data->mutex);
2135 
2136 		/**
2137 		* Update crypt data state with state from thread
2138 		*/
2139 		if (state->min_key_version_found <
2140 			crypt_data->rotate_state.min_key_version_found) {
2141 			crypt_data->rotate_state.min_key_version_found =
2142 				state->min_key_version_found;
2143 		}
2144 
2145 		if (state->end_lsn > crypt_data->rotate_state.end_lsn) {
2146 			crypt_data->rotate_state.end_lsn = state->end_lsn;
2147 		}
2148 
2149 		ut_a(crypt_data->rotate_state.active_threads > 0);
2150 		crypt_data->rotate_state.active_threads--;
2151 		bool last = crypt_data->rotate_state.active_threads == 0;
2152 
2153 		/**
2154 		* check if space is fully done
2155 		* this as when threads shutdown, it could be that we "complete"
2156 		* iterating before we have scanned the full space.
2157 		*/
2158 		bool done = crypt_data->rotate_state.next_offset >=
2159 			crypt_data->rotate_state.max_offset;
2160 
2161 		/**
2162 		* we should flush space if we're last thread AND
2163 		* the iteration is done
2164 		*/
2165 		bool should_flush = last && done;
2166 
2167 		if (should_flush) {
2168 			/* we're the last active thread */
2169 			crypt_data->rotate_state.flushing = true;
2170 			crypt_data->min_key_version =
2171 				crypt_data->rotate_state.min_key_version_found;
2172 		}
2173 
2174 		/* inform scrubbing */
2175 		crypt_data->rotate_state.scrubbing.is_active = false;
2176 		mutex_exit(&crypt_data->mutex);
2177 
2178 		/* all threads must call btr_scrub_complete_space wo/ mutex held */
2179 		if (state->scrub_data.scrubbing) {
2180 			btr_scrub_complete_space(&state->scrub_data);
2181 			if (should_flush) {
2182 				/* only last thread updates last_scrub_completed */
2183 				ut_ad(crypt_data);
2184 				mutex_enter(&crypt_data->mutex);
2185 				crypt_data->rotate_state.scrubbing.
2186 					last_scrub_completed = time(0);
2187 				mutex_exit(&crypt_data->mutex);
2188 			}
2189 		}
2190 
2191 		if (should_flush) {
2192 			fil_crypt_flush_space(state);
2193 
2194 			mutex_enter(&crypt_data->mutex);
2195 			crypt_data->rotate_state.flushing = false;
2196 			mutex_exit(&crypt_data->mutex);
2197 		}
2198 	} else {
2199 		mutex_enter(&crypt_data->mutex);
2200 		ut_a(crypt_data->rotate_state.active_threads > 0);
2201 		crypt_data->rotate_state.active_threads--;
2202 		mutex_exit(&crypt_data->mutex);
2203 	}
2204 }
2205 
2206 /*********************************************************************//**
2207 A thread which monitors global key state and rotates tablespaces accordingly
2208 @return a dummy parameter */
2209 extern "C" UNIV_INTERN
2210 os_thread_ret_t
DECLARE_THREAD(fil_crypt_thread)2211 DECLARE_THREAD(fil_crypt_thread)(void*)
2212 {
2213 	mutex_enter(&fil_crypt_threads_mutex);
2214 	uint thread_no = srv_n_fil_crypt_threads_started;
2215 	srv_n_fil_crypt_threads_started++;
2216 	os_event_set(fil_crypt_event); /* signal that we started */
2217 	mutex_exit(&fil_crypt_threads_mutex);
2218 
2219 	/* state of this thread */
2220 	rotate_thread_t thr(thread_no);
2221 
2222 	/* if we find a space that is starting, skip over it and recheck it later */
2223 	bool recheck = false;
2224 
2225 	while (!thr.should_shutdown()) {
2226 
2227 		key_state_t new_state;
2228 
2229 		time_t wait_start = time(0);
2230 
2231 		while (!thr.should_shutdown()) {
2232 
2233 			/* wait for key state changes
2234 			* i.e either new key version of change or
2235 			* new rotate_key_age */
2236 			os_event_reset(fil_crypt_threads_event);
2237 
2238 			if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) {
2239 				break;
2240 			}
2241 
2242 			if (recheck) {
2243 				/* check recheck here, after sleep, so
2244 				* that we don't busy loop while when one thread is starting
2245 				* a space*/
2246 				break;
2247 			}
2248 
2249 			time_t waited = time(0) - wait_start;
2250 
2251 			/* Break if we have waited the background scrub
2252 			internal and background scrubbing is enabled */
2253 			if (waited >= 0
2254 			    && ulint(waited) >= srv_background_scrub_data_check_interval
2255 			    && (srv_background_scrub_data_uncompressed
2256 			        || srv_background_scrub_data_compressed)) {
2257 				break;
2258 			}
2259 		}
2260 
2261 		recheck = false;
2262 		thr.first = true;      // restart from first tablespace
2263 
2264 		/* iterate all spaces searching for those needing rotation */
2265 		while (!thr.should_shutdown() &&
2266 		       fil_crypt_find_space_to_rotate(&new_state, &thr, &recheck)) {
2267 
2268 			/* we found a space to rotate */
2269 			fil_crypt_start_rotate_space(&new_state, &thr);
2270 
2271 			/* iterate all pages (cooperativly with other threads) */
2272 			while (!thr.should_shutdown() &&
2273 			       fil_crypt_find_page_to_rotate(&new_state, &thr)) {
2274 
2275 				if (!thr.space->is_stopping()) {
2276 					/* rotate a (set) of pages */
2277 					fil_crypt_rotate_pages(&new_state, &thr);
2278 				}
2279 
2280 				/* If space is marked as stopping, release
2281 				space and stop rotation. */
2282 				if (thr.space->is_stopping()) {
2283 					fil_crypt_complete_rotate_space(&thr);
2284 					thr.space->release();
2285 					thr.space = NULL;
2286 					break;
2287 				}
2288 
2289 				/* realloc iops */
2290 				fil_crypt_realloc_iops(&thr);
2291 			}
2292 
2293 			/* complete rotation */
2294 			if (thr.space) {
2295 				fil_crypt_complete_rotate_space(&thr);
2296 			}
2297 
2298 			/* force key state refresh */
2299 			new_state.key_id = 0;
2300 
2301 			/* return iops */
2302 			fil_crypt_return_iops(&thr);
2303 		}
2304 	}
2305 
2306 	/* return iops if shutting down */
2307 	fil_crypt_return_iops(&thr);
2308 
2309 	/* release current space if shutting down */
2310 	if (thr.space) {
2311 		thr.space->release();
2312 		thr.space = NULL;
2313 	}
2314 
2315 	mutex_enter(&fil_crypt_threads_mutex);
2316 	srv_n_fil_crypt_threads_started--;
2317 	os_event_set(fil_crypt_event); /* signal that we stopped */
2318 	mutex_exit(&fil_crypt_threads_mutex);
2319 
2320 	/* We count the number of threads in os_thread_exit(). A created
2321 	thread should always use that to exit and not use return() to exit. */
2322 
2323 	os_thread_exit();
2324 
2325 	OS_THREAD_DUMMY_RETURN;
2326 }
2327 
2328 /*********************************************************************
2329 Adjust thread count for key rotation
2330 @param[in]	enw_cnt		Number of threads to be used */
2331 UNIV_INTERN
2332 void
fil_crypt_set_thread_cnt(const uint new_cnt)2333 fil_crypt_set_thread_cnt(
2334 	const uint	new_cnt)
2335 {
2336 	if (!fil_crypt_threads_inited) {
2337 		fil_crypt_threads_init();
2338 	}
2339 
2340 	mutex_enter(&fil_crypt_threads_mutex);
2341 
2342 	if (new_cnt > srv_n_fil_crypt_threads) {
2343 		uint add = new_cnt - srv_n_fil_crypt_threads;
2344 		srv_n_fil_crypt_threads = new_cnt;
2345 		for (uint i = 0; i < add; i++) {
2346 			os_thread_id_t rotation_thread_id;
2347 			os_thread_create(fil_crypt_thread, NULL, &rotation_thread_id);
2348 			ib::info() << "Creating #"
2349 				   << i+1 << " encryption thread id "
2350 				   << os_thread_pf(rotation_thread_id)
2351 				   << " total threads " << new_cnt << ".";
2352 		}
2353 	} else if (new_cnt < srv_n_fil_crypt_threads) {
2354 		srv_n_fil_crypt_threads = new_cnt;
2355 		os_event_set(fil_crypt_threads_event);
2356 	}
2357 
2358 	mutex_exit(&fil_crypt_threads_mutex);
2359 
2360 	while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) {
2361 		os_event_reset(fil_crypt_event);
2362 		os_event_wait_time(fil_crypt_event, 100000);
2363 	}
2364 
2365 	/* Send a message to encryption threads that there could be
2366 	something to do. */
2367 	if (srv_n_fil_crypt_threads) {
2368 		os_event_set(fil_crypt_threads_event);
2369 	}
2370 }
2371 
2372 /** Initialize the tablespace default_encrypt_tables
2373 if innodb_encryption_rotate_key_age=0. */
fil_crypt_default_encrypt_tables_fill()2374 static void fil_crypt_default_encrypt_tables_fill()
2375 {
2376 	ut_ad(mutex_own(&fil_system.mutex));
2377 
2378 	for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list);
2379 	     space != NULL;
2380 	     space = UT_LIST_GET_NEXT(space_list, space)) {
2381 		if (space->purpose != FIL_TYPE_TABLESPACE
2382 		    || space->is_in_default_encrypt
2383 		    || UT_LIST_GET_LEN(space->chain) == 0
2384 		    || !space->acquire()) {
2385 			continue;
2386 		}
2387 
2388 		/* Ensure that crypt_data has been initialized. */
2389 		if (!space->size) {
2390 			ut_d(const fil_space_t* s=)
2391 			        fil_system.read_page0(space->id);
2392 			ut_ad(!s || s == space);
2393 			if (!space->size) {
2394 				/* Page 0 was not loaded.
2395 				Skip this tablespace. */
2396 				goto next;
2397 			}
2398 		}
2399 
2400 		/* Skip ENCRYPTION!=DEFAULT tablespaces. */
2401 		if (space->crypt_data
2402 		    && !space->crypt_data->is_default_encryption()) {
2403 			goto next;
2404 		}
2405 
2406 		if (srv_encrypt_tables) {
2407 			/* Skip encrypted tablespaces if
2408 			innodb_encrypt_tables!=OFF */
2409 			if (space->crypt_data
2410 			    && space->crypt_data->min_key_version) {
2411 				goto next;
2412 			}
2413 		} else {
2414 			/* Skip unencrypted tablespaces if
2415 			innodb_encrypt_tables=OFF */
2416 			if (!space->crypt_data
2417 			    || !space->crypt_data->min_key_version) {
2418 				goto next;
2419 			}
2420 		}
2421 
2422 		fil_system.default_encrypt_tables.push_back(*space);
2423 		space->is_in_default_encrypt = true;
2424 next:
2425 		space->release();
2426 	}
2427 }
2428 
2429 /*********************************************************************
2430 Adjust max key age
2431 @param[in]	val		New max key age */
2432 UNIV_INTERN
2433 void
fil_crypt_set_rotate_key_age(uint val)2434 fil_crypt_set_rotate_key_age(
2435 	uint	val)
2436 {
2437 	mutex_enter(&fil_system.mutex);
2438 	srv_fil_crypt_rotate_key_age = val;
2439 	if (val == 0) {
2440 		fil_crypt_default_encrypt_tables_fill();
2441 	}
2442 	mutex_exit(&fil_system.mutex);
2443 	os_event_set(fil_crypt_threads_event);
2444 }
2445 
2446 /*********************************************************************
2447 Adjust rotation iops
2448 @param[in]	val		New max roation iops */
2449 UNIV_INTERN
2450 void
fil_crypt_set_rotation_iops(uint val)2451 fil_crypt_set_rotation_iops(
2452 	uint val)
2453 {
2454 	srv_n_fil_crypt_iops = val;
2455 	os_event_set(fil_crypt_threads_event);
2456 }
2457 
2458 /*********************************************************************
2459 Adjust encrypt tables
2460 @param[in]	val		New setting for innodb-encrypt-tables */
2461 UNIV_INTERN
2462 void
fil_crypt_set_encrypt_tables(uint val)2463 fil_crypt_set_encrypt_tables(
2464 	uint val)
2465 {
2466 	if (!fil_crypt_threads_inited) {
2467 		return;
2468 	}
2469 
2470 	mutex_enter(&fil_system.mutex);
2471 
2472 	srv_encrypt_tables = val;
2473 
2474 	if (fil_crypt_must_default_encrypt()) {
2475 		fil_crypt_default_encrypt_tables_fill();
2476 	}
2477 
2478 	mutex_exit(&fil_system.mutex);
2479 
2480 	os_event_set(fil_crypt_threads_event);
2481 }
2482 
2483 /*********************************************************************
2484 Init threads for key rotation */
2485 UNIV_INTERN
2486 void
fil_crypt_threads_init()2487 fil_crypt_threads_init()
2488 {
2489 	if (!fil_crypt_threads_inited) {
2490 		fil_crypt_event = os_event_create(0);
2491 		fil_crypt_threads_event = os_event_create(0);
2492 		mutex_create(LATCH_ID_FIL_CRYPT_THREADS_MUTEX,
2493 		     &fil_crypt_threads_mutex);
2494 
2495 		uint cnt = srv_n_fil_crypt_threads;
2496 		srv_n_fil_crypt_threads = 0;
2497 		fil_crypt_threads_inited = true;
2498 		fil_crypt_set_thread_cnt(cnt);
2499 	}
2500 }
2501 
2502 /*********************************************************************
2503 Clean up key rotation threads resources */
2504 UNIV_INTERN
2505 void
fil_crypt_threads_cleanup()2506 fil_crypt_threads_cleanup()
2507 {
2508 	if (!fil_crypt_threads_inited) {
2509 		return;
2510 	}
2511 	ut_a(!srv_n_fil_crypt_threads_started);
2512 	os_event_destroy(fil_crypt_event);
2513 	os_event_destroy(fil_crypt_threads_event);
2514 	mutex_free(&fil_crypt_threads_mutex);
2515 	fil_crypt_threads_inited = false;
2516 }
2517 
2518 /*********************************************************************
2519 Wait for crypt threads to stop accessing space
2520 @param[in]	space		Tablespace */
2521 UNIV_INTERN
2522 void
fil_space_crypt_close_tablespace(const fil_space_t * space)2523 fil_space_crypt_close_tablespace(
2524 	const fil_space_t*	space)
2525 {
2526 	fil_space_crypt_t* crypt_data = space->crypt_data;
2527 
2528 	if (!crypt_data || srv_n_fil_crypt_threads == 0
2529 	    || !fil_crypt_threads_inited) {
2530 		return;
2531 	}
2532 
2533 	mutex_enter(&fil_crypt_threads_mutex);
2534 
2535 	time_t start = time(0);
2536 	time_t last = start;
2537 
2538 	mutex_enter(&crypt_data->mutex);
2539 	mutex_exit(&fil_crypt_threads_mutex);
2540 
2541 	ulint cnt = crypt_data->rotate_state.active_threads;
2542 	bool flushing = crypt_data->rotate_state.flushing;
2543 
2544 	while (cnt > 0 || flushing) {
2545 		mutex_exit(&crypt_data->mutex);
2546 		/* release dict mutex so that scrub threads can release their
2547 		* table references */
2548 		dict_mutex_exit_for_mysql();
2549 
2550 		/* wakeup throttle (all) sleepers */
2551 		os_event_set(fil_crypt_throttle_sleep_event);
2552 		os_event_set(fil_crypt_threads_event);
2553 
2554 		os_thread_sleep(20000);
2555 		dict_mutex_enter_for_mysql();
2556 		mutex_enter(&crypt_data->mutex);
2557 		cnt = crypt_data->rotate_state.active_threads;
2558 		flushing = crypt_data->rotate_state.flushing;
2559 
2560 		time_t now = time(0);
2561 
2562 		if (now >= last + 30) {
2563 			ib::warn() << "Waited "
2564 				   << now - start
2565 				   << " seconds to drop space: "
2566 				   << space->name << " ("
2567 				   << space->id << ") active threads "
2568 				   << cnt << "flushing="
2569 				   << flushing << ".";
2570 			last = now;
2571 		}
2572 	}
2573 
2574 	mutex_exit(&crypt_data->mutex);
2575 }
2576 
2577 /*********************************************************************
2578 Get crypt status for a space (used by information_schema)
2579 @param[in]	space		Tablespace
2580 @param[out]	status		Crypt status */
2581 UNIV_INTERN
2582 void
fil_space_crypt_get_status(const fil_space_t * space,struct fil_space_crypt_status_t * status)2583 fil_space_crypt_get_status(
2584 	const fil_space_t*			space,
2585 	struct fil_space_crypt_status_t*	status)
2586 {
2587 	memset(status, 0, sizeof(*status));
2588 
2589 	ut_ad(space->referenced());
2590 
2591 	/* If there is no crypt data and we have not yet read
2592 	page 0 for this tablespace, we need to read it before
2593 	we can continue. */
2594 	if (!space->crypt_data) {
2595 		fil_crypt_read_crypt_data(const_cast<fil_space_t*>(space));
2596 	}
2597 
2598 	status->space = ULINT_UNDEFINED;
2599 
2600 	if (fil_space_crypt_t* crypt_data = space->crypt_data) {
2601 		status->space = space->id;
2602 		mutex_enter(&crypt_data->mutex);
2603 		status->scheme = crypt_data->type;
2604 		status->keyserver_requests = crypt_data->keyserver_requests;
2605 		status->min_key_version = crypt_data->min_key_version;
2606 		status->key_id = crypt_data->key_id;
2607 
2608 		if (crypt_data->rotate_state.active_threads > 0 ||
2609 		    crypt_data->rotate_state.flushing) {
2610 			status->rotating = true;
2611 			status->flushing =
2612 				crypt_data->rotate_state.flushing;
2613 			status->rotate_next_page_number =
2614 				crypt_data->rotate_state.next_offset;
2615 			status->rotate_max_page_number =
2616 				crypt_data->rotate_state.max_offset;
2617 		}
2618 
2619 		mutex_exit(&crypt_data->mutex);
2620 
2621 		if (srv_encrypt_tables || crypt_data->min_key_version) {
2622 			status->current_key_version =
2623 				fil_crypt_get_latest_key_version(crypt_data);
2624 		}
2625 	}
2626 }
2627 
2628 /*********************************************************************
2629 Return crypt statistics
2630 @param[out]	stat		Crypt statistics */
2631 UNIV_INTERN
2632 void
fil_crypt_total_stat(fil_crypt_stat_t * stat)2633 fil_crypt_total_stat(
2634 	fil_crypt_stat_t *stat)
2635 {
2636 	mutex_enter(&crypt_stat_mutex);
2637 	*stat = crypt_stat;
2638 	mutex_exit(&crypt_stat_mutex);
2639 }
2640 
2641 /*********************************************************************
2642 Get scrub status for a space (used by information_schema)
2643 
2644 @param[in]	space		Tablespace
2645 @param[out]	status		Scrub status */
2646 UNIV_INTERN
2647 void
fil_space_get_scrub_status(const fil_space_t * space,struct fil_space_scrub_status_t * status)2648 fil_space_get_scrub_status(
2649 	const fil_space_t*			space,
2650 	struct fil_space_scrub_status_t*	status)
2651 {
2652 	memset(status, 0, sizeof(*status));
2653 
2654 	ut_ad(space->referenced());
2655 	fil_space_crypt_t* crypt_data = space->crypt_data;
2656 
2657 	status->space = space->id;
2658 
2659 	if (crypt_data != NULL) {
2660 		status->compressed = FSP_FLAGS_GET_ZIP_SSIZE(space->flags) > 0;
2661 		mutex_enter(&crypt_data->mutex);
2662 		status->last_scrub_completed =
2663 			crypt_data->rotate_state.scrubbing.last_scrub_completed;
2664 		if (crypt_data->rotate_state.active_threads > 0 &&
2665 		    crypt_data->rotate_state.scrubbing.is_active) {
2666 			status->scrubbing = true;
2667 			status->current_scrub_started =
2668 				crypt_data->rotate_state.start_time;
2669 			status->current_scrub_active_threads =
2670 				crypt_data->rotate_state.active_threads;
2671 			status->current_scrub_page_number =
2672 				crypt_data->rotate_state.next_offset;
2673 			status->current_scrub_max_page_number =
2674 				crypt_data->rotate_state.max_offset;
2675 		}
2676 
2677 		mutex_exit(&crypt_data->mutex);
2678 	}
2679 }
2680 #endif /* UNIV_INNOCHECKSUM */
2681 
2682 /**
2683 Verify that post encryption checksum match calculated checksum.
2684 This function should be called only if tablespace contains crypt_data
2685 metadata (this is strong indication that tablespace is encrypted).
2686 Function also verifies that traditional checksum does not match
2687 calculated checksum as if it does page could be valid unencrypted,
2688 encrypted, or corrupted.
2689 
2690 @param[in,out]	page		page frame (checksum is temporarily modified)
2691 @param[in]	page_size	page size
2692 @return whether the encrypted page is OK */
2693 bool
fil_space_verify_crypt_checksum(const byte * page,const page_size_t & page_size)2694 fil_space_verify_crypt_checksum(const byte* page, const page_size_t& page_size)
2695 {
2696 	ut_ad(mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION));
2697 
2698 	/* Compressed and encrypted pages do not have checksum. Assume not
2699 	corrupted. Page verification happens after decompression in
2700 	buf_page_io_complete() using buf_page_is_corrupted(). */
2701 	if (mach_read_from_2(page + FIL_PAGE_TYPE)
2702 	    == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
2703 		return true;
2704 	}
2705 
2706 	/* Read stored post encryption checksum. */
2707 	const ib_uint32_t checksum = mach_read_from_4(
2708 		page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
2709 
2710 	/* If stored checksum matches one of the calculated checksums
2711 	page is not corrupted. */
2712 
2713 	switch (srv_checksum_algorithm_t(srv_checksum_algorithm)) {
2714 	case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
2715 		if (page_size.is_compressed()) {
2716 			return checksum == page_zip_calc_checksum(
2717 				page, page_size.physical(),
2718 				SRV_CHECKSUM_ALGORITHM_CRC32)
2719 #ifdef INNODB_BUG_ENDIAN_CRC32
2720 				|| checksum == page_zip_calc_checksum(
2721 					page, page_size.physical(),
2722 					SRV_CHECKSUM_ALGORITHM_CRC32, true)
2723 #endif
2724 				;
2725 		}
2726 
2727 		return checksum == buf_calc_page_crc32(page)
2728 #ifdef INNODB_BUG_ENDIAN_CRC32
2729 			|| checksum == buf_calc_page_crc32(page, true)
2730 #endif
2731 			;
2732 	case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
2733 		/* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2734 		due to MDEV-12114, fil_crypt_calculate_checksum()
2735 		is only using CRC32 for the encrypted pages.
2736 		Due to this, we must treat "strict_none" as "none". */
2737 	case SRV_CHECKSUM_ALGORITHM_NONE:
2738 		return true;
2739 	case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
2740 		/* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2741 		due to MDEV-12114, fil_crypt_calculate_checksum()
2742 		is only using CRC32 for the encrypted pages.
2743 		Due to this, we must treat "strict_innodb" as "innodb". */
2744 	case SRV_CHECKSUM_ALGORITHM_INNODB:
2745 	case SRV_CHECKSUM_ALGORITHM_CRC32:
2746 		if (checksum == BUF_NO_CHECKSUM_MAGIC) {
2747 			return true;
2748 		}
2749 		if (page_size.is_compressed()) {
2750 			return checksum == page_zip_calc_checksum(
2751 				page, page_size.physical(),
2752 				SRV_CHECKSUM_ALGORITHM_CRC32)
2753 #ifdef INNODB_BUG_ENDIAN_CRC32
2754 				|| checksum == page_zip_calc_checksum(
2755 					page, page_size.physical(),
2756 					SRV_CHECKSUM_ALGORITHM_CRC32, true)
2757 #endif
2758 				|| checksum == page_zip_calc_checksum(
2759 					page, page_size.physical(),
2760 					SRV_CHECKSUM_ALGORITHM_INNODB);
2761 		}
2762 
2763 		return checksum == buf_calc_page_crc32(page)
2764 #ifdef INNODB_BUG_ENDIAN_CRC32
2765 			|| checksum == buf_calc_page_crc32(page, true)
2766 #endif
2767 			|| checksum == buf_calc_page_new_checksum(page);
2768 	}
2769 
2770 	ut_ad(!"unhandled innodb_checksum_algorithm");
2771 	return false;
2772 }
2773