1 /*****************************************************************************
2 Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
3 Copyright (c) 2014, 2021, MariaDB Corporation.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
16 
17 *****************************************************************************/
18 /**************************************************//**
19 @file fil0crypt.cc
20 Innodb file space encrypt/decrypt
21 
22 Created            Jonas Oreland Google
23 Modified           Jan Lindström jan.lindstrom@mariadb.com
24 *******************************************************/
25 
26 #include "fil0crypt.h"
27 #include "mtr0types.h"
28 #include "mach0data.h"
29 #include "page0zip.h"
30 #include "buf0checksum.h"
31 #ifdef UNIV_INNOCHECKSUM
32 # include "buf0buf.h"
33 #else
34 #include "buf0dblwr.h"
35 #include "srv0srv.h"
36 #include "srv0start.h"
37 #include "mtr0mtr.h"
38 #include "mtr0log.h"
39 #include "ut0ut.h"
40 #include "fsp0fsp.h"
41 #include "fil0pagecompress.h"
42 #include <my_crypt.h>
43 
/** Checked in this file before fil_crypt_threads_event is signalled and
before fil_crypt_threads_mutex is acquired.
NOTE(review): presumably set once the key rotation thread
infrastructure has been created - confirm against the code that sets it. */
static bool fil_crypt_threads_inited = false;

/** Is encryption enabled/disabled */
UNIV_INTERN ulong srv_encrypt_tables = 0;

/** No of key rotation threads requested */
UNIV_INTERN uint srv_n_fil_crypt_threads = 0;

/** No of key rotation threads started */
UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0;

/** At this age or older a space/page will be rotated */
UNIV_INTERN uint srv_fil_crypt_rotate_key_age;

/** Whether the encryption plugin does key rotation.
Set to true by fil_space_crypt_t::key_get_latest_version() the first
time a key version greater than srv_fil_crypt_rotate_key_age is seen. */
static bool srv_encrypt_rotate;

/** Event to signal FROM the key rotation threads. */
static os_event_t fil_crypt_event;

/** Event to signal TO the key rotation threads. */
UNIV_INTERN os_event_t fil_crypt_threads_event;

/** Event for waking up threads throttle.
Created in fil_space_crypt_init() and destroyed in
fil_space_crypt_cleanup(). */
static os_event_t fil_crypt_throttle_sleep_event;

/** Mutex for key rotation threads. */
UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex;

/** Variable ensuring only 1 thread at time does initial conversion */
static bool fil_crypt_start_converting = false;

/** Variables for throttling */
UNIV_INTERN uint srv_n_fil_crypt_iops = 100;	 // 10ms per iop
static uint srv_alloc_time = 3;		    // allocate iops for 3s at a time
static uint n_fil_crypt_iops_allocated = 0;

/** Compile-time switch; 0 here.
NOTE(review): presumably enables extra throttling diagnostics when
non-zero - confirm where it is tested. */
#define DEBUG_KEYROTATION_THROTTLING 0

/** Statistics variables.
crypt_stat is zeroed and crypt_stat_mutex is created in
fil_space_crypt_init(); the mutex presumably protects crypt_stat. */
static fil_crypt_stat_t crypt_stat;
static ib_mutex_t crypt_stat_mutex;

/***********************************************************************
Check if a key needs rotation given a key_state.
Forward declaration; the definition is further down in this file.
The result must not be ignored (warn_unused_result).
@param[in]	crypt_data		Encryption information
@param[in]	key_version		Current key version
@param[in]	latest_key_version	Latest key version
@param[in]	rotate_key_age		when to rotate
@return true if key needs rotation, false if not */
static bool
fil_crypt_needs_rotation(
	const fil_space_crypt_t*	crypt_data,
	uint				key_version,
	uint				latest_key_version,
	uint				rotate_key_age)
	MY_ATTRIBUTE((warn_unused_result));
101 
102 /*********************************************************************
103 Init space crypt */
104 UNIV_INTERN
105 void
fil_space_crypt_init()106 fil_space_crypt_init()
107 {
108 	fil_crypt_throttle_sleep_event = os_event_create(0);
109 
110 	mutex_create(LATCH_ID_FIL_CRYPT_STAT_MUTEX, &crypt_stat_mutex);
111 	memset(&crypt_stat, 0, sizeof(crypt_stat));
112 }
113 
114 /*********************************************************************
115 Cleanup space crypt */
116 UNIV_INTERN
117 void
fil_space_crypt_cleanup()118 fil_space_crypt_cleanup()
119 {
120 	os_event_destroy(fil_crypt_throttle_sleep_event);
121 	mutex_free(&crypt_stat_mutex);
122 }
123 
124 /**
125 Get latest key version from encryption plugin.
126 @return key version or ENCRYPTION_KEY_VERSION_INVALID */
127 uint
key_get_latest_version(void)128 fil_space_crypt_t::key_get_latest_version(void)
129 {
130 	uint key_version = key_found;
131 
132 	if (is_key_found()) {
133 		key_version = encryption_key_get_latest_version(key_id);
134 		/* InnoDB does dirty read of srv_fil_crypt_rotate_key_age.
135 		It doesn't matter because srv_encrypt_rotate
136 		can be set to true only once */
137 		if (!srv_encrypt_rotate
138 		    && key_version > srv_fil_crypt_rotate_key_age) {
139 			srv_encrypt_rotate = true;
140 		}
141 
142 		srv_stats.n_key_requests.inc();
143 		key_found = key_version;
144 	}
145 
146 	return key_version;
147 }
148 
149 /******************************************************************
150 Get the latest(key-version), waking the encrypt thread, if needed
151 @param[in,out]	crypt_data	Crypt data */
152 static inline
153 uint
fil_crypt_get_latest_key_version(fil_space_crypt_t * crypt_data)154 fil_crypt_get_latest_key_version(
155 	fil_space_crypt_t* crypt_data)
156 {
157 	ut_ad(crypt_data != NULL);
158 
159 	uint key_version = crypt_data->key_get_latest_version();
160 
161 	if (crypt_data->is_key_found()) {
162 
163 		if (fil_crypt_needs_rotation(
164 				crypt_data,
165 				crypt_data->min_key_version,
166 				key_version,
167 				srv_fil_crypt_rotate_key_age)) {
168 			/* Below event seen as NULL-pointer at startup
169 			when new database was created and we create a
170 			checkpoint. Only seen when debugging. */
171 			if (fil_crypt_threads_inited) {
172 				os_event_set(fil_crypt_threads_event);
173 			}
174 		}
175 	}
176 
177 	return key_version;
178 }
179 
180 /******************************************************************
181 Mutex helper for crypt_data->scheme */
182 void
crypt_data_scheme_locker(st_encryption_scheme * scheme,int exit)183 crypt_data_scheme_locker(
184 /*=====================*/
185 	st_encryption_scheme*	scheme,
186 	int			exit)
187 {
188 	fil_space_crypt_t* crypt_data =
189 		static_cast<fil_space_crypt_t*>(scheme);
190 
191 	if (exit) {
192 		mutex_exit(&crypt_data->mutex);
193 	} else {
194 		mutex_enter(&crypt_data->mutex);
195 	}
196 }
197 
198 /******************************************************************
199 Create a fil_space_crypt_t object
200 @param[in]	type		CRYPT_SCHEME_UNENCRYPTE or
201 				CRYPT_SCHEME_1
202 @param[in]	encrypt_mode	FIL_ENCRYPTION_DEFAULT or
203 				FIL_ENCRYPTION_ON or
204 				FIL_ENCRYPTION_OFF
205 @param[in]	min_key_version key_version or 0
206 @param[in]	key_id		Used key id
207 @return crypt object */
208 static
209 fil_space_crypt_t*
fil_space_create_crypt_data(uint type,fil_encryption_t encrypt_mode,uint min_key_version,uint key_id)210 fil_space_create_crypt_data(
211 	uint			type,
212 	fil_encryption_t	encrypt_mode,
213 	uint			min_key_version,
214 	uint			key_id)
215 {
216 	fil_space_crypt_t* crypt_data = NULL;
217 	if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
218 		crypt_data = new(buf)
219 			fil_space_crypt_t(
220 				type,
221 				min_key_version,
222 				key_id,
223 				encrypt_mode);
224 	}
225 
226 	return crypt_data;
227 }
228 
229 /******************************************************************
230 Create a fil_space_crypt_t object
231 @param[in]	encrypt_mode	FIL_ENCRYPTION_DEFAULT or
232 				FIL_ENCRYPTION_ON or
233 				FIL_ENCRYPTION_OFF
234 
235 @param[in]	key_id		Encryption key id
236 @return crypt object */
237 UNIV_INTERN
238 fil_space_crypt_t*
fil_space_create_crypt_data(fil_encryption_t encrypt_mode,uint key_id)239 fil_space_create_crypt_data(
240 	fil_encryption_t	encrypt_mode,
241 	uint			key_id)
242 {
243 	return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id));
244 }
245 
246 /******************************************************************
247 Merge fil_space_crypt_t object
248 @param[in,out]	dst		Destination cryp data
249 @param[in]	src		Source crypt data */
250 UNIV_INTERN
251 void
fil_space_merge_crypt_data(fil_space_crypt_t * dst,const fil_space_crypt_t * src)252 fil_space_merge_crypt_data(
253 	fil_space_crypt_t* dst,
254 	const fil_space_crypt_t* src)
255 {
256 	mutex_enter(&dst->mutex);
257 
258 	/* validate that they are mergeable */
259 	ut_a(src->type == CRYPT_SCHEME_UNENCRYPTED ||
260 	     src->type == CRYPT_SCHEME_1);
261 
262 	ut_a(dst->type == CRYPT_SCHEME_UNENCRYPTED ||
263 	     dst->type == CRYPT_SCHEME_1);
264 
265 	dst->encryption = src->encryption;
266 	dst->type = src->type;
267 	dst->min_key_version = src->min_key_version;
268 	dst->keyserver_requests += src->keyserver_requests;
269 
270 	mutex_exit(&dst->mutex);
271 }
272 
273 /** Initialize encryption parameters from a tablespace header page.
274 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
275 @param[in]	page		first page of the tablespace
276 @return crypt data from page 0
277 @retval	NULL	if not present or not valid */
fil_space_read_crypt_data(ulint zip_size,const byte * page)278 fil_space_crypt_t* fil_space_read_crypt_data(ulint zip_size, const byte* page)
279 {
280 	const ulint offset = FSP_HEADER_OFFSET
281 		+ fsp_header_get_encryption_offset(zip_size);
282 
283 	if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) {
284 		/* Crypt data is not stored. */
285 		return NULL;
286 	}
287 
288 	uint8_t type = mach_read_from_1(page + offset + MAGIC_SZ + 0);
289 	uint8_t iv_length = mach_read_from_1(page + offset + MAGIC_SZ + 1);
290 	fil_space_crypt_t* crypt_data;
291 
292 	if (!(type == CRYPT_SCHEME_UNENCRYPTED ||
293 	      type == CRYPT_SCHEME_1)
294 	    || iv_length != sizeof crypt_data->iv) {
295 		ib::error() << "Found non sensible crypt scheme: "
296 			    << type << "," << iv_length
297 			    << " for space: "
298 			    << page_get_space_id(page);
299 		return NULL;
300 	}
301 
302 	uint min_key_version = mach_read_from_4
303 		(page + offset + MAGIC_SZ + 2 + iv_length);
304 
305 	uint key_id = mach_read_from_4
306 		(page + offset + MAGIC_SZ + 2 + iv_length + 4);
307 
308 	fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(
309 		page + offset + MAGIC_SZ + 2 + iv_length + 8);
310 
311 	crypt_data = fil_space_create_crypt_data(encryption, key_id);
312 	/* We need to overwrite these as above function will initialize
313 	members */
314 	crypt_data->type = type;
315 	crypt_data->min_key_version = min_key_version;
316 	memcpy(crypt_data->iv, page + offset + MAGIC_SZ + 2, iv_length);
317 
318 	return crypt_data;
319 }
320 
321 /******************************************************************
322 Free a crypt data object
323 @param[in,out] crypt_data	crypt data to be freed */
324 UNIV_INTERN
325 void
fil_space_destroy_crypt_data(fil_space_crypt_t ** crypt_data)326 fil_space_destroy_crypt_data(
327 	fil_space_crypt_t **crypt_data)
328 {
329 	if (crypt_data != NULL && (*crypt_data) != NULL) {
330 		fil_space_crypt_t* c;
331 		if (UNIV_LIKELY(fil_crypt_threads_inited)) {
332 			mutex_enter(&fil_crypt_threads_mutex);
333 			c = *crypt_data;
334 			*crypt_data = NULL;
335 			mutex_exit(&fil_crypt_threads_mutex);
336 		} else {
337 			ut_ad(srv_read_only_mode || !srv_was_started);
338 			c = *crypt_data;
339 			*crypt_data = NULL;
340 		}
341 		if (c) {
342 			c->~fil_space_crypt_t();
343 			ut_free(c);
344 		}
345 	}
346 }
347 
348 /** Amend encryption information from redo log.
349 @param[in]	space	tablespace
350 @param[in]	data	encryption metadata */
fil_crypt_parse(fil_space_t * space,const byte * data)351 void fil_crypt_parse(fil_space_t* space, const byte* data)
352 {
353 	ut_ad(data[1] == MY_AES_BLOCK_SIZE);
354 	if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
355 		fil_space_crypt_t* crypt_data = new(buf)
356 			fil_space_crypt_t(
357 				data[0],
358 				mach_read_from_4(&data[2 + MY_AES_BLOCK_SIZE]),
359 				mach_read_from_4(&data[6 + MY_AES_BLOCK_SIZE]),
360 				static_cast<fil_encryption_t>
361 				(data[10 + MY_AES_BLOCK_SIZE]));
362 		memcpy(crypt_data->iv, data + 2, MY_AES_BLOCK_SIZE);
363 		mutex_enter(&fil_system.mutex);
364 		if (space->crypt_data) {
365 			fil_space_merge_crypt_data(space->crypt_data,
366 						   crypt_data);
367 			fil_space_destroy_crypt_data(&crypt_data);
368 			crypt_data = space->crypt_data;
369 		} else {
370 			space->crypt_data = crypt_data;
371 		}
372 		mutex_exit(&fil_system.mutex);
373 	}
374 }
375 
376 /** Fill crypt data information to the give page.
377 It should be called during ibd file creation.
378 @param[in]	flags	tablespace flags
379 @param[in,out]	page	first page of the tablespace */
380 void
fill_page0(ulint flags,byte * page)381 fil_space_crypt_t::fill_page0(
382 	ulint	flags,
383 	byte*	page)
384 {
385 	const uint len = sizeof(iv);
386 	const ulint offset = FSP_HEADER_OFFSET
387 		+ fsp_header_get_encryption_offset(
388 			fil_space_t::zip_size(flags));
389 
390 	memcpy(page + offset, CRYPT_MAGIC, MAGIC_SZ);
391 	mach_write_to_1(page + offset + MAGIC_SZ, type);
392 	mach_write_to_1(page + offset + MAGIC_SZ + 1, len);
393 	memcpy(page + offset + MAGIC_SZ + 2, &iv, len);
394 
395 	mach_write_to_4(page + offset + MAGIC_SZ + 2 + len,
396 			min_key_version);
397 	mach_write_to_4(page + offset + MAGIC_SZ + 2 + len + 4,
398 			key_id);
399 	mach_write_to_1(page + offset + MAGIC_SZ + 2  + len + 8,
400 			encryption);
401 }
402 
/** Write encryption metadata to the first page.
The fields are stored after the FSP header encryption offset in this
order: CRYPT_MAGIC, scheme type (1 byte), IV length (1 byte), IV,
minimum key version (4 bytes), key id (4 bytes),
encryption mode (1 byte).
@param[in,out]	block	first page of the tablespace
@param[in,out]	mtr	mini-transaction */
void fil_space_crypt_t::write_page0(buf_block_t* block, mtr_t* mtr)
{
	const ulint offset = FSP_HEADER_OFFSET
		+ fsp_header_get_encryption_offset(block->zip_size());
	byte* b = block->frame + offset;

	/* The magic is written through the mini-transaction.
	NOTE(review): MAYBE_NOP presumably lets the write be skipped
	when the bytes already match - confirm in mtr_t. */
	mtr->memcpy<mtr_t::MAYBE_NOP>(*block, b, CRYPT_MAGIC, MAGIC_SZ);

	b += MAGIC_SZ;
	/* Remember where the metadata fields begin, so that the whole
	range can be logged in one record below. */
	byte* const start = b;
	*b++ = static_cast<byte>(type);
	compile_time_assert(sizeof iv == MY_AES_BLOCK_SIZE);
	compile_time_assert(sizeof iv == CRYPT_SCHEME_1_IV_LEN);
	*b++ = sizeof iv;
	memcpy(b, iv, sizeof iv);
	b += sizeof iv;
	mach_write_to_4(b, min_key_version);
	b += 4;
	mach_write_to_4(b, key_id);
	b += 4;
	*b++ = byte(encryption);
	/* 1 (type) + 1 (IV length) + 4 + 4 + 1 = 11 bytes plus the IV */
	ut_ad(b - start == 11 + MY_AES_BLOCK_SIZE);
	/* We must log also any unchanged bytes, because recovery will
	invoke fil_crypt_parse() based on this log record. */
	mtr->memcpy(*block, offset + MAGIC_SZ, b - start);
}
432 
433 /** Encrypt a buffer for non full checksum.
434 @param[in,out]		crypt_data		Crypt data
435 @param[in]		space			space_id
436 @param[in]		offset			Page offset
437 @param[in]		lsn			Log sequence number
438 @param[in]		src_frame		Page to encrypt
439 @param[in]		zip_size		ROW_FORMAT=COMPRESSED
440 						page size, or 0
441 @param[in,out]		dst_frame		Output buffer
442 @return encrypted buffer or NULL */
fil_encrypt_buf_for_non_full_checksum(fil_space_crypt_t * crypt_data,ulint space,ulint offset,lsn_t lsn,const byte * src_frame,ulint zip_size,byte * dst_frame)443 static byte* fil_encrypt_buf_for_non_full_checksum(
444 	fil_space_crypt_t*	crypt_data,
445 	ulint			space,
446 	ulint			offset,
447 	lsn_t			lsn,
448 	const byte*		src_frame,
449 	ulint			zip_size,
450 	byte*			dst_frame)
451 {
452 	uint size = uint(zip_size ? zip_size : srv_page_size);
453 	uint key_version = fil_crypt_get_latest_key_version(crypt_data);
454 	ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
455 	ut_ad(!ut_align_offset(src_frame, 8));
456 	ut_ad(!ut_align_offset(dst_frame, 8));
457 
458 	const bool page_compressed = fil_page_get_type(src_frame)
459 		== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED;
460 	uint header_len = FIL_PAGE_DATA;
461 
462 	if (page_compressed) {
463 		header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
464 	}
465 
466 	/* FIL page header is not encrypted */
467 	memcpy(dst_frame, src_frame, header_len);
468 	mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
469 			key_version);
470 
471 	/* Calculate the start offset in a page */
472 	uint		unencrypted_bytes = header_len + FIL_PAGE_DATA_END;
473 	uint		srclen = size - unencrypted_bytes;
474 	const byte*	src = src_frame + header_len;
475 	byte*		dst = dst_frame + header_len;
476 	uint32		dstlen = 0;
477 	ib_uint32_t	checksum = 0;
478 
479 	if (page_compressed) {
480 		srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
481 	}
482 
483 	int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
484 					   crypt_data, key_version,
485 					   (uint32)space, (uint32)offset, lsn);
486 	ut_a(rc == MY_AES_OK);
487 	ut_a(dstlen == srclen);
488 
489 	/* For compressed tables we do not store the FIL header because
490 	the whole page is not stored to the disk. In compressed tables only
491 	the FIL header + compressed (and now encrypted) payload alligned
492 	to sector boundary is written. */
493 	if (!page_compressed) {
494 		/* FIL page trailer is also not encrypted */
495 		static_assert(FIL_PAGE_DATA_END == 8, "alignment");
496 		memcpy_aligned<8>(dst_frame + size - FIL_PAGE_DATA_END,
497 				  src_frame + size - FIL_PAGE_DATA_END, 8);
498 	} else {
499 		/* Clean up rest of buffer */
500 		memset(dst_frame+header_len+srclen, 0,
501 		       size - (header_len + srclen));
502 	}
503 
504 	checksum = fil_crypt_calculate_checksum(zip_size, dst_frame);
505 
506 	/* store the post-encryption checksum after the key-version */
507 	mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4,
508 			checksum);
509 
510 	ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size));
511 
512 	srv_stats.pages_encrypted.inc();
513 
514 	return dst_frame;
515 }
516 
517 /** Encrypt a buffer for full checksum format.
518 @param[in,out]		crypt_data		Crypt data
519 @param[in]		space			space_id
520 @param[in]		offset			Page offset
521 @param[in]		lsn			Log sequence number
522 @param[in]		src_frame		Page to encrypt
523 @param[in,out]		dst_frame		Output buffer
524 @return encrypted buffer or NULL */
fil_encrypt_buf_for_full_crc32(fil_space_crypt_t * crypt_data,ulint space,ulint offset,lsn_t lsn,const byte * src_frame,byte * dst_frame)525 static byte* fil_encrypt_buf_for_full_crc32(
526 	fil_space_crypt_t*	crypt_data,
527 	ulint			space,
528 	ulint			offset,
529 	lsn_t			lsn,
530 	const byte*		src_frame,
531 	byte*			dst_frame)
532 {
533 	uint key_version = fil_crypt_get_latest_key_version(crypt_data);
534 	ut_d(bool corrupted = false);
535 	const uint size = buf_page_full_crc32_size(src_frame, NULL,
536 #ifdef UNIV_DEBUG
537 						   &corrupted
538 #else
539 						   NULL
540 #endif
541 						   );
542 	ut_ad(!corrupted);
543 	uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
544 			      + FIL_PAGE_FCRC32_CHECKSUM);
545 	const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
546 	byte* dst = dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
547 	uint dstlen = 0;
548 
549 	ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
550 
551 	/* Till FIL_PAGE_LSN, page is not encrypted */
552 	memcpy(dst_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
553 
554 	/* Write key version to the page. */
555 	mach_write_to_4(dst_frame + FIL_PAGE_FCRC32_KEY_VERSION, key_version);
556 
557 	int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
558 					   crypt_data, key_version,
559 					   uint(space), uint(offset), lsn);
560 	ut_a(rc == MY_AES_OK);
561 	ut_a(dstlen == srclen);
562 
563 	const ulint payload = size - FIL_PAGE_FCRC32_CHECKSUM;
564 	mach_write_to_4(dst_frame + payload, ut_crc32(dst_frame, payload));
565 	/* Clean the rest of the buffer. FIXME: Punch holes when writing! */
566 	memset(dst_frame + (payload + 4), 0, srv_page_size - (payload + 4));
567 
568 	srv_stats.pages_encrypted.inc();
569 
570 	return dst_frame;
571 }
572 
573 /** Encrypt a buffer.
574 @param[in,out]		crypt_data		Crypt data
575 @param[in]		space			space_id
576 @param[in]		offset			Page offset
577 @param[in]		src_frame		Page to encrypt
578 @param[in]		zip_size		ROW_FORMAT=COMPRESSED
579 						page size, or 0
580 @param[in,out]		dst_frame		Output buffer
581 @param[in]		use_full_checksum	full crc32 algo is used
582 @return encrypted buffer or NULL */
fil_encrypt_buf(fil_space_crypt_t * crypt_data,ulint space,ulint offset,const byte * src_frame,ulint zip_size,byte * dst_frame,bool use_full_checksum)583 byte* fil_encrypt_buf(
584 	fil_space_crypt_t*	crypt_data,
585 	ulint			space,
586 	ulint			offset,
587 	const byte*		src_frame,
588 	ulint			zip_size,
589 	byte*			dst_frame,
590 	bool			use_full_checksum)
591 {
592 	const lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
593 
594 	if (use_full_checksum) {
595 		ut_ad(!zip_size);
596 		return fil_encrypt_buf_for_full_crc32(
597 			crypt_data, space, offset,
598 			lsn, src_frame, dst_frame);
599 	}
600 
601 	return fil_encrypt_buf_for_non_full_checksum(
602 		crypt_data, space, offset, lsn,
603 		src_frame, zip_size, dst_frame);
604 }
605 
606 /** Check whether these page types are allowed to encrypt.
607 @param[in]	space		tablespace object
608 @param[in]	src_frame	source page
609 @return true if it is valid page type */
fil_space_encrypt_valid_page_type(const fil_space_t * space,const byte * src_frame)610 static bool fil_space_encrypt_valid_page_type(
611 	const fil_space_t*	space,
612 	const byte*		src_frame)
613 {
614 	switch (fil_page_get_type(src_frame)) {
615 	case FIL_PAGE_RTREE:
616 		return space->full_crc32();
617 	case FIL_PAGE_TYPE_FSP_HDR:
618 	case FIL_PAGE_TYPE_XDES:
619 		return false;
620 	}
621 
622 	return true;
623 }
624 
625 /******************************************************************
626 Encrypt a page
627 
628 @param[in]		space		Tablespace
629 @param[in]		offset		Page offset
630 @param[in]		src_frame	Page to encrypt
631 @param[in,out]		dst_frame	Output buffer
632 @return encrypted buffer or NULL */
fil_space_encrypt(const fil_space_t * space,ulint offset,byte * src_frame,byte * dst_frame)633 byte* fil_space_encrypt(
634 	const fil_space_t*	space,
635 	ulint			offset,
636 	byte*			src_frame,
637 	byte*			dst_frame)
638 {
639 	if (!fil_space_encrypt_valid_page_type(space, src_frame)) {
640 		return src_frame;
641 	}
642 
643 	if (!space->crypt_data || !space->crypt_data->is_encrypted()) {
644 		return (src_frame);
645 	}
646 
647 	ut_ad(space->referenced());
648 
649 	return fil_encrypt_buf(space->crypt_data, space->id, offset,
650 			       src_frame, space->zip_size(),
651 			       dst_frame, space->full_crc32());
652 }
653 
654 /** Decrypt a page for full checksum format.
655 @param[in]	space			space id
656 @param[in]	crypt_data		crypt_data
657 @param[in]	tmp_frame		Temporary buffer
658 @param[in,out]	src_frame		Page to decrypt
659 @return DB_SUCCESS or error */
fil_space_decrypt_full_crc32(ulint space,fil_space_crypt_t * crypt_data,byte * tmp_frame,byte * src_frame)660 static dberr_t fil_space_decrypt_full_crc32(
661 	ulint			space,
662 	fil_space_crypt_t*	crypt_data,
663 	byte*			tmp_frame,
664 	byte*			src_frame)
665 {
666 	uint key_version = mach_read_from_4(
667 		src_frame + FIL_PAGE_FCRC32_KEY_VERSION);
668 	lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
669 	uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
670 
671 	ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
672 
673 	ut_ad(crypt_data);
674 	ut_ad(crypt_data->is_encrypted());
675 
676 	memcpy(tmp_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
677 
678 	/* Calculate the offset where decryption starts */
679 	const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
680 	byte* dst = tmp_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
681 	uint dstlen = 0;
682 	bool corrupted = false;
683 	uint size = buf_page_full_crc32_size(src_frame, NULL, &corrupted);
684 	if (UNIV_UNLIKELY(corrupted)) {
685 		return DB_DECRYPTION_FAILED;
686 	}
687 
688 	uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
689 			      + FIL_PAGE_FCRC32_CHECKSUM);
690 
691 	int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
692 					   crypt_data, key_version,
693 					   (uint) space, offset, lsn);
694 
695 	if (rc != MY_AES_OK || dstlen != srclen) {
696 		if (rc == -1) {
697 			return DB_DECRYPTION_FAILED;
698 		}
699 
700 		ib::fatal() << "Unable to decrypt data-block "
701 			    << " src: " << src << "srclen: "
702 			    << srclen << " buf: " << dst << "buflen: "
703 			    << dstlen << " return-code: " << rc
704 			    << " Can't continue!";
705 	}
706 
707 	/* Copy only checksum part in the trailer */
708 	memcpy(tmp_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
709 	       src_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
710 	       FIL_PAGE_FCRC32_CHECKSUM);
711 
712 	srv_stats.pages_decrypted.inc();
713 
714 	return DB_SUCCESS; /* page was decrypted */
715 }
716 
717 /** Decrypt a page for non full checksum format.
718 @param[in]	crypt_data		crypt_data
719 @param[in]	tmp_frame		Temporary buffer
720 @param[in]	physical_size		page size
721 @param[in,out]	src_frame		Page to decrypt
722 @return DB_SUCCESS or error */
fil_space_decrypt_for_non_full_checksum(fil_space_crypt_t * crypt_data,byte * tmp_frame,ulint physical_size,byte * src_frame)723 static dberr_t fil_space_decrypt_for_non_full_checksum(
724 	fil_space_crypt_t*	crypt_data,
725 	byte*			tmp_frame,
726 	ulint			physical_size,
727 	byte*			src_frame)
728 {
729 	uint key_version = mach_read_from_4(
730 			src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
731 	bool page_compressed = (fil_page_get_type(src_frame)
732 				== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
733 	uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
734 	uint space = mach_read_from_4(
735 			src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
736 	ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
737 
738 	ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
739 	ut_a(crypt_data != NULL && crypt_data->is_encrypted());
740 
741 	/* read space & lsn */
742 	uint header_len = FIL_PAGE_DATA;
743 
744 	if (page_compressed) {
745 		header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
746 	}
747 
748 	/* Copy FIL page header, it is not encrypted */
749 	memcpy(tmp_frame, src_frame, header_len);
750 
751 	/* Calculate the offset where decryption starts */
752 	const byte* src = src_frame + header_len;
753 	byte* dst = tmp_frame + header_len;
754 	uint32 dstlen = 0;
755 	uint srclen = uint(physical_size) - header_len - FIL_PAGE_DATA_END;
756 
757 	if (page_compressed) {
758 		srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
759 	}
760 
761 	int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
762 					   crypt_data, key_version,
763 					   space, offset, lsn);
764 
765 	if (! ((rc == MY_AES_OK) && ((ulint) dstlen == srclen))) {
766 
767 		if (rc == -1) {
768 			return DB_DECRYPTION_FAILED;
769 		}
770 
771 		ib::fatal() << "Unable to decrypt data-block "
772 			    << " src: " << static_cast<const void*>(src)
773 			    << "srclen: "
774 			    << srclen << " buf: "
775 			    << static_cast<const void*>(dst) << "buflen: "
776 			    << dstlen << " return-code: " << rc
777 			    << " Can't continue!";
778 	}
779 
780 	/* For compressed tables we do not store the FIL header because
781 	the whole page is not stored to the disk. In compressed tables only
782 	the FIL header + compressed (and now encrypted) payload alligned
783 	to sector boundary is written. */
784 	if (!page_compressed) {
785 		/* Copy FIL trailer */
786 		memcpy(tmp_frame + physical_size - FIL_PAGE_DATA_END,
787 		       src_frame + physical_size - FIL_PAGE_DATA_END,
788 		       FIL_PAGE_DATA_END);
789 	}
790 
791 	srv_stats.pages_decrypted.inc();
792 
793 	return DB_SUCCESS; /* page was decrypted */
794 }
795 
796 /** Decrypt a page.
797 @param[in]	space_id		tablespace id
798 @param[in]	crypt_data		crypt_data
799 @param[in]	tmp_frame		Temporary buffer
800 @param[in]	physical_size		page size
801 @param[in]	fsp_flags		Tablespace flags
802 @param[in,out]	src_frame		Page to decrypt
803 @param[out]	err			DB_SUCCESS or DB_DECRYPTION_FAILED
804 @return DB_SUCCESS or error */
805 UNIV_INTERN
806 dberr_t
fil_space_decrypt(ulint space_id,fil_space_crypt_t * crypt_data,byte * tmp_frame,ulint physical_size,ulint fsp_flags,byte * src_frame)807 fil_space_decrypt(
808 	ulint			space_id,
809 	fil_space_crypt_t*	crypt_data,
810 	byte*			tmp_frame,
811 	ulint			physical_size,
812 	ulint			fsp_flags,
813 	byte*			src_frame)
814 {
815 	if (fil_space_t::full_crc32(fsp_flags)) {
816 		return fil_space_decrypt_full_crc32(
817 			space_id, crypt_data, tmp_frame, src_frame);
818 	}
819 
820 	return fil_space_decrypt_for_non_full_checksum(crypt_data, tmp_frame,
821 						       physical_size,
822 						       src_frame);
823 }
824 
825 /**
826 Decrypt a page.
827 @param[in]	space			Tablespace
828 @param[in]	tmp_frame		Temporary buffer used for decrypting
829 @param[in,out]	src_frame		Page to decrypt
830 @return decrypted page, or original not encrypted page if decryption is
831 not needed.*/
832 UNIV_INTERN
833 byte*
fil_space_decrypt(const fil_space_t * space,byte * tmp_frame,byte * src_frame)834 fil_space_decrypt(
835 	const fil_space_t* space,
836 	byte*		tmp_frame,
837 	byte*		src_frame)
838 {
839 	const ulint physical_size = space->physical_size();
840 
841 	ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
842 	ut_ad(space->referenced());
843 
844 	if (DB_SUCCESS != fil_space_decrypt(space->id, space->crypt_data,
845 					    tmp_frame, physical_size,
846 					    space->flags, src_frame)) {
847 		return nullptr;
848 	}
849 
850 	/* Copy the decrypted page back to page buffer, not
851 	really any other options. */
852 	memcpy(src_frame, tmp_frame, physical_size);
853 
854 	return src_frame;
855 }
856 
857 /**
858 Calculate post encryption checksum
859 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
860 @param[in]	dst_frame	Block where checksum is calculated
861 @return page checksum
862 not needed. */
863 uint32_t
fil_crypt_calculate_checksum(ulint zip_size,const byte * dst_frame)864 fil_crypt_calculate_checksum(ulint zip_size, const byte* dst_frame)
865 {
866 	/* For encrypted tables we use only crc32 and strict_crc32 */
867 	return zip_size
868 		? page_zip_calc_checksum(dst_frame, zip_size,
869 					 SRV_CHECKSUM_ALGORITHM_CRC32)
870 		: buf_calc_page_crc32(dst_frame);
871 }
872 
873 /***********************************************************************/
874 
875 /** A copy of global key state */
876 struct key_state_t {
key_state_tkey_state_t877 	key_state_t() : key_id(0), key_version(0),
878 			rotate_key_age(srv_fil_crypt_rotate_key_age) {}
operator ==key_state_t879 	bool operator==(const key_state_t& other) const {
880 		return key_version == other.key_version &&
881 			rotate_key_age == other.rotate_key_age;
882 	}
883 	uint key_id;
884 	uint key_version;
885 	uint rotate_key_age;
886 };
887 
888 /***********************************************************************
889 Copy global key state
890 @param[in,out]	new_state	key state
891 @param[in]	crypt_data	crypt data */
892 static void
fil_crypt_get_key_state(key_state_t * new_state,fil_space_crypt_t * crypt_data)893 fil_crypt_get_key_state(
894 	key_state_t*			new_state,
895 	fil_space_crypt_t*		crypt_data)
896 {
897 	if (srv_encrypt_tables) {
898 		new_state->key_version = crypt_data->key_get_latest_version();
899 		new_state->rotate_key_age = srv_fil_crypt_rotate_key_age;
900 
901 		ut_a(new_state->key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
902 	} else {
903 		new_state->key_version = 0;
904 		new_state->rotate_key_age = 0;
905 	}
906 }
907 
908 /***********************************************************************
909 Check if a key needs rotation given a key_state
910 @param[in]	crypt_data		Encryption information
911 @param[in]	key_version		Current key version
912 @param[in]	latest_key_version	Latest key version
913 @param[in]	rotate_key_age		when to rotate
914 @return true if key needs rotation, false if not */
915 static bool
fil_crypt_needs_rotation(const fil_space_crypt_t * crypt_data,uint key_version,uint latest_key_version,uint rotate_key_age)916 fil_crypt_needs_rotation(
917 	const fil_space_crypt_t*	crypt_data,
918 	uint				key_version,
919 	uint				latest_key_version,
920 	uint				rotate_key_age)
921 {
922 	if (key_version == ENCRYPTION_KEY_VERSION_INVALID) {
923 		return false;
924 	}
925 
926 	if (key_version == 0 && latest_key_version != 0) {
927 		/* this is rotation unencrypted => encrypted
928 		* ignore rotate_key_age */
929 		return true;
930 	}
931 
932 	if (latest_key_version == 0 && key_version != 0) {
933 		if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT) {
934 			/* this is rotation encrypted => unencrypted */
935 			return true;
936 		}
937 		return false;
938 	}
939 
940 	if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT
941 	    && crypt_data->type == CRYPT_SCHEME_1
942 	    && !srv_encrypt_tables) {
943 		/* This is rotation encrypted => unencrypted */
944 		return true;
945 	}
946 
947 	if (rotate_key_age == 0) {
948 		return false;
949 	}
950 
951 	/* this is rotation encrypted => encrypted,
952 	* only reencrypt if key is sufficiently old */
953 	if (key_version + rotate_key_age < latest_key_version) {
954 		return true;
955 	}
956 
957 	return false;
958 }
959 
960 /** Read page 0 and possible crypt data from there.
961 @param[in,out]	space		Tablespace */
962 static inline
963 void
fil_crypt_read_crypt_data(fil_space_t * space)964 fil_crypt_read_crypt_data(fil_space_t* space)
965 {
966 	if (space->crypt_data || space->size || !space->get_size()) {
967 		/* The encryption metadata has already been read, or
968 		the tablespace is not encrypted and the file has been
969 		opened already, or the file cannot be accessed,
970 		likely due to a concurrent DROP
971 		(possibly as part of TRUNCATE or ALTER TABLE).
972 		FIXME: The file can become unaccessible any time
973 		after this check! We should really remove this
974 		function and instead make crypt_data an integral
975 		part of fil_space_t. */
976 		return;
977 	}
978 
979 	const ulint zip_size = space->zip_size();
980 	mtr_t	mtr;
981 	mtr.start();
982 	if (buf_block_t* block = buf_page_get_gen(page_id_t(space->id, 0),
983 						  zip_size, RW_S_LATCH,
984 						  nullptr,
985 						  BUF_GET_POSSIBLY_FREED,
986 						  __FILE__, __LINE__, &mtr)) {
987 		if (block->page.status == buf_page_t::FREED) {
988 			goto func_exit;
989 		}
990 		mutex_enter(&fil_system.mutex);
991 		if (!space->crypt_data && !space->is_stopping()) {
992 			space->crypt_data = fil_space_read_crypt_data(
993 				zip_size, block->frame);
994 		}
995 		mutex_exit(&fil_system.mutex);
996 	}
997 func_exit:
998 	mtr.commit();
999 }
1000 
/** Start encrypting a space that has no encryption metadata yet.
New crypt data is created, attached to the tablespace, written to page 0
and the tablespace is flushed; only after that the crypt data is marked
as no longer "starting". The global flag fil_crypt_start_converting
(protected by fil_crypt_threads_mutex) is set for the duration of the
conversion, so that only one thread at a time performs it.
@param[in,out]		space		Tablespace
@return true if a recheck of tablespace is needed by encryption thread
(that is, a conversion was already in progress when this was called) */
static bool fil_crypt_start_encrypting_space(fil_space_t* space)
{
	mutex_enter(&fil_crypt_threads_mutex);

	fil_space_crypt_t *crypt_data = space->crypt_data;

	/* If space is not encrypted and encryption is not enabled, then
	do not continue encrypting the space. */
	if (!crypt_data && !srv_encrypt_tables) {
		mutex_exit(&fil_crypt_threads_mutex);
		return false;
	}

	/* Whether a conversion is already in progress */
	const bool recheck = fil_crypt_start_converting;

	/* Nothing to start if a conversion is in progress, the space
	already has crypt data, or the space is being stopped. */
	if (recheck || crypt_data || space->is_stopping()) {
		mutex_exit(&fil_crypt_threads_mutex);
		return recheck;
	}

	/* NOTE: we need to write and flush page 0 before publishing
	* the crypt data. This so that after restart there is no
	* risk of finding encrypted pages without having
	* crypt data in page 0 */

	/* 1 - create crypt data */
	crypt_data = fil_space_create_crypt_data(
		FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);

	if (crypt_data == NULL) {
		mutex_exit(&fil_crypt_threads_mutex);
		return false;
	}

	/* From here on, this thread owns the conversion until the flag
	is reset on the success path or in the abort path below. */
	fil_crypt_start_converting = true;
	mutex_exit(&fil_crypt_threads_mutex);

	mtr_t mtr;
	mtr.start();

	/* 2 - get page 0 */
	dberr_t err = DB_SUCCESS;
	if (buf_block_t* block = buf_page_get_gen(
		    page_id_t(space->id, 0), space->zip_size(),
		    RW_X_LATCH, NULL, BUF_GET_POSSIBLY_FREED,
		    __FILE__, __LINE__, &mtr, &err)) {
		if (block->page.status == buf_page_t::FREED) {
			goto abort;
		}

		crypt_data->type = CRYPT_SCHEME_1;
		crypt_data->min_key_version = 0; // all pages are unencrypted
		crypt_data->rotate_state.start_time = time(0);
		/* "starting" makes fil_crypt_space_needs_rotation()
		request a recheck instead of rotating this space */
		crypt_data->rotate_state.starting = true;
		crypt_data->rotate_state.active_threads = 1;

		/* Attach the crypt data to the tablespace, unless the
		tablespace is being stopped */
		mutex_enter(&fil_system.mutex);
		const bool stopping = space->is_stopping();
		if (!stopping) {
			space->crypt_data = crypt_data;
		}
		mutex_exit(&fil_system.mutex);

		if (stopping) {
			/* crypt_data was not attached to the space;
			the abort path frees it */
			goto abort;
		}

		/* 3 - write crypt data to page 0 */
		mtr.set_named_space(space);
		crypt_data->write_page0(block, &mtr);

		mtr.commit();

		/* 4 - sync tablespace before publishing crypt data;
		repeat for as long as buf_flush_list_space() returns
		a nonzero value */
		while (buf_flush_list_space(space));

		/* 5 - publish crypt data */
		mutex_enter(&fil_crypt_threads_mutex);
		mutex_enter(&crypt_data->mutex);
		crypt_data->type = CRYPT_SCHEME_1;
		ut_a(crypt_data->rotate_state.active_threads == 1);
		crypt_data->rotate_state.active_threads = 0;
		crypt_data->rotate_state.starting = false;

		fil_crypt_start_converting = false;
		mutex_exit(&crypt_data->mutex);
		mutex_exit(&fil_crypt_threads_mutex);

		return false;
	}

abort:
	/* Page 0 was not available or is freed, or the tablespace is
	being stopped: give up the conversion and discard the crypt data,
	which was never attached to the tablespace on these paths. */
	mtr.commit();
	mutex_enter(&fil_crypt_threads_mutex);
	fil_crypt_start_converting = false;
	mutex_exit(&fil_crypt_threads_mutex);

	crypt_data->~fil_space_crypt_t();
	ut_free(crypt_data);
	return false;
}
1105 
1106 /** State of a rotation thread */
1107 struct rotate_thread_t {
rotate_thread_trotate_thread_t1108 	explicit rotate_thread_t(uint no) {
1109 		memset(this, 0, sizeof(* this));
1110 		thread_no = no;
1111 		first = true;
1112 		estimated_max_iops = 20;
1113 	}
1114 
1115 	uint thread_no;
1116 	bool first;		    /*!< is position before first space */
1117 	fil_space_t* space;	    /*!< current space or NULL */
1118 	uint32_t offset;	    /*!< current page number */
1119 	ulint batch;		    /*!< #pages to rotate */
1120 	uint  min_key_version_found;/*!< min key version found but not rotated */
1121 	lsn_t end_lsn;		    /*!< max lsn when rotating this space */
1122 
1123 	uint estimated_max_iops;   /*!< estimation of max iops */
1124 	uint allocated_iops;	   /*!< allocated iops */
1125 	ulint cnt_waited;	   /*!< #times waited during this slot */
1126 	uintmax_t sum_waited_us;   /*!< wait time during this slot */
1127 
1128 	fil_crypt_stat_t crypt_stat; // statistics
1129 
1130 	/** @return whether this thread should terminate */
should_shutdownrotate_thread_t1131 	bool should_shutdown() const {
1132 		switch (srv_shutdown_state) {
1133 		case SRV_SHUTDOWN_NONE:
1134 			return thread_no >= srv_n_fil_crypt_threads;
1135 		case SRV_SHUTDOWN_EXIT_THREADS:
1136 			/* srv_init_abort() must have been invoked */
1137 		case SRV_SHUTDOWN_CLEANUP:
1138 		case SRV_SHUTDOWN_INITIATED:
1139 			return true;
1140 		case SRV_SHUTDOWN_LAST_PHASE:
1141 			break;
1142 		}
1143 		ut_ad(0);
1144 		return true;
1145 	}
1146 };
1147 
1148 /** Avoid the removal of the tablespace from
1149 default_encrypt_list only when
1150 1) Another active encryption thread working on tablespace
1151 2) Eligible for tablespace key rotation
1152 3) Tablespace is in flushing phase
1153 @return true if tablespace should be removed from
1154 default encrypt */
fil_crypt_must_remove(const fil_space_t & space)1155 static bool fil_crypt_must_remove(const fil_space_t &space)
1156 {
1157   ut_ad(space.purpose == FIL_TYPE_TABLESPACE);
1158   fil_space_crypt_t *crypt_data = space.crypt_data;
1159   ut_ad(mutex_own(&fil_system.mutex));
1160   const ulong encrypt_tables= srv_encrypt_tables;
1161   if (!crypt_data)
1162     return !encrypt_tables;
1163   if (!crypt_data->is_key_found())
1164     return true;
1165 
1166   mutex_enter(&crypt_data->mutex);
1167   const bool remove= (space.is_stopping() || crypt_data->not_encrypted()) &&
1168     (!crypt_data->rotate_state.flushing &&
1169      !encrypt_tables == !!crypt_data->min_key_version &&
1170      !crypt_data->rotate_state.active_threads);
1171   mutex_exit(&crypt_data->mutex);
1172   return remove;
1173 }
1174 
1175 /***********************************************************************
1176 Check if space needs rotation given a key_state
1177 @param[in,out]		state		Key rotation state
1178 @param[in,out]		key_state	Key state
1179 @param[in,out]		recheck		needs recheck ?
1180 @return true if space needs key rotation */
1181 static
1182 bool
fil_crypt_space_needs_rotation(rotate_thread_t * state,key_state_t * key_state,bool * recheck)1183 fil_crypt_space_needs_rotation(
1184 	rotate_thread_t*	state,
1185 	key_state_t*		key_state,
1186 	bool*			recheck)
1187 {
1188 	fil_space_t* space = state->space;
1189 
1190 	/* Make sure that tablespace is normal tablespace */
1191 	if (space->purpose != FIL_TYPE_TABLESPACE) {
1192 		return false;
1193 	}
1194 
1195 	ut_ad(space->referenced());
1196 
1197 	fil_space_crypt_t *crypt_data = space->crypt_data;
1198 
1199 	if (crypt_data == NULL) {
1200 		/**
1201 		* space has no crypt data
1202 		*   start encrypting it...
1203 		*/
1204 		*recheck = fil_crypt_start_encrypting_space(space);
1205 		crypt_data = space->crypt_data;
1206 
1207 		if (crypt_data == NULL) {
1208 			return false;
1209 		}
1210 
1211 		crypt_data->key_get_latest_version();
1212 	}
1213 
1214 	/* If used key_id is not found from encryption plugin we can't
1215 	continue to rotate the tablespace */
1216 	if (!crypt_data->is_key_found()) {
1217 		return false;
1218 	}
1219 
1220 	bool need_key_rotation = false;
1221 	mutex_enter(&crypt_data->mutex);
1222 
1223 	do {
1224 		/* prevent threads from starting to rotate space */
1225 		if (crypt_data->rotate_state.starting) {
1226 			/* recheck this space later */
1227 			*recheck = true;
1228 			break;
1229 		}
1230 
1231 		/* prevent threads from starting to rotate space */
1232 		if (space->is_stopping()) {
1233 			break;
1234 		}
1235 
1236 		if (crypt_data->rotate_state.flushing) {
1237 			break;
1238 		}
1239 
1240 		/* No need to rotate space if encryption is disabled */
1241 		if (crypt_data->not_encrypted()) {
1242 			break;
1243 		}
1244 
1245 		if (crypt_data->key_id != key_state->key_id) {
1246 			key_state->key_id= crypt_data->key_id;
1247 			fil_crypt_get_key_state(key_state, crypt_data);
1248 		}
1249 
1250 		need_key_rotation = fil_crypt_needs_rotation(
1251 			crypt_data,
1252 			crypt_data->min_key_version,
1253 			key_state->key_version,
1254 			key_state->rotate_key_age);
1255 	} while (0);
1256 
1257 	mutex_exit(&crypt_data->mutex);
1258 	return need_key_rotation;
1259 }
1260 
1261 /***********************************************************************
1262 Update global statistics with thread statistics
1263 @param[in,out]	state		key rotation statistics */
1264 static void
fil_crypt_update_total_stat(rotate_thread_t * state)1265 fil_crypt_update_total_stat(
1266 	rotate_thread_t *state)
1267 {
1268 	mutex_enter(&crypt_stat_mutex);
1269 	crypt_stat.pages_read_from_cache +=
1270 		state->crypt_stat.pages_read_from_cache;
1271 	crypt_stat.pages_read_from_disk +=
1272 		state->crypt_stat.pages_read_from_disk;
1273 	crypt_stat.pages_modified += state->crypt_stat.pages_modified;
1274 	crypt_stat.pages_flushed += state->crypt_stat.pages_flushed;
1275 	// remote old estimate
1276 	crypt_stat.estimated_iops -= state->crypt_stat.estimated_iops;
1277 	// add new estimate
1278 	crypt_stat.estimated_iops += state->estimated_max_iops;
1279 	mutex_exit(&crypt_stat_mutex);
1280 
1281 	// make new estimate "current" estimate
1282 	memset(&state->crypt_stat, 0, sizeof(state->crypt_stat));
1283 	// record our old (current) estimate
1284 	state->crypt_stat.estimated_iops = state->estimated_max_iops;
1285 }
1286 
1287 /***********************************************************************
1288 Allocate iops to thread from global setting,
1289 used before starting to rotate a space.
1290 @param[in,out]		state		Rotation state
1291 @return true if allocation succeeded, false if failed */
1292 static
1293 bool
fil_crypt_alloc_iops(rotate_thread_t * state)1294 fil_crypt_alloc_iops(
1295 	rotate_thread_t *state)
1296 {
1297 	ut_ad(state->allocated_iops == 0);
1298 
1299 	/* We have not yet selected the space to rotate, thus
1300 	state might not contain space and we can't check
1301 	its status yet. */
1302 
1303 	uint max_iops = state->estimated_max_iops;
1304 	mutex_enter(&fil_crypt_threads_mutex);
1305 
1306 	if (n_fil_crypt_iops_allocated >= srv_n_fil_crypt_iops) {
1307 		/* this can happen when user decreases srv_fil_crypt_iops */
1308 		mutex_exit(&fil_crypt_threads_mutex);
1309 		return false;
1310 	}
1311 
1312 	uint alloc = srv_n_fil_crypt_iops - n_fil_crypt_iops_allocated;
1313 
1314 	if (alloc > max_iops) {
1315 		alloc = max_iops;
1316 	}
1317 
1318 	n_fil_crypt_iops_allocated += alloc;
1319 	mutex_exit(&fil_crypt_threads_mutex);
1320 
1321 	state->allocated_iops = alloc;
1322 
1323 	return alloc > 0;
1324 }
1325 
/***********************************************************************
Reallocate iops to thread,
used when inside a space.
The estimate of the maximum iops is refreshed from the measured page
read wait times if the thread waited often enough; then the allocation
is either shrunk to the estimate (never below 1) or grown towards it
from the free global capacity. Finally the thread statistics are
folded into the global statistics.
@param[in,out]		state		Rotation state */
static
void
fil_crypt_realloc_iops(
	rotate_thread_t *state)
{
	ut_a(state->allocated_iops > 0);

	/* cnt_waited is necessarily nonzero when this condition holds,
	so the division below is safe */
	if (10 * state->cnt_waited > state->batch) {
		/* if we waited more than 10% re-estimate max_iops */
		ulint avg_wait_time_us =
			ulint(state->sum_waited_us / state->cnt_waited);

		if (avg_wait_time_us == 0) {
			avg_wait_time_us = 1; // prevent division by zero
		}

		DBUG_PRINT("ib_crypt",
			("thr_no: %u - update estimated_max_iops from %u to "
			 ULINTPF ".",
			state->thread_no,
			state->estimated_max_iops,
			1000000 / avg_wait_time_us));

		/* new estimate: number of average waits per second */
		state->estimated_max_iops = uint(1000000 / avg_wait_time_us);
		state->cnt_waited = 0;
		state->sum_waited_us = 0;
	} else {
		DBUG_PRINT("ib_crypt",
			   ("thr_no: %u only waited " ULINTPF
			    "%% skip re-estimate.",
			    state->thread_no,
			    (100 * state->cnt_waited)
			    / (state->batch ? state->batch : 1)));
	}

	if (state->estimated_max_iops <= state->allocated_iops) {
		/* return extra iops */
		uint extra = state->allocated_iops - state->estimated_max_iops;

		if (extra > 0) {
			mutex_enter(&fil_crypt_threads_mutex);
			if (n_fil_crypt_iops_allocated < extra) {
				/* unknown bug!
				* crash in debug
				* keep n_fil_crypt_iops_allocated unchanged
				* in release */
				ut_ad(0);
				extra = 0;
			}
			n_fil_crypt_iops_allocated -= extra;
			state->allocated_iops -= extra;

			if (state->allocated_iops == 0) {
				/* no matter how slow io system seems to be
				* never decrease allocated_iops to 0... */
				state->allocated_iops ++;
				n_fil_crypt_iops_allocated ++;
			}

			/* signal fil_crypt_threads_event, on which
			threads that failed to allocate iops wait */
			os_event_set(fil_crypt_threads_event);
			mutex_exit(&fil_crypt_threads_mutex);
		}
	} else {
		/* see if there are more to get */
		mutex_enter(&fil_crypt_threads_mutex);
		if (n_fil_crypt_iops_allocated < srv_n_fil_crypt_iops) {
			/* there are extra iops free */
			uint extra = srv_n_fil_crypt_iops -
				n_fil_crypt_iops_allocated;
			if (state->allocated_iops + extra >
			    state->estimated_max_iops) {
				/* but don't alloc more than our max */
				extra = state->estimated_max_iops -
					state->allocated_iops;
			}
			n_fil_crypt_iops_allocated += extra;
			state->allocated_iops += extra;

			DBUG_PRINT("ib_crypt",
				("thr_no: %u increased iops from %u to %u.",
				state->thread_no,
				state->allocated_iops - extra,
				state->allocated_iops));

		}
		mutex_exit(&fil_crypt_threads_mutex);
	}

	fil_crypt_update_total_stat(state);
}
1420 
1421 /** Release excess allocated iops
1422 @param state   rotation state
1423 @param wake    whether to wake up other threads */
fil_crypt_return_iops(rotate_thread_t * state,bool wake=true)1424 static void fil_crypt_return_iops(rotate_thread_t *state, bool wake= true)
1425 {
1426 	if (state->allocated_iops > 0) {
1427 		uint iops = state->allocated_iops;
1428 		mutex_enter(&fil_crypt_threads_mutex);
1429 		if (n_fil_crypt_iops_allocated < iops) {
1430 			/* unknown bug!
1431 			* crash in debug
1432 			* keep n_fil_crypt_iops_allocated unchanged
1433 			* in release */
1434 			ut_ad(0);
1435 			iops = 0;
1436 		}
1437 
1438 		n_fil_crypt_iops_allocated -= iops;
1439 		state->allocated_iops = 0;
1440 		if (wake) {
1441 			os_event_set(fil_crypt_threads_event);
1442 		}
1443 		mutex_exit(&fil_crypt_threads_mutex);
1444 	}
1445 
1446 	fil_crypt_update_total_stat(state);
1447 }
1448 
1449 /** Acquire a tablespace reference.
1450 @return whether a tablespace reference was successfully acquired */
acquire_if_not_stopped()1451 inline bool fil_space_t::acquire_if_not_stopped()
1452 {
1453   ut_ad(mutex_own(&fil_system.mutex));
1454   const uint32_t n= acquire_low();
1455   if (UNIV_LIKELY(!(n & (STOPPING | CLOSING))))
1456     return true;
1457   if (UNIV_UNLIKELY(n & STOPPING))
1458     return false;
1459   return UNIV_LIKELY(!(n & CLOSING)) || prepare(true);
1460 }
1461 
fil_crypt_must_default_encrypt()1462 bool fil_crypt_must_default_encrypt()
1463 {
1464   return !srv_fil_crypt_rotate_key_age || !srv_encrypt_rotate;
1465 }
1466 
/** Return the next tablespace from default_encrypt_tables list.
The reference held on the previous tablespace is released, and the
previous tablespace may be removed from the list.
The caller must hold fil_system_t::mutex.
@param space   previous tablespace (nullptr to start from the start)
@param recheck whether the removal condition needs to be rechecked after
the encryption parameters were changed
@param encrypt expected state of innodb_encrypt_tables
(not used in this function body)
@return the next tablespace to process (a reference is acquired with
acquire_if_not_stopped())
@retval fil_system.temp_space if there is no work to do
@retval nullptr upon reaching the end of the iteration */
inline fil_space_t *fil_system_t::default_encrypt_next(fil_space_t *space,
                                                       bool recheck,
                                                       bool encrypt)
{
  ut_ad(mutex_own(&mutex));

  /* Continue from the previous tablespace if it is still in the list;
  otherwise start from the beginning of the list. */
  sized_ilist<fil_space_t, rotation_list_tag_t>::iterator it=
    space && space->is_in_default_encrypt
    ? space
    : default_encrypt_tables.begin();
  const sized_ilist<fil_space_t, rotation_list_tag_t>::iterator end=
    default_encrypt_tables.end();

  if (space)
  {
    /* Give up the reference on the previous tablespace.
    NOTE(review): "released" presumably means that no other references
    remain; confirm against fil_space_t::release(). */
    const bool released= !space->release();

    if (space->is_in_default_encrypt)
    {
      /* Advance past the previous tablespace, skipping any tablespace
      that has no files or is being stopped. This is done before the
      possible removal of the previous tablespace below. */
      while (++it != end &&
             (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));

      /* If one of the encryption threads already started
      the encryption of the table then don't remove the
      unencrypted spaces from default encrypt list.

      If there is a change in innodb_encrypt_tables variables
      value then don't remove the last processed tablespace
      from the default encrypt list. */
      if (released && !recheck && fil_crypt_must_remove(*space))
      {
        ut_a(!default_encrypt_tables.empty());
        default_encrypt_tables.remove(*space);
        space->is_in_default_encrypt= false;
      }
    }
  }
  else while (it != end &&
	      (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()))
  {
    /* Find the next suitable default encrypt table if
    beginning of default_encrypt_tables list has been scheduled
    to be deleted */
    it++;
  }

  /* No candidate is left in the list: there is no work to do */
  if (it == end)
    return temp_space;

  do
  {
    space= &*it;
    if (space->acquire_if_not_stopped())
      return space;
    /* The candidate could not be acquired; look at the next element.
    Note that the loop only continues while that next element has no
    files or is being stopped; otherwise nullptr is returned. */
    if (++it == end)
      return nullptr;
  }
  while (!UT_LIST_GET_LEN(it->chain) || it->is_stopping());

  return nullptr;
}
1536 
1537 /** Determine the next tablespace for encryption key rotation.
1538 @param space    current tablespace (nullptr to start from the beginning)
1539 @param recheck  whether the removal condition needs to be rechecked after
1540 encryption parameters were changed
1541 @param encrypt  expected state of innodb_encrypt_tables
1542 @return the next tablespace
1543 @retval fil_system.temp_space if there is no work to do
1544 @retval nullptr upon reaching the end of the iteration */
next(fil_space_t * space,bool recheck,bool encrypt)1545 inline fil_space_t *fil_space_t::next(fil_space_t *space, bool recheck,
1546                                       bool encrypt)
1547 {
1548   mutex_enter(&fil_system.mutex);
1549 
1550   if (fil_crypt_must_default_encrypt())
1551     space= fil_system.default_encrypt_next(space, recheck, encrypt);
1552   else
1553   {
1554     if (!space)
1555       space= UT_LIST_GET_FIRST(fil_system.space_list);
1556     else
1557     {
1558       /* Move on to the next fil_space_t */
1559       space->release();
1560       space= UT_LIST_GET_NEXT(space_list, space);
1561     }
1562 
1563     for (; space; space= UT_LIST_GET_NEXT(space_list, space))
1564     {
1565       if (space->purpose != FIL_TYPE_TABLESPACE)
1566         continue;
1567       const uint32_t n= space->acquire_low();
1568       if (UNIV_LIKELY(!(n & (STOPPING | CLOSING))))
1569         break;
1570       if (!(n & STOPPING) && space->prepare(true))
1571         break;
1572     }
1573   }
1574 
1575   mutex_exit(&fil_system.mutex);
1576   return space;
1577 }
1578 
/** Search for a space needing rotation.
I/O capacity is allocated first (waiting for it if necessary); it is
returned again if no tablespace is found.
@param[in,out]	key_state	Key state
@param[in,out]	state		Rotation state
@param[in,out]	recheck		recheck of the tablespace is needed or
				still encryption thread does write page 0
@return true if a tablespace needing rotation was found (it is then
referenced by state->space); false if the thread should shut down or
there is no work to do (state->space is then NULL) */
static bool fil_crypt_find_space_to_rotate(
	key_state_t*		key_state,
	rotate_thread_t*	state,
	bool*			recheck)
{
	/* we need iops to start rotating */
	while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) {
		/* Do not keep holding a reference on a tablespace that
		is being stopped while waiting for I/O capacity */
		if (state->space && state->space->is_stopping()) {
			state->space->release();
			state->space = NULL;
		}

		/* Wait until another thread signals the event or the
		timeout passed to os_event_wait_time() expires */
		os_event_reset(fil_crypt_threads_event);
		os_event_wait_time(fil_crypt_threads_event, 100000);
	}

	if (state->should_shutdown()) {
		if (state->space) {
			state->space->release();
			state->space = NULL;
		}
		return false;
	}

	if (state->first) {
		/* First invocation for this thread:
		start the iteration from the beginning */
		state->first = false;
		if (state->space) {
			state->space->release();
		}
		state->space = NULL;
	}

	/* Whether fil_crypt_return_iops() should wake up other threads */
	bool wake;
	for (;;) {
		/* This releases the reference on the previous space
		(if any) and acquires one on the returned space */
		state->space = fil_space_t::next(state->space, *recheck,
						 key_state->key_version != 0);
		wake = state->should_shutdown();

		if (state->space == fil_system.temp_space) {
			/* No work to do. No reference needs to be
			released for this return value; other threads are
			only woken up if we are shutting down. */
			goto done;
		} else if (wake) {
			break;
		} else {
			wake = true;
		}

		if (!state->space) {
			/* end of the iteration */
			break;
		}

		/* If there is no crypt data and we have not yet read
		page 0 for this tablespace, we need to read it before
		we can continue. */
		if (!state->space->crypt_data) {
			fil_crypt_read_crypt_data(state->space);
		}

		if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
			ut_ad(key_state->key_id);
			/* init state->min_key_version_found before
			* starting on a space */
			state->min_key_version_found = key_state->key_version;
			return true;
		}
	}

	if (state->space) {
		state->space->release();
done:
		state->space = NULL;
	}

	/* no work to do; release our allocation of I/O capacity */
	fil_crypt_return_iops(state, wake);

	return false;

}
1662 
1663 /***********************************************************************
1664 Start rotating a space
1665 @param[in]	key_state		Key state
1666 @param[in,out]	state			Rotation state */
1667 static
1668 void
fil_crypt_start_rotate_space(const key_state_t * key_state,rotate_thread_t * state)1669 fil_crypt_start_rotate_space(
1670 	const key_state_t*	key_state,
1671 	rotate_thread_t*	state)
1672 {
1673 	fil_space_crypt_t *crypt_data = state->space->crypt_data;
1674 
1675 	ut_ad(crypt_data);
1676 	mutex_enter(&crypt_data->mutex);
1677 	ut_ad(key_state->key_id == crypt_data->key_id);
1678 
1679 	if (crypt_data->rotate_state.active_threads == 0) {
1680 		/* only first thread needs to init */
1681 		crypt_data->rotate_state.next_offset = 1; // skip page 0
1682 		/* no need to rotate beyond current max
1683 		* if space extends, it will be encrypted with newer version */
1684 		/* FIXME: max_offset could be removed and instead
1685 		space->size consulted.*/
1686 		crypt_data->rotate_state.max_offset = state->space->size;
1687 		crypt_data->rotate_state.end_lsn = 0;
1688 		crypt_data->rotate_state.min_key_version_found =
1689 			key_state->key_version;
1690 
1691 		crypt_data->rotate_state.start_time = time(0);
1692 
1693 		if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
1694 			crypt_data->is_encrypted() &&
1695 			key_state->key_version != 0) {
1696 			/* this is rotation unencrypted => encrypted */
1697 			crypt_data->type = CRYPT_SCHEME_1;
1698 		}
1699 	}
1700 
1701 	/* count active threads in space */
1702 	crypt_data->rotate_state.active_threads++;
1703 
1704 	/* Initialize thread local state */
1705 	state->end_lsn = crypt_data->rotate_state.end_lsn;
1706 	state->min_key_version_found =
1707 		crypt_data->rotate_state.min_key_version_found;
1708 
1709 	mutex_exit(&crypt_data->mutex);
1710 }
1711 
1712 /***********************************************************************
1713 Search for batch of pages needing rotation
1714 @param[in]	key_state		Key state
1715 @param[in,out]	state			Rotation state
1716 @return true if page needing key rotation found, false if not found */
1717 static
1718 bool
fil_crypt_find_page_to_rotate(const key_state_t * key_state,rotate_thread_t * state)1719 fil_crypt_find_page_to_rotate(
1720 	const key_state_t*	key_state,
1721 	rotate_thread_t*	state)
1722 {
1723 	ulint batch = srv_alloc_time * state->allocated_iops;
1724 	fil_space_t* space = state->space;
1725 
1726 	ut_ad(!space || space->referenced());
1727 
1728 	/* If space is marked to be dropped stop rotation. */
1729 	if (!space || space->is_stopping()) {
1730 		return false;
1731 	}
1732 
1733 	fil_space_crypt_t *crypt_data = space->crypt_data;
1734 
1735 	mutex_enter(&crypt_data->mutex);
1736 	ut_ad(key_state->key_id == crypt_data->key_id);
1737 
1738 	bool found = crypt_data->rotate_state.max_offset >=
1739 		crypt_data->rotate_state.next_offset;
1740 
1741 	if (found) {
1742 		state->offset = crypt_data->rotate_state.next_offset;
1743 		ulint remaining = crypt_data->rotate_state.max_offset -
1744 			crypt_data->rotate_state.next_offset;
1745 
1746 		if (batch <= remaining) {
1747 			state->batch = batch;
1748 		} else {
1749 			state->batch = remaining;
1750 		}
1751 	}
1752 
1753 	crypt_data->rotate_state.next_offset += uint32_t(batch);
1754 	mutex_exit(&crypt_data->mutex);
1755 	return found;
1756 }
1757 
1758 #define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \
1759 	fil_crypt_get_page_throttle_func(state, offset, mtr, \
1760 					 sleeptime_ms, __FILE__, __LINE__)
1761 
1762 /***********************************************************************
1763 Get a page and compute sleep time
1764 @param[in,out]		state		Rotation state
1765 @param[in]		offset		Page offset
1766 @param[in,out]		mtr		Minitransaction
1767 @param[out]		sleeptime_ms	Sleep time
1768 @param[in]		file		File where called
1769 @param[in]		line		Line where called
1770 @return page or NULL*/
1771 static
1772 buf_block_t*
fil_crypt_get_page_throttle_func(rotate_thread_t * state,uint32_t offset,mtr_t * mtr,ulint * sleeptime_ms,const char * file,unsigned line)1773 fil_crypt_get_page_throttle_func(
1774 	rotate_thread_t*	state,
1775 	uint32_t		offset,
1776 	mtr_t*			mtr,
1777 	ulint*			sleeptime_ms,
1778 	const char*		file,
1779 	unsigned		line)
1780 {
1781 	fil_space_t* space = state->space;
1782 	const ulint zip_size = space->zip_size();
1783 	const page_id_t page_id(space->id, offset);
1784 	ut_ad(space->referenced());
1785 
1786 	/* Before reading from tablespace we need to make sure that
1787 	the tablespace is not about to be dropped. */
1788 	if (space->is_stopping()) {
1789 		return NULL;
1790 	}
1791 
1792 	dberr_t err = DB_SUCCESS;
1793 	buf_block_t* block = buf_page_get_gen(page_id, zip_size, RW_X_LATCH,
1794 					      NULL,
1795 					      BUF_PEEK_IF_IN_POOL, file, line,
1796 					      mtr, &err);
1797 	if (block != NULL) {
1798 		/* page was in buffer pool */
1799 		state->crypt_stat.pages_read_from_cache++;
1800 		return block;
1801 	}
1802 
1803 	if (space->is_stopping()) {
1804 		return NULL;
1805 	}
1806 
1807 	if (fseg_page_is_free(space, state->offset)) {
1808 		/* page is already freed */
1809 		return NULL;
1810 	}
1811 
1812 	state->crypt_stat.pages_read_from_disk++;
1813 
1814 	const ulonglong start = my_interval_timer();
1815 	block = buf_page_get_gen(page_id, zip_size,
1816 				 RW_X_LATCH,
1817 				 NULL, BUF_GET_POSSIBLY_FREED,
1818 				file, line, mtr, &err);
1819 	const ulonglong end = my_interval_timer();
1820 
1821 	state->cnt_waited++;
1822 
1823 	if (end > start) {
1824 		state->sum_waited_us += (end - start) / 1000;
1825 	}
1826 
1827 	/* average page load */
1828 	ulint add_sleeptime_ms = 0;
1829 	ulint avg_wait_time_us =ulint(state->sum_waited_us / state->cnt_waited);
1830 	ulint alloc_wait_us = 1000000 / state->allocated_iops;
1831 
1832 	if (avg_wait_time_us < alloc_wait_us) {
1833 		/* we reading faster than we allocated */
1834 		add_sleeptime_ms = (alloc_wait_us - avg_wait_time_us) / 1000;
1835 	} else {
1836 		/* if page load time is longer than we want, skip sleeping */
1837 	}
1838 
1839 	*sleeptime_ms += add_sleeptime_ms;
1840 
1841 	return block;
1842 }
1843 
1844 /***********************************************************************
1845 Rotate one page
1846 @param[in,out]		key_state		Key state
1847 @param[in,out]		state			Rotation state */
1848 static
1849 void
fil_crypt_rotate_page(const key_state_t * key_state,rotate_thread_t * state)1850 fil_crypt_rotate_page(
1851 	const key_state_t*	key_state,
1852 	rotate_thread_t*	state)
1853 {
1854 	fil_space_t*space = state->space;
1855 	ulint space_id = space->id;
1856 	uint32_t offset = state->offset;
1857 	ulint sleeptime_ms = 0;
1858 	fil_space_crypt_t *crypt_data = space->crypt_data;
1859 
1860 	ut_ad(space->referenced());
1861 	ut_ad(offset > 0);
1862 
1863 	/* In fil_crypt_thread where key rotation is done we have
1864 	acquired space and checked that this space is not yet
1865 	marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate().
1866 	Check here also to give DROP TABLE or similar a change. */
1867 	if (space->is_stopping()) {
1868 		return;
1869 	}
1870 
1871 	if (space_id == TRX_SYS_SPACE && offset == TRX_SYS_PAGE_NO) {
1872 		/* don't encrypt this as it contains address to dblwr buffer */
1873 		return;
1874 	}
1875 
1876 	mtr_t mtr;
1877 	mtr.start();
1878 	if (buf_block_t* block = fil_crypt_get_page_throttle(state,
1879 							     offset, &mtr,
1880 							     &sleeptime_ms)) {
1881 		bool modified = false;
1882 		byte* frame = buf_block_get_frame(block);
1883 		const lsn_t block_lsn = mach_read_from_8(FIL_PAGE_LSN + frame);
1884 		uint kv = buf_page_get_key_version(frame, space->flags);
1885 
1886 		if (block->page.status == buf_page_t::FREED) {
1887 			/* Do not modify freed pages to avoid an assertion
1888 			failure on recovery.*/
1889 		} else if (block->page.oldest_modification() > 1) {
1890 			/* Do not unnecessarily touch pages that are
1891 			already dirty. */
1892 		} else if (space->is_stopping()) {
1893 			/* The tablespace is closing (in DROP TABLE or
1894 			TRUNCATE TABLE or similar): avoid further access */
1895 		} else if (!kv && !*reinterpret_cast<uint16_t*>
1896 			   (&frame[FIL_PAGE_TYPE])) {
1897 			/* It looks like this page is not
1898 			allocated. Because key rotation is accessing
1899 			pages in a pattern that is unlike the normal
1900 			B-tree and undo log access pattern, we cannot
1901 			invoke fseg_page_is_free() here, because that
1902 			could result in a deadlock. If we invoked
1903 			fseg_page_is_free() and released the
1904 			tablespace latch before acquiring block->lock,
1905 			then the fseg_page_is_free() information
1906 			could be stale already. */
1907 
1908 			/* If the data file was originally created
1909 			before MariaDB 10.0 or MySQL 5.6, some
1910 			allocated data pages could carry 0 in
1911 			FIL_PAGE_TYPE. The FIL_PAGE_TYPE on those
1912 			pages will be updated in
1913 			buf_flush_init_for_writing() when the page
1914 			is modified the next time.
1915 
1916 			Also, when the doublewrite buffer pages are
1917 			allocated on bootstrap in a non-debug build,
1918 			some dummy pages will be allocated, with 0 in
1919 			the FIL_PAGE_TYPE. Those pages should be
1920 			skipped from key rotation forever. */
1921 		} else if (fil_crypt_needs_rotation(
1922 				crypt_data,
1923 				kv,
1924 				key_state->key_version,
1925 				key_state->rotate_key_age)) {
1926 
1927 			mtr.set_named_space(space);
1928 			modified = true;
1929 
1930 			/* force rotation by dummy updating page */
1931 			mtr.write<1,mtr_t::FORCED>(*block,
1932 						   &frame[FIL_PAGE_SPACE_ID],
1933 						   frame[FIL_PAGE_SPACE_ID]);
1934 
1935 			/* statistics */
1936 			state->crypt_stat.pages_modified++;
1937 		} else {
1938 			if (crypt_data->is_encrypted()) {
1939 				if (kv < state->min_key_version_found) {
1940 					state->min_key_version_found = kv;
1941 				}
1942 			}
1943 		}
1944 
1945 		mtr.commit();
1946 		lsn_t end_lsn = mtr.commit_lsn();
1947 
1948 
1949 		if (modified) {
1950 			/* if we modified page, we take lsn from mtr */
1951 			ut_a(end_lsn > state->end_lsn);
1952 			ut_a(end_lsn > block_lsn);
1953 			state->end_lsn = end_lsn;
1954 		} else {
1955 			/* if we did not modify page, check for max lsn */
1956 			if (block_lsn > state->end_lsn) {
1957 				state->end_lsn = block_lsn;
1958 			}
1959 		}
1960 	} else {
1961 		/* If block read failed mtr memo and log should be empty. */
1962 		ut_ad(!mtr.has_modifications());
1963 		ut_ad(!mtr.is_dirty());
1964 		ut_ad(mtr.get_memo()->size() == 0);
1965 		ut_ad(mtr.get_log()->size() == 0);
1966 		mtr.commit();
1967 	}
1968 
1969 	if (sleeptime_ms) {
1970 		os_event_reset(fil_crypt_throttle_sleep_event);
1971 		os_event_wait_time(fil_crypt_throttle_sleep_event,
1972 				   1000 * sleeptime_ms);
1973 	}
1974 }
1975 
1976 /***********************************************************************
1977 Rotate a batch of pages
1978 @param[in,out]		key_state		Key state
1979 @param[in,out]		state			Rotation state */
1980 static
1981 void
fil_crypt_rotate_pages(const key_state_t * key_state,rotate_thread_t * state)1982 fil_crypt_rotate_pages(
1983 	const key_state_t*	key_state,
1984 	rotate_thread_t*	state)
1985 {
1986 	ulint space_id = state->space->id;
1987 	uint32_t end = std::min(state->offset + uint32_t(state->batch),
1988 				state->space->free_limit);
1989 
1990 	ut_ad(state->space->referenced());
1991 
1992 	for (; state->offset < end; state->offset++) {
1993 
1994 		/* we can't rotate pages in dblwr buffer as
1995 		* it's not possible to read those due to lots of asserts
1996 		* in buffer pool.
1997 		*
1998 		* However since these are only (short-lived) copies of
1999 		* real pages, they will be updated anyway when the
2000 		* real page is updated
2001 		*/
2002 		if (buf_dblwr.is_inside(page_id_t(space_id, state->offset))) {
2003 			continue;
2004 		}
2005 
2006 		/* If space is marked as stopping, stop rotating
2007 		pages. */
2008 		if (state->space->is_stopping()) {
2009 			break;
2010 		}
2011 
2012 		fil_crypt_rotate_page(key_state, state);
2013 	}
2014 }
2015 
2016 /***********************************************************************
2017 Flush rotated pages and then update page 0
2018 
2019 @param[in,out]		state	rotation state */
2020 static
2021 void
fil_crypt_flush_space(rotate_thread_t * state)2022 fil_crypt_flush_space(
2023 	rotate_thread_t*	state)
2024 {
2025 	fil_space_t* space = state->space;
2026 	fil_space_crypt_t *crypt_data = space->crypt_data;
2027 
2028 	ut_ad(space->referenced());
2029 
2030 	/* flush tablespace pages so that there are no pages left with old key */
2031 	lsn_t end_lsn = crypt_data->rotate_state.end_lsn;
2032 
2033 	if (end_lsn > 0 && !space->is_stopping()) {
2034 		ulint sum_pages = 0;
2035 		const ulonglong start = my_interval_timer();
2036 		while (buf_flush_list_space(space, &sum_pages));
2037 		if (sum_pages) {
2038 			const ulonglong end = my_interval_timer();
2039 
2040 			state->cnt_waited += sum_pages;
2041 			state->sum_waited_us += (end - start) / 1000;
2042 
2043 			/* statistics */
2044 			state->crypt_stat.pages_flushed += sum_pages;
2045 		}
2046 	}
2047 
2048 	if (crypt_data->min_key_version == 0) {
2049 		crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
2050 	}
2051 
2052 	if (space->is_stopping()) {
2053 		return;
2054 	}
2055 
2056 	/* update page 0 */
2057 	mtr_t mtr;
2058 	mtr.start();
2059 
2060 	if (buf_block_t* block = buf_page_get_gen(
2061 		    page_id_t(space->id, 0), space->zip_size(),
2062 		    RW_X_LATCH, NULL, BUF_GET_POSSIBLY_FREED,
2063 		    __FILE__, __LINE__, &mtr)) {
2064 		if (block->page.status != buf_page_t::FREED) {
2065 			mtr.set_named_space(space);
2066 			crypt_data->write_page0(block, &mtr);
2067 		}
2068 	}
2069 
2070 	mtr.commit();
2071 }
2072 
2073 /***********************************************************************
2074 Complete rotating a space
2075 @param[in,out]		state			Rotation state */
fil_crypt_complete_rotate_space(rotate_thread_t * state)2076 static void fil_crypt_complete_rotate_space(rotate_thread_t* state)
2077 {
2078 	fil_space_crypt_t *crypt_data = state->space->crypt_data;
2079 
2080 	ut_ad(crypt_data);
2081 	ut_ad(state->space->referenced());
2082 
2083 	/* Space might already be dropped */
2084 	if (!state->space->is_stopping()) {
2085 		mutex_enter(&crypt_data->mutex);
2086 
2087 		/**
2088 		* Update crypt data state with state from thread
2089 		*/
2090 		if (state->min_key_version_found <
2091 			crypt_data->rotate_state.min_key_version_found) {
2092 			crypt_data->rotate_state.min_key_version_found =
2093 				state->min_key_version_found;
2094 		}
2095 
2096 		if (state->end_lsn > crypt_data->rotate_state.end_lsn) {
2097 			crypt_data->rotate_state.end_lsn = state->end_lsn;
2098 		}
2099 
2100 		ut_a(crypt_data->rotate_state.active_threads > 0);
2101 		crypt_data->rotate_state.active_threads--;
2102 		bool last = crypt_data->rotate_state.active_threads == 0;
2103 
2104 		/**
2105 		* check if space is fully done
2106 		* this as when threads shutdown, it could be that we "complete"
2107 		* iterating before we have scanned the full space.
2108 		*/
2109 		bool done = crypt_data->rotate_state.next_offset >=
2110 			crypt_data->rotate_state.max_offset;
2111 
2112 		/**
2113 		* we should flush space if we're last thread AND
2114 		* the iteration is done
2115 		*/
2116 		bool should_flush = last && done;
2117 
2118 		if (should_flush) {
2119 			/* we're the last active thread */
2120 			crypt_data->rotate_state.flushing = true;
2121 			crypt_data->min_key_version =
2122 				crypt_data->rotate_state.min_key_version_found;
2123 			mutex_exit(&crypt_data->mutex);
2124 			fil_crypt_flush_space(state);
2125 
2126 			mutex_enter(&crypt_data->mutex);
2127 			crypt_data->rotate_state.flushing = false;
2128 			mutex_exit(&crypt_data->mutex);
2129 		} else {
2130 			mutex_exit(&crypt_data->mutex);
2131 		}
2132 	} else {
2133 		mutex_enter(&crypt_data->mutex);
2134 		ut_a(crypt_data->rotate_state.active_threads > 0);
2135 		crypt_data->rotate_state.active_threads--;
2136 		mutex_exit(&crypt_data->mutex);
2137 	}
2138 }
2139 
2140 /*********************************************************************//**
2141 A thread which monitors global key state and rotates tablespaces accordingly
2142 @return a dummy parameter */
2143 extern "C" UNIV_INTERN
2144 os_thread_ret_t
DECLARE_THREAD(fil_crypt_thread)2145 DECLARE_THREAD(fil_crypt_thread)(void*)
2146 {
2147 	mutex_enter(&fil_crypt_threads_mutex);
2148 	uint thread_no = srv_n_fil_crypt_threads_started;
2149 	srv_n_fil_crypt_threads_started++;
2150 	os_event_set(fil_crypt_event); /* signal that we started */
2151 	mutex_exit(&fil_crypt_threads_mutex);
2152 
2153 	/* state of this thread */
2154 	rotate_thread_t thr(thread_no);
2155 
2156 	/* if we find a space that is starting, skip over it and recheck it later */
2157 	bool recheck = false;
2158 
2159 	while (!thr.should_shutdown()) {
2160 
2161 		key_state_t new_state;
2162 
2163 		while (!thr.should_shutdown()) {
2164 
2165 			/* wait for key state changes
2166 			* i.e either new key version of change or
2167 			* new rotate_key_age */
2168 			os_event_reset(fil_crypt_threads_event);
2169 
2170 			if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) {
2171 				break;
2172 			}
2173 
2174 			if (recheck) {
2175 				/* check recheck here, after sleep, so
2176 				* that we don't busy loop while when one thread is starting
2177 				* a space*/
2178 				break;
2179 			}
2180 		}
2181 
2182 		recheck = false;
2183 		thr.first = true;      // restart from first tablespace
2184 
2185 		/* iterate all spaces searching for those needing rotation */
2186 		while (!thr.should_shutdown() &&
2187 		       fil_crypt_find_space_to_rotate(&new_state, &thr, &recheck)) {
2188 
2189 			/* we found a space to rotate */
2190 			fil_crypt_start_rotate_space(&new_state, &thr);
2191 
2192 			/* iterate all pages (cooperativly with other threads) */
2193 			while (!thr.should_shutdown() &&
2194 			       fil_crypt_find_page_to_rotate(&new_state, &thr)) {
2195 
2196 				if (!thr.space->is_stopping()) {
2197 					/* rotate a (set) of pages */
2198 					fil_crypt_rotate_pages(&new_state, &thr);
2199 				}
2200 
2201 				/* If space is marked as stopping, release
2202 				space and stop rotation. */
2203 				if (thr.space->is_stopping()) {
2204 					fil_crypt_complete_rotate_space(&thr);
2205 					thr.space->release();
2206 					thr.space = NULL;
2207 					break;
2208 				}
2209 
2210 				/* realloc iops */
2211 				fil_crypt_realloc_iops(&thr);
2212 			}
2213 
2214 			/* complete rotation */
2215 			if (thr.space) {
2216 				fil_crypt_complete_rotate_space(&thr);
2217 			}
2218 
2219 			/* force key state refresh */
2220 			new_state.key_id = 0;
2221 
2222 			/* return iops */
2223 			fil_crypt_return_iops(&thr);
2224 		}
2225 	}
2226 
2227 	/* return iops if shutting down */
2228 	fil_crypt_return_iops(&thr);
2229 
2230 	/* release current space if shutting down */
2231 	if (thr.space) {
2232 		thr.space->release();
2233 		thr.space = NULL;
2234 	}
2235 
2236 	mutex_enter(&fil_crypt_threads_mutex);
2237 	srv_n_fil_crypt_threads_started--;
2238 	os_event_set(fil_crypt_event); /* signal that we stopped */
2239 	mutex_exit(&fil_crypt_threads_mutex);
2240 
2241 	/* We count the number of threads in os_thread_exit(). A created
2242 	thread should always use that to exit and not use return() to exit. */
2243 
2244 	os_thread_exit();
2245 
2246 	OS_THREAD_DUMMY_RETURN;
2247 }
2248 
2249 /*********************************************************************
2250 Adjust thread count for key rotation
2251 @param[in]	enw_cnt		Number of threads to be used */
2252 UNIV_INTERN
2253 void
fil_crypt_set_thread_cnt(const uint new_cnt)2254 fil_crypt_set_thread_cnt(
2255 	const uint	new_cnt)
2256 {
2257 	if (!fil_crypt_threads_inited) {
2258 		if (srv_shutdown_state != SRV_SHUTDOWN_NONE)
2259 			return;
2260 		fil_crypt_threads_init();
2261 	}
2262 
2263 	mutex_enter(&fil_crypt_threads_mutex);
2264 
2265 	if (new_cnt > srv_n_fil_crypt_threads) {
2266 		uint add = new_cnt - srv_n_fil_crypt_threads;
2267 		srv_n_fil_crypt_threads = new_cnt;
2268 		for (uint i = 0; i < add; i++) {
2269 			ib::info() << "Creating #"
2270 				   << i+1 << " encryption thread id "
2271 				   << os_thread_create(fil_crypt_thread)
2272 				   << " total threads " << new_cnt << ".";
2273 		}
2274 	} else if (new_cnt < srv_n_fil_crypt_threads) {
2275 		srv_n_fil_crypt_threads = new_cnt;
2276 		os_event_set(fil_crypt_threads_event);
2277 	}
2278 
2279 	mutex_exit(&fil_crypt_threads_mutex);
2280 
2281 	while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) {
2282 		os_event_reset(fil_crypt_event);
2283 		os_event_wait_time(fil_crypt_event, 100000);
2284 	}
2285 
2286 	/* Send a message to encryption threads that there could be
2287 	something to do. */
2288 	if (srv_n_fil_crypt_threads) {
2289 		os_event_set(fil_crypt_threads_event);
2290 	}
2291 }
2292 
2293 /** Initialize the tablespace default_encrypt_tables
2294 if innodb_encryption_rotate_key_age=0. */
fil_crypt_default_encrypt_tables_fill()2295 static void fil_crypt_default_encrypt_tables_fill()
2296 {
2297 	ut_ad(mutex_own(&fil_system.mutex));
2298 
2299 	for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list);
2300 	     space != NULL;
2301 	     space = UT_LIST_GET_NEXT(space_list, space)) {
2302 		if (space->purpose != FIL_TYPE_TABLESPACE
2303 		    || space->is_in_default_encrypt
2304 		    || UT_LIST_GET_LEN(space->chain) == 0
2305 		    || !space->acquire_if_not_stopped()) {
2306 			continue;
2307 		}
2308 
2309 		/* Ensure that crypt_data has been initialized. */
2310 		ut_ad(space->size);
2311 
2312 		/* Skip ENCRYPTION!=DEFAULT tablespaces. */
2313 		if (space->crypt_data
2314 		    && !space->crypt_data->is_default_encryption()) {
2315 			goto next;
2316 		}
2317 
2318 		if (srv_encrypt_tables) {
2319 			/* Skip encrypted tablespaces if
2320 			innodb_encrypt_tables!=OFF */
2321 			if (space->crypt_data
2322 			    && space->crypt_data->min_key_version) {
2323 				goto next;
2324 			}
2325 		} else {
2326 			/* Skip unencrypted tablespaces if
2327 			innodb_encrypt_tables=OFF */
2328 			if (!space->crypt_data
2329 			    || !space->crypt_data->min_key_version) {
2330 				goto next;
2331 			}
2332 		}
2333 
2334 		fil_system.default_encrypt_tables.push_back(*space);
2335 		space->is_in_default_encrypt = true;
2336 next:
2337 		space->release();
2338 	}
2339 }
2340 
2341 /*********************************************************************
2342 Adjust max key age
2343 @param[in]	val		New max key age */
2344 UNIV_INTERN
2345 void
fil_crypt_set_rotate_key_age(uint val)2346 fil_crypt_set_rotate_key_age(
2347 	uint	val)
2348 {
2349 	mutex_enter(&fil_system.mutex);
2350 	srv_fil_crypt_rotate_key_age = val;
2351 	if (val == 0) {
2352 		fil_crypt_default_encrypt_tables_fill();
2353 	}
2354 	mutex_exit(&fil_system.mutex);
2355 	os_event_set(fil_crypt_threads_event);
2356 }
2357 
2358 /*********************************************************************
2359 Adjust rotation iops
2360 @param[in]	val		New max roation iops */
2361 UNIV_INTERN
2362 void
fil_crypt_set_rotation_iops(uint val)2363 fil_crypt_set_rotation_iops(
2364 	uint val)
2365 {
2366 	srv_n_fil_crypt_iops = val;
2367 	os_event_set(fil_crypt_threads_event);
2368 }
2369 
2370 /*********************************************************************
2371 Adjust encrypt tables
2372 @param[in]	val		New setting for innodb-encrypt-tables */
fil_crypt_set_encrypt_tables(ulong val)2373 void fil_crypt_set_encrypt_tables(ulong val)
2374 {
2375 	if (!fil_crypt_threads_inited) {
2376 		return;
2377 	}
2378 
2379 	mutex_enter(&fil_system.mutex);
2380 
2381 	srv_encrypt_tables = val;
2382 
2383 	if (fil_crypt_must_default_encrypt()) {
2384 		fil_crypt_default_encrypt_tables_fill();
2385 	}
2386 
2387 	mutex_exit(&fil_system.mutex);
2388 
2389 	os_event_set(fil_crypt_threads_event);
2390 }
2391 
2392 /*********************************************************************
2393 Init threads for key rotation */
2394 UNIV_INTERN
2395 void
fil_crypt_threads_init()2396 fil_crypt_threads_init()
2397 {
2398 	if (!fil_crypt_threads_inited) {
2399 		fil_crypt_event = os_event_create(0);
2400 		fil_crypt_threads_event = os_event_create(0);
2401 		mutex_create(LATCH_ID_FIL_CRYPT_THREADS_MUTEX,
2402 		     &fil_crypt_threads_mutex);
2403 
2404 		uint cnt = srv_n_fil_crypt_threads;
2405 		srv_n_fil_crypt_threads = 0;
2406 		fil_crypt_threads_inited = true;
2407 		fil_crypt_set_thread_cnt(cnt);
2408 	}
2409 }
2410 
2411 /*********************************************************************
2412 Clean up key rotation threads resources */
2413 UNIV_INTERN
2414 void
fil_crypt_threads_cleanup()2415 fil_crypt_threads_cleanup()
2416 {
2417 	if (!fil_crypt_threads_inited) {
2418 		return;
2419 	}
2420 	ut_a(!srv_n_fil_crypt_threads_started);
2421 	os_event_destroy(fil_crypt_event);
2422 	os_event_destroy(fil_crypt_threads_event);
2423 	mutex_free(&fil_crypt_threads_mutex);
2424 	fil_crypt_threads_inited = false;
2425 }
2426 
2427 /*********************************************************************
2428 Wait for crypt threads to stop accessing space
2429 @param[in]	space		Tablespace */
2430 UNIV_INTERN
2431 void
fil_space_crypt_close_tablespace(const fil_space_t * space)2432 fil_space_crypt_close_tablespace(
2433 	const fil_space_t*	space)
2434 {
2435 	fil_space_crypt_t* crypt_data = space->crypt_data;
2436 
2437 	if (!crypt_data || srv_n_fil_crypt_threads == 0
2438 	    || !fil_crypt_threads_inited) {
2439 		return;
2440 	}
2441 
2442 	mutex_enter(&fil_crypt_threads_mutex);
2443 
2444 	time_t start = time(0);
2445 	time_t last = start;
2446 
2447 	mutex_enter(&crypt_data->mutex);
2448 	mutex_exit(&fil_crypt_threads_mutex);
2449 
2450 	ulint cnt = crypt_data->rotate_state.active_threads;
2451 	bool flushing = crypt_data->rotate_state.flushing;
2452 
2453 	while (cnt > 0 || flushing) {
2454 		mutex_exit(&crypt_data->mutex);
2455 		/* release dict mutex so that scrub threads can release their
2456 		* table references */
2457 		dict_mutex_exit_for_mysql();
2458 
2459 		/* wakeup throttle (all) sleepers */
2460 		os_event_set(fil_crypt_throttle_sleep_event);
2461 		os_event_set(fil_crypt_threads_event);
2462 
2463 		os_thread_sleep(20000);
2464 		dict_mutex_enter_for_mysql();
2465 		mutex_enter(&crypt_data->mutex);
2466 		cnt = crypt_data->rotate_state.active_threads;
2467 		flushing = crypt_data->rotate_state.flushing;
2468 
2469 		time_t now = time(0);
2470 
2471 		if (now >= last + 30) {
2472 			ib::warn() << "Waited "
2473 				   << now - start
2474 				   << " seconds to drop space: "
2475 				   << space->name << " ("
2476 				   << space->id << ") active threads "
2477 				   << cnt << "flushing="
2478 				   << flushing << ".";
2479 			last = now;
2480 		}
2481 	}
2482 
2483 	mutex_exit(&crypt_data->mutex);
2484 }
2485 
2486 /*********************************************************************
2487 Get crypt status for a space (used by information_schema)
2488 @param[in]	space		Tablespace
2489 @param[out]	status		Crypt status */
2490 UNIV_INTERN
2491 void
fil_space_crypt_get_status(const fil_space_t * space,struct fil_space_crypt_status_t * status)2492 fil_space_crypt_get_status(
2493 	const fil_space_t*			space,
2494 	struct fil_space_crypt_status_t*	status)
2495 {
2496 	memset(status, 0, sizeof(*status));
2497 
2498 	ut_ad(space->referenced());
2499 
2500 	/* If there is no crypt data and we have not yet read
2501 	page 0 for this tablespace, we need to read it before
2502 	we can continue. */
2503 	if (!space->crypt_data) {
2504 		fil_crypt_read_crypt_data(const_cast<fil_space_t*>(space));
2505 	}
2506 
2507 	status->space = ULINT_UNDEFINED;
2508 
2509 	if (fil_space_crypt_t* crypt_data = space->crypt_data) {
2510 		status->space = space->id;
2511 		mutex_enter(&crypt_data->mutex);
2512 		status->scheme = crypt_data->type;
2513 		status->keyserver_requests = crypt_data->keyserver_requests;
2514 		status->min_key_version = crypt_data->min_key_version;
2515 		status->key_id = crypt_data->key_id;
2516 
2517 		if (crypt_data->rotate_state.active_threads > 0 ||
2518 		    crypt_data->rotate_state.flushing) {
2519 			status->rotating = true;
2520 			status->flushing =
2521 				crypt_data->rotate_state.flushing;
2522 			status->rotate_next_page_number =
2523 				crypt_data->rotate_state.next_offset;
2524 			status->rotate_max_page_number =
2525 				crypt_data->rotate_state.max_offset;
2526 		}
2527 
2528 		mutex_exit(&crypt_data->mutex);
2529 
2530 		if (srv_encrypt_tables || crypt_data->min_key_version) {
2531 			status->current_key_version =
2532 				fil_crypt_get_latest_key_version(crypt_data);
2533 		}
2534 	}
2535 }
2536 
2537 /*********************************************************************
2538 Return crypt statistics
2539 @param[out]	stat		Crypt statistics */
2540 UNIV_INTERN
2541 void
fil_crypt_total_stat(fil_crypt_stat_t * stat)2542 fil_crypt_total_stat(
2543 	fil_crypt_stat_t *stat)
2544 {
2545 	mutex_enter(&crypt_stat_mutex);
2546 	*stat = crypt_stat;
2547 	mutex_exit(&crypt_stat_mutex);
2548 }
2549 
2550 #endif /* UNIV_INNOCHECKSUM */
2551 
2552 /**
2553 Verify that post encryption checksum match calculated checksum.
2554 This function should be called only if tablespace contains crypt_data
2555 metadata (this is strong indication that tablespace is encrypted).
2556 Function also verifies that traditional checksum does not match
2557 calculated checksum as if it does page could be valid unencrypted,
2558 encrypted, or corrupted.
2559 
2560 @param[in,out]	page		page frame (checksum is temporarily modified)
2561 @param[in]	zip_size	ROW_FORMAT=COMPRESSED page size, or 0
2562 @return true if page is encrypted AND OK, false otherwise */
fil_space_verify_crypt_checksum(const byte * page,ulint zip_size)2563 bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size)
2564 {
2565 	if (ENCRYPTION_KEY_NOT_ENCRYPTED == mach_read_from_4(
2566 			page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)) {
2567 		return false;
2568 	}
2569 
2570 	/* Compressed and encrypted pages do not have checksum. Assume not
2571 	corrupted. Page verification happens after decompression in
2572 	buf_page_read_complete() using buf_page_is_corrupted(). */
2573 	if (fil_page_get_type(page) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
2574 		return true;
2575 	}
2576 
2577 	/* Read stored post encryption checksum. */
2578 	const ib_uint32_t checksum = mach_read_from_4(
2579 		page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
2580 
2581 	/* If stored checksum matches one of the calculated checksums
2582 	page is not corrupted. */
2583 
2584 	switch (srv_checksum_algorithm_t(srv_checksum_algorithm)) {
2585 	case SRV_CHECKSUM_ALGORITHM_STRICT_FULL_CRC32:
2586 	case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
2587 		if (zip_size) {
2588 			return checksum == page_zip_calc_checksum(
2589 				page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32);
2590 		}
2591 
2592 		return checksum == buf_calc_page_crc32(page);
2593 	case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
2594 		/* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2595 		due to MDEV-12114, fil_crypt_calculate_checksum()
2596 		is only using CRC32 for the encrypted pages.
2597 		Due to this, we must treat "strict_none" as "none". */
2598 	case SRV_CHECKSUM_ALGORITHM_NONE:
2599 		return true;
2600 	case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
2601 		/* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2602 		due to MDEV-12114, fil_crypt_calculate_checksum()
2603 		is only using CRC32 for the encrypted pages.
2604 		Due to this, we must treat "strict_innodb" as "innodb". */
2605 	case SRV_CHECKSUM_ALGORITHM_INNODB:
2606 	case SRV_CHECKSUM_ALGORITHM_CRC32:
2607 	case SRV_CHECKSUM_ALGORITHM_FULL_CRC32:
2608 		if (checksum == BUF_NO_CHECKSUM_MAGIC) {
2609 			return true;
2610 		}
2611 		if (zip_size) {
2612 			return checksum == page_zip_calc_checksum(
2613 				page, zip_size,
2614 				SRV_CHECKSUM_ALGORITHM_CRC32)
2615 				|| checksum == page_zip_calc_checksum(
2616 					page, zip_size,
2617 					SRV_CHECKSUM_ALGORITHM_INNODB);
2618 		}
2619 
2620 		return checksum == buf_calc_page_crc32(page)
2621 			|| checksum == buf_calc_page_new_checksum(page);
2622 	}
2623 
2624 	ut_ad("unhandled innodb_checksum_algorithm" == 0);
2625 	return false;
2626 }
2627