1 /*****************************************************************************
2 Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
3 Copyright (c) 2014, 2021, MariaDB Corporation.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
16
17 *****************************************************************************/
18 /**************************************************//**
19 @file fil0crypt.cc
20 Innodb file space encrypt/decrypt
21
22 Created Jonas Oreland Google
23 Modified Jan Lindström jan.lindstrom@mariadb.com
24 *******************************************************/
25
26 #include "fil0crypt.h"
27 #include "mtr0types.h"
28 #include "mach0data.h"
29 #include "page0zip.h"
30 #include "buf0checksum.h"
31 #ifdef UNIV_INNOCHECKSUM
32 # include "buf0buf.h"
33 #else
34 #include "srv0srv.h"
35 #include "srv0start.h"
36 #include "mtr0mtr.h"
37 #include "mtr0log.h"
38 #include "ut0ut.h"
39 #include "btr0scrub.h"
40 #include "fsp0fsp.h"
41 #include "fil0pagecompress.h"
42 #include <my_crypt.h>
43
44 static bool fil_crypt_threads_inited = false;
45
46 /** Is encryption enabled/disabled */
47 UNIV_INTERN ulong srv_encrypt_tables = 0;
48
49 /** No of key rotation threads requested */
50 UNIV_INTERN uint srv_n_fil_crypt_threads = 0;
51
52 /** No of key rotation threads started */
53 UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0;
54
55 /** At this age or older a space/page will be rotated */
56 UNIV_INTERN uint srv_fil_crypt_rotate_key_age;
57
58 /** Whether the encryption plugin does key rotation */
59 static bool srv_encrypt_rotate;
60
61 /** Event to signal FROM the key rotation threads. */
62 static os_event_t fil_crypt_event;
63
64 /** Event to signal TO the key rotation threads. */
65 UNIV_INTERN os_event_t fil_crypt_threads_event;
66
67 /** Event for waking up threads throttle. */
68 static os_event_t fil_crypt_throttle_sleep_event;
69
70 /** Mutex for key rotation threads. */
71 UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex;
72
73 /** Variable ensuring only 1 thread at time does initial conversion */
74 static bool fil_crypt_start_converting = false;
75
76 /** Variables for throttling */
77 UNIV_INTERN uint srv_n_fil_crypt_iops = 100; // 10ms per iop
78 static uint srv_alloc_time = 3; // allocate iops for 3s at a time
79 static uint n_fil_crypt_iops_allocated = 0;
80
81 /** Variables for scrubbing */
82 extern uint srv_background_scrub_data_interval;
83 extern uint srv_background_scrub_data_check_interval;
84
85 #define DEBUG_KEYROTATION_THROTTLING 0
86
87 /** Statistics variables */
88 static fil_crypt_stat_t crypt_stat;
89 static ib_mutex_t crypt_stat_mutex;
90
91 /** Is background scrubbing enabled, defined on btr0scrub.cc */
92 extern my_bool srv_background_scrub_data_uncompressed;
93 extern my_bool srv_background_scrub_data_compressed;
94
95 /***********************************************************************
96 Check if a key needs rotation given a key_state
97 @param[in] crypt_data Encryption information
98 @param[in] key_version Current key version
99 @param[in] latest_key_version Latest key version
100 @param[in] rotate_key_age when to rotate
101 @return true if key needs rotation, false if not */
102 static bool
103 fil_crypt_needs_rotation(
104 const fil_space_crypt_t* crypt_data,
button_2_on()105 uint key_version,
106 uint latest_key_version,
107 uint rotate_key_age)
108 MY_ATTRIBUTE((warn_unused_result));
109
button_2_off()110 /*********************************************************************
111 Init space crypt */
112 UNIV_INTERN
113 void
114 fil_space_crypt_init()
115 {
116 fil_crypt_throttle_sleep_event = os_event_create(0);
117
118 mutex_create(LATCH_ID_FIL_CRYPT_STAT_MUTEX, &crypt_stat_mutex);
119 memset(&crypt_stat, 0, sizeof(crypt_stat));
120 }
121
122 /*********************************************************************
123 Cleanup space crypt */
124 UNIV_INTERN
125 void
126 fil_space_crypt_cleanup()
127 {
128 os_event_destroy(fil_crypt_throttle_sleep_event);
129 mutex_free(&crypt_stat_mutex);
130 }
131
132 /**
133 Get latest key version from encryption plugin.
134 @return key version or ENCRYPTION_KEY_VERSION_INVALID */
135 uint
136 fil_space_crypt_t::key_get_latest_version(void)
137 {
138 uint key_version = key_found;
139
140 if (is_key_found()) {
141 key_version = encryption_key_get_latest_version(key_id);
142 /* InnoDB does dirty read of srv_fil_crypt_rotate_key_age.
143 It doesn't matter because srv_encrypt_rotate
144 can be set to true only once */
145 if (!srv_encrypt_rotate
146 && key_version > srv_fil_crypt_rotate_key_age) {
147 srv_encrypt_rotate = true;
148 }
149
150 srv_stats.n_key_requests.inc();
151 key_found = key_version;
152 }
153
154 return key_version;
155 }
156
157 /******************************************************************
158 Get the latest(key-version), waking the encrypt thread, if needed
159 @param[in,out] crypt_data Crypt data */
160 static inline
161 uint
162 fil_crypt_get_latest_key_version(
163 fil_space_crypt_t* crypt_data)
164 {
165 ut_ad(crypt_data != NULL);
166
167 uint key_version = crypt_data->key_get_latest_version();
168
169 if (crypt_data->is_key_found()) {
170
171 if (fil_crypt_needs_rotation(
172 crypt_data,
173 crypt_data->min_key_version,
174 key_version,
175 srv_fil_crypt_rotate_key_age)) {
176 /* Below event seen as NULL-pointer at startup
177 when new database was created and we create a
178 checkpoint. Only seen when debugging. */
179 if (fil_crypt_threads_inited) {
180 os_event_set(fil_crypt_threads_event);
181 }
182 }
183 }
184
185 return key_version;
186 }
187
188 /******************************************************************
189 Mutex helper for crypt_data->scheme */
190 void
191 crypt_data_scheme_locker(
192 /*=====================*/
193 st_encryption_scheme* scheme,
194 int exit)
195 {
196 fil_space_crypt_t* crypt_data =
197 static_cast<fil_space_crypt_t*>(scheme);
198
199 if (exit) {
200 mutex_exit(&crypt_data->mutex);
201 } else {
202 mutex_enter(&crypt_data->mutex);
203 }
204 }
205
206 /******************************************************************
207 Create a fil_space_crypt_t object
208 @param[in] type CRYPT_SCHEME_UNENCRYPTE or
209 CRYPT_SCHEME_1
210 @param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
211 FIL_ENCRYPTION_ON or
212 FIL_ENCRYPTION_OFF
213 @param[in] min_key_version key_version or 0
214 @param[in] key_id Used key id
215 @return crypt object */
216 static
217 fil_space_crypt_t*
218 fil_space_create_crypt_data(
219 uint type,
220 fil_encryption_t encrypt_mode,
221 uint min_key_version,
222 uint key_id)
223 {
224 fil_space_crypt_t* crypt_data = NULL;
225 if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
226 crypt_data = new(buf)
227 fil_space_crypt_t(
228 type,
229 min_key_version,
230 key_id,
231 encrypt_mode);
232 }
233
234 return crypt_data;
235 }
236
237 /******************************************************************
238 Create a fil_space_crypt_t object
239 @param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
240 FIL_ENCRYPTION_ON or
241 FIL_ENCRYPTION_OFF
242
243 @param[in] key_id Encryption key id
244 @return crypt object */
245 UNIV_INTERN
246 fil_space_crypt_t*
247 fil_space_create_crypt_data(
248 fil_encryption_t encrypt_mode,
249 uint key_id)
250 {
251 return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id));
252 }
253
254 /******************************************************************
255 Merge fil_space_crypt_t object
256 @param[in,out] dst Destination cryp data
257 @param[in] src Source crypt data */
258 UNIV_INTERN
259 void
260 fil_space_merge_crypt_data(
261 fil_space_crypt_t* dst,
262 const fil_space_crypt_t* src)
263 {
264 mutex_enter(&dst->mutex);
265
266 /* validate that they are mergeable */
267 ut_a(src->type == CRYPT_SCHEME_UNENCRYPTED ||
268 src->type == CRYPT_SCHEME_1);
269
270 ut_a(dst->type == CRYPT_SCHEME_UNENCRYPTED ||
271 dst->type == CRYPT_SCHEME_1);
272
273 dst->encryption = src->encryption;
274 dst->type = src->type;
275 dst->min_key_version = src->min_key_version;
276 dst->keyserver_requests += src->keyserver_requests;
277
278 mutex_exit(&dst->mutex);
279 }
280
281 /** Initialize encryption parameters from a tablespace header page.
282 @param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
283 @param[in] page first page of the tablespace
284 @return crypt data from page 0
285 @retval NULL if not present or not valid */
286 fil_space_crypt_t* fil_space_read_crypt_data(ulint zip_size, const byte* page)
287 {
288 const ulint offset = FSP_HEADER_OFFSET
289 + fsp_header_get_encryption_offset(zip_size);
290
291 if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) {
292 /* Crypt data is not stored. */
293 return NULL;
294 }
295
296 uint8_t type = mach_read_from_1(page + offset + MAGIC_SZ + 0);
297 uint8_t iv_length = mach_read_from_1(page + offset + MAGIC_SZ + 1);
298 fil_space_crypt_t* crypt_data;
299
300 if (!(type == CRYPT_SCHEME_UNENCRYPTED ||
301 type == CRYPT_SCHEME_1)
302 || iv_length != sizeof crypt_data->iv) {
303 ib::error() << "Found non sensible crypt scheme: "
304 << type << "," << iv_length
305 << " for space: "
306 << page_get_space_id(page);
307 return NULL;
308 }
309
310 uint min_key_version = mach_read_from_4
311 (page + offset + MAGIC_SZ + 2 + iv_length);
312
313 uint key_id = mach_read_from_4
314 (page + offset + MAGIC_SZ + 2 + iv_length + 4);
315
316 fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(
317 page + offset + MAGIC_SZ + 2 + iv_length + 8);
318
319 crypt_data = fil_space_create_crypt_data(encryption, key_id);
320 /* We need to overwrite these as above function will initialize
321 members */
322 crypt_data->type = type;
323 crypt_data->min_key_version = min_key_version;
324 crypt_data->page0_offset = offset;
325 memcpy(crypt_data->iv, page + offset + MAGIC_SZ + 2, iv_length);
326
327 return crypt_data;
328 }
329
330 /******************************************************************
331 Free a crypt data object
332 @param[in,out] crypt_data crypt data to be freed */
333 UNIV_INTERN
334 void
335 fil_space_destroy_crypt_data(
336 fil_space_crypt_t **crypt_data)
337 {
338 if (crypt_data != NULL && (*crypt_data) != NULL) {
339 fil_space_crypt_t* c;
340 if (UNIV_LIKELY(fil_crypt_threads_inited)) {
341 mutex_enter(&fil_crypt_threads_mutex);
342 c = *crypt_data;
343 *crypt_data = NULL;
344 mutex_exit(&fil_crypt_threads_mutex);
345 } else {
346 ut_ad(srv_read_only_mode || !srv_was_started);
347 c = *crypt_data;
348 *crypt_data = NULL;
349 }
350 if (c) {
351 c->~fil_space_crypt_t();
352 ut_free(c);
353 }
354 }
355 }
356
357 /** Fill crypt data information to the give page.
358 It should be called during ibd file creation.
359 @param[in] flags tablespace flags
360 @param[in,out] page first page of the tablespace */
361 void
362 fil_space_crypt_t::fill_page0(
363 ulint flags,
364 byte* page)
365 {
366 const uint len = sizeof(iv);
367 const ulint offset = FSP_HEADER_OFFSET
368 + fsp_header_get_encryption_offset(
369 fil_space_t::zip_size(flags));
370 page0_offset = offset;
371
372 memcpy(page + offset, CRYPT_MAGIC, MAGIC_SZ);
373 mach_write_to_1(page + offset + MAGIC_SZ, type);
374 mach_write_to_1(page + offset + MAGIC_SZ + 1, len);
375 memcpy(page + offset + MAGIC_SZ + 2, &iv, len);
376
377 mach_write_to_4(page + offset + MAGIC_SZ + 2 + len,
378 min_key_version);
379 mach_write_to_4(page + offset + MAGIC_SZ + 2 + len + 4,
380 key_id);
381 mach_write_to_1(page + offset + MAGIC_SZ + 2 + len + 8,
382 encryption);
383 }
384
385 /******************************************************************
386 Write crypt data to a page (0)
387 @param[in] space tablespace
388 @param[in,out] page0 first page of the tablespace
389 @param[in,out] mtr mini-transaction */
390 UNIV_INTERN
391 void
392 fil_space_crypt_t::write_page0(
393 const fil_space_t* space,
394 byte* page,
395 mtr_t* mtr)
396 {
397 ut_ad(this == space->crypt_data);
398 const uint len = sizeof(iv);
399 const ulint offset = FSP_HEADER_OFFSET
400 + fsp_header_get_encryption_offset(space->zip_size());
401 page0_offset = offset;
402
403 /*
404 redo log this as bytewise updates to page 0
405 followed by an MLOG_FILE_WRITE_CRYPT_DATA
406 (that will during recovery update fil_space_t)
407 */
408 mlog_write_string(page + offset, CRYPT_MAGIC, MAGIC_SZ, mtr);
409 mlog_write_ulint(page + offset + MAGIC_SZ + 0, type, MLOG_1BYTE, mtr);
410 mlog_write_ulint(page + offset + MAGIC_SZ + 1, len, MLOG_1BYTE, mtr);
411 mlog_write_string(page + offset + MAGIC_SZ + 2, iv, len,
412 mtr);
413 mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len, min_key_version,
414 MLOG_4BYTES, mtr);
415 mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 4, key_id,
416 MLOG_4BYTES, mtr);
417 mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 8, encryption,
418 MLOG_1BYTE, mtr);
419
420 DBUG_EXECUTE_IF("ib_do_not_log_crypt_data", return;);
421
422 byte* log_ptr = mlog_open(mtr, 11 + 17 + len);
423
424 if (log_ptr != NULL) {
425 log_ptr = mlog_write_initial_log_record_fast(
426 page,
427 MLOG_FILE_WRITE_CRYPT_DATA,
428 log_ptr, mtr);
429 mach_write_to_4(log_ptr, space->id);
430 log_ptr += 4;
431 mach_write_to_2(log_ptr, offset);
432 log_ptr += 2;
set_beginmark_label(int value)433 mach_write_to_1(log_ptr, type);
434 log_ptr += 1;
435 mach_write_to_1(log_ptr, len);
436 log_ptr += 1;
437 mach_write_to_4(log_ptr, min_key_version);
438 log_ptr += 4;
set_lengthmark_label(int value,int freq)439 mach_write_to_4(log_ptr, key_id);
440 log_ptr += 4;
441 mach_write_to_1(log_ptr, encryption);
442 log_ptr += 1;
443 mlog_close(mtr, log_ptr);
444
445 mlog_catenate_string(mtr, iv, len);
446 }
447 }
448
449 /******************************************************************
450 Parse a MLOG_FILE_WRITE_CRYPT_DATA log entry
set_zoom_label(int value)451 @param[in] ptr Log entry start
452 @param[in] end_ptr Log entry end
453 @param[in] block buffer block
454 @return position on log buffer */
455 UNIV_INTERN
456 byte*
457 fil_parse_write_crypt_data(
458 byte* ptr,
459 const byte* end_ptr,
460 dberr_t* err)
461 {
462 /* check that redo log entry is complete */
463 uint entry_size =
464 4 + // size of space_id
465 2 + // size of offset
466 1 + // size of type
467 1 + // size of iv-len
468 4 + // size of min_key_version
469 4 + // size of key_id
470 1; // fil_encryption_t
471
472 *err = DB_SUCCESS;
473
474 if (ptr + entry_size > end_ptr) {
475 return NULL;
476 }
477
478 ulint space_id = mach_read_from_4(ptr);
479 ptr += 4;
480 uint offset = mach_read_from_2(ptr);
481 ptr += 2;
482 uint type = mach_read_from_1(ptr);
483 ptr += 1;
484 uint len = mach_read_from_1(ptr);
485 ptr += 1;
486
487 if ((type != CRYPT_SCHEME_1 && type != CRYPT_SCHEME_UNENCRYPTED)
488 || len != CRYPT_SCHEME_1_IV_LEN) {
489 *err = DB_CORRUPTION;
490 return NULL;
491 }
492
493 uint min_key_version = mach_read_from_4(ptr);
494 ptr += 4;
495
496 uint key_id = mach_read_from_4(ptr);
497 ptr += 4;
498
499 fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(ptr);
500 ptr +=1;
501
502 if (ptr + len > end_ptr) {
503 return NULL;
504 }
505
506 mutex_enter(&fil_system.mutex);
507
508 fil_space_t* space = fil_space_get_by_id(space_id);
509
510 if (!space) {
511 mutex_exit(&fil_system.mutex);
512 return ptr + len;
513 }
514
515 fil_space_crypt_t* crypt_data = fil_space_create_crypt_data(
516 encryption, key_id);
517
518 crypt_data->page0_offset = offset;
519 crypt_data->min_key_version = min_key_version;
520 crypt_data->type = type;
521 memcpy(crypt_data->iv, ptr, len);
522 ptr += len;
523
524 if (space->crypt_data) {
525 fil_space_merge_crypt_data(space->crypt_data, crypt_data);
526 fil_space_destroy_crypt_data(&crypt_data);
527 crypt_data = space->crypt_data;
528 } else {
529 space->crypt_data = crypt_data;
530 }
531
532 mutex_exit(&fil_system.mutex);
533
534 if (crypt_data->should_encrypt() && !crypt_data->is_key_found()) {
535 *err = DB_DECRYPTION_FAILED;
536 }
537
538 return ptr;
539 }
540
541 /** Encrypt a buffer for non full checksum.
542 @param[in,out] crypt_data Crypt data
543 @param[in] space space_id
544 @param[in] offset Page offset
545 @param[in] lsn Log sequence number
546 @param[in] src_frame Page to encrypt
547 @param[in] zip_size ROW_FORMAT=COMPRESSED
548 page size, or 0
549 @param[in,out] dst_frame Output buffer
550 @return encrypted buffer or NULL */
551 static byte* fil_encrypt_buf_for_non_full_checksum(
552 fil_space_crypt_t* crypt_data,
553 ulint space,
554 ulint offset,
555 lsn_t lsn,
556 const byte* src_frame,
557 ulint zip_size,
558 byte* dst_frame)
play_it(Widget w,XtPointer client_data,XtPointer call_data)559 {
560 uint size = uint(zip_size ? zip_size : srv_page_size);
561 uint key_version = fil_crypt_get_latest_key_version(crypt_data);
562 ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
563
564 ulint orig_page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
565 ibool page_compressed = (orig_page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
566 uint header_len = FIL_PAGE_DATA;
567
568 if (page_compressed) {
569 header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
570 }
571
572 /* FIL page header is not encrypted */
573 memcpy(dst_frame, src_frame, header_len);
574 mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
575 key_version);
576
577 /* Calculate the start offset in a page */
578 uint unencrypted_bytes = header_len + FIL_PAGE_DATA_END;
579 uint srclen = size - unencrypted_bytes;
580 const byte* src = src_frame + header_len;
581 byte* dst = dst_frame + header_len;
582 uint32 dstlen = 0;
583 ib_uint32_t checksum = 0;
584
585 if (page_compressed) {
586 srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
587 }
588
589 int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
590 crypt_data, key_version,
591 (uint32)space, (uint32)offset, lsn);
592 ut_a(rc == MY_AES_OK);
593 ut_a(dstlen == srclen);
594
595 /* For compressed tables we do not store the FIL header because
596 the whole page is not stored to the disk. In compressed tables only
597 the FIL header + compressed (and now encrypted) payload alligned
598 to sector boundary is written. */
599 if (!page_compressed) {
600 /* FIL page trailer is also not encrypted */
601 memcpy(dst_frame + size - FIL_PAGE_DATA_END,
602 src_frame + size - FIL_PAGE_DATA_END,
603 FIL_PAGE_DATA_END);
604 } else {
605 /* Clean up rest of buffer */
606 memset(dst_frame+header_len+srclen, 0,
607 size - (header_len + srclen));
608 }
609
610 checksum = fil_crypt_calculate_checksum(zip_size, dst_frame);
611
612 /* store the post-encryption checksum after the key-version */
613 mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4,
614 checksum);
615
616 ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size));
617
618 srv_stats.pages_encrypted.inc();
619
620 return dst_frame;
621 }
622
623 /** Encrypt a buffer for full checksum format.
624 @param[in,out] crypt_data Crypt data
625 @param[in] space space_id
626 @param[in] offset Page offset
627 @param[in] lsn Log sequence number
628 @param[in] src_frame Page to encrypt
629 @param[in,out] dst_frame Output buffer
630 @return encrypted buffer or NULL */
631 static byte* fil_encrypt_buf_for_full_crc32(
632 fil_space_crypt_t* crypt_data,
633 ulint space,
634 ulint offset,
635 lsn_t lsn,
636 const byte* src_frame,
637 byte* dst_frame)
638 {
639 uint key_version = fil_crypt_get_latest_key_version(crypt_data);
640 ut_d(bool corrupted = false);
641 const uint size = buf_page_full_crc32_size(src_frame, NULL,
abort_playing()642 #ifdef UNIV_DEBUG
643 &corrupted
644 #else
645 NULL
646 #endif
647 );
648 ut_ad(!corrupted);
649 uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
650 + FIL_PAGE_FCRC32_CHECKSUM);
651 const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
652 byte* dst = dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
653 uint dstlen = 0;
654
655 ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
656
657 /* Till FIL_PAGE_LSN, page is not encrypted */
658 memcpy(dst_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
659
660 /* Write key version to the page. */
661 mach_write_to_4(dst_frame + FIL_PAGE_FCRC32_KEY_VERSION, key_version);
662
663 int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
664 crypt_data, key_version,
665 uint(space), uint(offset), lsn);
666 ut_a(rc == MY_AES_OK);
667 ut_a(dstlen == srclen);
668
669 const ulint payload = size - FIL_PAGE_FCRC32_CHECKSUM;
670 mach_write_to_4(dst_frame + payload, ut_crc32(dst_frame, payload));
set_playline(XtPointer client_data)671 /* Clean the rest of the buffer. FIXME: Punch holes when writing! */
672 memset(dst_frame + (payload + 4), 0, srv_page_size - (payload + 4));
673
674 srv_stats.pages_encrypted.inc();
675
676 return dst_frame;
677 }
678
679 /** Encrypt a buffer.
680 @param[in,out] crypt_data Crypt data
681 @param[in] space space_id
682 @param[in] offset Page offset
683 @param[in] lsn Log sequence number
684 @param[in] src_frame Page to encrypt
685 @param[in] zip_size ROW_FORMAT=COMPRESSED
686 page size, or 0
687 @param[in,out] dst_frame Output buffer
688 @param[in] use_full_checksum full crc32 algo is used
689 @return encrypted buffer or NULL */
690 UNIV_INTERN
691 byte*
692 fil_encrypt_buf(
693 fil_space_crypt_t* crypt_data,
694 ulint space,
695 ulint offset,
696 lsn_t lsn,
697 const byte* src_frame,
698 ulint zip_size,
699 byte* dst_frame,
700 bool use_full_checksum)
701 {
702 if (use_full_checksum) {
703 return fil_encrypt_buf_for_full_crc32(
704 crypt_data, space, offset,
705 lsn, src_frame, dst_frame);
706 }
707
708 return fil_encrypt_buf_for_non_full_checksum(
709 crypt_data, space, offset, lsn,
710 src_frame, zip_size, dst_frame);
711 }
712
713 /** Check whether these page types are allowed to encrypt.
714 @param[in] space tablespace object
715 @param[in] src_frame source page
716 @return true if it is valid page type */
NEW_set_playline(XtPointer client_data)717 static bool fil_space_encrypt_valid_page_type(
718 const fil_space_t* space,
719 byte* src_frame)
720 {
721 switch (mach_read_from_2(src_frame+FIL_PAGE_TYPE)) {
722 case FIL_PAGE_RTREE:
723 return space->full_crc32();
724 case FIL_PAGE_TYPE_FSP_HDR:
725 case FIL_PAGE_TYPE_XDES:
726 return false;
727 }
728
729 return true;
730 }
731
732 /******************************************************************
733 Encrypt a page
734
735 @param[in] space Tablespace
736 @param[in] offset Page offset
737 @param[in] lsn Log sequence number
738 @param[in] src_frame Page to encrypt
739 @param[in,out] dst_frame Output buffer
740 @return encrypted buffer or NULL */
741 UNIV_INTERN
742 byte*
743 fil_space_encrypt(
744 const fil_space_t* space,
745 ulint offset,
746 lsn_t lsn,
747 byte* src_frame,
748 byte* dst_frame)
749 {
750 if (!fil_space_encrypt_valid_page_type(space, src_frame)) {
751 return src_frame;
752 }
753
754 if (!space->crypt_data || !space->crypt_data->is_encrypted()) {
755 return (src_frame);
756 }
757
758 ut_ad(space->pending_io());
759
760 return fil_encrypt_buf(space->crypt_data, space->id, offset, lsn,
761 src_frame, space->zip_size(),
762 dst_frame, space->full_crc32());
763 }
764
765 /** Decrypt a page for full checksum format.
766 @param[in] space space id
767 @param[in] crypt_data crypt_data
768 @param[in] tmp_frame Temporary buffer
769 @param[in,out] src_frame Page to decrypt
770 @return DB_SUCCESS or error */
771 static dberr_t fil_space_decrypt_full_crc32(
772 ulint space,
773 fil_space_crypt_t* crypt_data,
774 byte* tmp_frame,
775 byte* src_frame)
776 {
777 uint key_version = mach_read_from_4(
778 src_frame + FIL_PAGE_FCRC32_KEY_VERSION);
779 lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
780 uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
781
782 ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
783
784 ut_ad(crypt_data);
785 ut_ad(crypt_data->is_encrypted());
786
787 memcpy(tmp_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
788
789 /* Calculate the offset where decryption starts */
790 const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
791 byte* dst = tmp_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
792 uint dstlen = 0;
793 bool corrupted = false;
794 uint size = buf_page_full_crc32_size(src_frame, NULL, &corrupted);
795 if (UNIV_UNLIKELY(corrupted)) {
796 return DB_DECRYPTION_FAILED;
797 }
798
799 uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
800 + FIL_PAGE_FCRC32_CHECKSUM);
801
802 int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
803 crypt_data, key_version,
804 (uint) space, offset, lsn);
805
806 if (rc != MY_AES_OK || dstlen != srclen) {
807 if (rc == -1) {
808 return DB_DECRYPTION_FAILED;
809 }
810
811 ib::fatal() << "Unable to decrypt data-block "
812 << " src: " << src << "srclen: "
813 << srclen << " buf: " << dst << "buflen: "
814 << dstlen << " return-code: " << rc
815 << " Can't continue!";
816 }
817
818 /* Copy only checksum part in the trailer */
819 memcpy(tmp_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
820 src_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
821 FIL_PAGE_FCRC32_CHECKSUM);
822
823 srv_stats.pages_decrypted.inc();
824
825 return DB_SUCCESS; /* page was decrypted */
826 }
827
828 /** Decrypt a page for non full checksum format.
829 @param[in] crypt_data crypt_data
830 @param[in] tmp_frame Temporary buffer
831 @param[in] physical_size page size
832 @param[in,out] src_frame Page to decrypt
833 @return DB_SUCCESS or error */
834 static dberr_t fil_space_decrypt_for_non_full_checksum(
835 fil_space_crypt_t* crypt_data,
836 byte* tmp_frame,
837 ulint physical_size,
838 byte* src_frame)
839 {
840 ulint page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
841 uint key_version = mach_read_from_4(
842 src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
843 bool page_compressed = (page_type
844 == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
845 uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
846 uint space = mach_read_from_4(
847 src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
848 ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
849
850 ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
851 ut_a(crypt_data != NULL && crypt_data->is_encrypted());
852
853 /* read space & lsn */
854 uint header_len = FIL_PAGE_DATA;
855
856 if (page_compressed) {
857 header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
858 }
859
860 /* Copy FIL page header, it is not encrypted */
861 memcpy(tmp_frame, src_frame, header_len);
862
863 /* Calculate the offset where decryption starts */
864 const byte* src = src_frame + header_len;
865 byte* dst = tmp_frame + header_len;
866 uint32 dstlen = 0;
867 uint srclen = uint(physical_size) - header_len - FIL_PAGE_DATA_END;
868
869 if (page_compressed) {
870 srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
871 }
872
873 int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
874 crypt_data, key_version,
875 space, offset, lsn);
876
877 if (! ((rc == MY_AES_OK) && ((ulint) dstlen == srclen))) {
878
879 if (rc == -1) {
880 return DB_DECRYPTION_FAILED;
881 }
882
883 ib::fatal() << "Unable to decrypt data-block "
884 << " src: " << static_cast<const void*>(src)
885 << "srclen: "
886 << srclen << " buf: "
887 << static_cast<const void*>(dst) << "buflen: "
888 << dstlen << " return-code: " << rc
889 << " Can't continue!";
890 }
891
892 /* For compressed tables we do not store the FIL header because
893 the whole page is not stored to the disk. In compressed tables only
894 the FIL header + compressed (and now encrypted) payload alligned
895 to sector boundary is written. */
896 if (!page_compressed) {
897 /* Copy FIL trailer */
898 memcpy(tmp_frame + physical_size - FIL_PAGE_DATA_END,
899 src_frame + physical_size - FIL_PAGE_DATA_END,
900 FIL_PAGE_DATA_END);
901 }
902
903 srv_stats.pages_decrypted.inc();
904
905 return DB_SUCCESS; /* page was decrypted */
906 }
907
908 /** Decrypt a page.
909 @param[in] space_id tablespace id
910 @param[in] crypt_data crypt_data
911 @param[in] tmp_frame Temporary buffer
912 @param[in] physical_size page size
913 @param[in] fsp_flags Tablespace flags
914 @param[in,out] src_frame Page to decrypt
915 @param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED
916 @return DB_SUCCESS or error */
917 UNIV_INTERN
918 dberr_t
919 fil_space_decrypt(
920 ulint space_id,
921 fil_space_crypt_t* crypt_data,
922 byte* tmp_frame,
923 ulint physical_size,
924 ulint fsp_flags,
925 byte* src_frame)
926 {
927 if (fil_space_t::full_crc32(fsp_flags)) {
928 return fil_space_decrypt_full_crc32(
929 space_id, crypt_data, tmp_frame, src_frame);
930 }
931
932 return fil_space_decrypt_for_non_full_checksum(crypt_data, tmp_frame,
933 physical_size,
934 src_frame);
935 }
936
937 /**
938 Decrypt a page.
939 @param[in] space Tablespace
940 @param[in] tmp_frame Temporary buffer used for decrypting
941 @param[in,out] src_frame Page to decrypt
942 @return decrypted page, or original not encrypted page if decryption is
943 not needed.*/
944 UNIV_INTERN
945 byte*
946 fil_space_decrypt(
947 const fil_space_t* space,
948 byte* tmp_frame,
949 byte* src_frame)
950 {
951 const ulint physical_size = space->physical_size();
952
953 ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
954 ut_ad(space->pending_io());
955
956 if (DB_SUCCESS != fil_space_decrypt(space->id, space->crypt_data,
957 tmp_frame, physical_size,
958 space->flags, src_frame)) {
959 return nullptr;
960 }
961
962 /* Copy the decrypted page back to page buffer, not
963 really any other options. */
964 memcpy(src_frame, tmp_frame, physical_size);
965
966 return src_frame;
967 }
968
969 /**
970 Calculate post encryption checksum
971 @param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
972 @param[in] dst_frame Block where checksum is calculated
973 @return page checksum
974 not needed. */
975 uint32_t
976 fil_crypt_calculate_checksum(ulint zip_size, const byte* dst_frame)
977 {
978 /* For encrypted tables we use only crc32 and strict_crc32 */
979 return zip_size
980 ? page_zip_calc_checksum(dst_frame, zip_size,
981 SRV_CHECKSUM_ALGORITHM_CRC32)
982 : buf_calc_page_crc32(dst_frame);
983 }
984
985 /***********************************************************************/
986
987 /** A copy of global key state */
988 struct key_state_t {
989 key_state_t() : key_id(0), key_version(0),
990 rotate_key_age(srv_fil_crypt_rotate_key_age) {}
991 bool operator==(const key_state_t& other) const {
992 return key_version == other.key_version &&
993 rotate_key_age == other.rotate_key_age;
994 }
995 uint key_id;
996 uint key_version;
997 uint rotate_key_age;
998 };
999
1000 /***********************************************************************
1001 Copy global key state
1002 @param[in,out] new_state key state
1003 @param[in] crypt_data crypt data */
1004 static void
1005 fil_crypt_get_key_state(
1006 key_state_t* new_state,
1007 fil_space_crypt_t* crypt_data)
1008 {
1009 if (srv_encrypt_tables) {
1010 new_state->key_version = crypt_data->key_get_latest_version();
1011 new_state->rotate_key_age = srv_fil_crypt_rotate_key_age;
1012
1013 ut_a(new_state->key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
1014 } else {
1015 new_state->key_version = 0;
1016 new_state->rotate_key_age = 0;
1017 }
1018 }
1019
1020 /***********************************************************************
1021 Check if a key needs rotation given a key_state
1022 @param[in] crypt_data Encryption information
1023 @param[in] key_version Current key version
1024 @param[in] latest_key_version Latest key version
1025 @param[in] rotate_key_age when to rotate
1026 @return true if key needs rotation, false if not */
1027 static bool
1028 fil_crypt_needs_rotation(
1029 const fil_space_crypt_t* crypt_data,
1030 uint key_version,
1031 uint latest_key_version,
1032 uint rotate_key_age)
1033 {
1034 if (key_version == ENCRYPTION_KEY_VERSION_INVALID) {
1035 return false;
1036 }
1037
1038 if (key_version == 0 && latest_key_version != 0) {
1039 /* this is rotation unencrypted => encrypted
1040 * ignore rotate_key_age */
1041 return true;
1042 }
1043
1044 if (latest_key_version == 0 && key_version != 0) {
1045 if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT) {
1046 /* this is rotation encrypted => unencrypted */
1047 return true;
1048 }
1049 return false;
1050 }
1051
1052 if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT
1053 && crypt_data->type == CRYPT_SCHEME_1
1054 && !srv_encrypt_tables) {
1055 /* This is rotation encrypted => unencrypted */
1056 return true;
1057 }
1058
1059 if (rotate_key_age == 0) {
1060 return false;
1061 }
1062
1063 /* this is rotation encrypted => encrypted,
1064 * only reencrypt if key is sufficiently old */
1065 if (key_version + rotate_key_age < latest_key_version) {
1066 return true;
1067 }
1068
1069 return false;
1070 }
1071
1072 /** Read page 0 and possible crypt data from there.
1073 @param[in,out] space Tablespace */
1074 static inline
1075 void
1076 fil_crypt_read_crypt_data(fil_space_t* space)
1077 {
1078 if (space->crypt_data || space->size
1079 || !fil_space_get_size(space->id)) {
1080 /* The encryption metadata has already been read, or
1081 the tablespace is not encrypted and the file has been
1082 opened already, or the file cannot be accessed,
1083 likely due to a concurrent DROP
1084 (possibly as part of TRUNCATE or ALTER TABLE).
1085 FIXME: The file can become unaccessible any time
1086 after this check! We should really remove this
1087 function and instead make crypt_data an integral
1088 part of fil_space_t. */
1089 return;
1090 }
1091
1092 const ulint zip_size = space->zip_size();
1093 mtr_t mtr;
1094 mtr.start();
1095 if (buf_block_t* block = buf_page_get(page_id_t(space->id, 0),
1096 zip_size, RW_S_LATCH, &mtr)) {
1097 mutex_enter(&fil_system.mutex);
1098 if (!space->crypt_data) {
1099 space->crypt_data = fil_space_read_crypt_data(
1100 zip_size, block->frame);
1101 }
1102 mutex_exit(&fil_system.mutex);
1103 }
1104 mtr.commit();
1105 }
1106
1107 /** Start encrypting a space
1108 @param[in,out] space Tablespace
1109 @return true if a recheck of tablespace is needed by encryption thread. */
1110 static bool fil_crypt_start_encrypting_space(fil_space_t* space)
1111 {
1112 bool recheck = false;
1113
1114 mutex_enter(&fil_crypt_threads_mutex);
1115
1116 fil_space_crypt_t *crypt_data = space->crypt_data;
1117
1118 /* If space is not encrypted and encryption is not enabled, then
1119 do not continue encrypting the space. */
1120 if (!crypt_data && !srv_encrypt_tables) {
1121 mutex_exit(&fil_crypt_threads_mutex);
1122 return false;
1123 }
1124
1125 if (crypt_data != NULL || fil_crypt_start_converting) {
1126 /* someone beat us to it */
1127 if (fil_crypt_start_converting) {
1128 recheck = true;
1129 }
1130
1131 mutex_exit(&fil_crypt_threads_mutex);
1132 return recheck;
1133 }
1134
1135 /* NOTE: we need to write and flush page 0 before publishing
1136 * the crypt data. This so that after restart there is no
1137 * risk of finding encrypted pages without having
1138 * crypt data in page 0 */
1139
1140 /* 1 - create crypt data */
1141 crypt_data = fil_space_create_crypt_data(
1142 FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);
1143
1144 if (crypt_data == NULL) {
1145 mutex_exit(&fil_crypt_threads_mutex);
1146 return false;
1147 }
1148
1149 crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
1150 crypt_data->min_key_version = 0; // all pages are unencrypted
1151 crypt_data->rotate_state.start_time = time(0);
1152 crypt_data->rotate_state.starting = true;
1153 crypt_data->rotate_state.active_threads = 1;
1154
1155 mutex_enter(&fil_system.mutex);
1156 space->crypt_data = crypt_data;
1157 mutex_exit(&fil_system.mutex);
1158
1159 fil_crypt_start_converting = true;
1160 mutex_exit(&fil_crypt_threads_mutex);
1161
1162 do
1163 {
1164 mtr_t mtr;
1165 mtr.start();
1166 mtr.set_named_space(space);
1167
1168 /* 2 - get page 0 */
1169 dberr_t err = DB_SUCCESS;
1170 buf_block_t* block = buf_page_get_gen(
1171 page_id_t(space->id, 0), space->zip_size(),
1172 RW_X_LATCH, NULL, BUF_GET,
1173 __FILE__, __LINE__,
1174 &mtr, &err);
1175
1176
1177 /* 3 - write crypt data to page 0 */
1178 byte* frame = buf_block_get_frame(block);
1179 crypt_data->type = CRYPT_SCHEME_1;
1180 crypt_data->write_page0(space, frame, &mtr);
1181
1182 mtr.commit();
1183
1184 /* record lsn of update */
1185 lsn_t end_lsn = mtr.commit_lsn();
1186
1187 /* 4 - sync tablespace before publishing crypt data */
1188
1189 bool success = false;
1190
1191 do {
1192 ulint n_pages = 0;
1193 success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
1194 buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
1195 } while (!success);
1196
1197 /* 5 - publish crypt data */
1198 mutex_enter(&fil_crypt_threads_mutex);
1199 mutex_enter(&crypt_data->mutex);
1200 crypt_data->type = CRYPT_SCHEME_1;
1201 ut_a(crypt_data->rotate_state.active_threads == 1);
1202 crypt_data->rotate_state.active_threads = 0;
1203 crypt_data->rotate_state.starting = false;
1204
1205 fil_crypt_start_converting = false;
1206 mutex_exit(&crypt_data->mutex);
1207 mutex_exit(&fil_crypt_threads_mutex);
1208
1209 return recheck;
1210 } while (0);
1211
1212 mutex_enter(&crypt_data->mutex);
1213 ut_a(crypt_data->rotate_state.active_threads == 1);
1214 crypt_data->rotate_state.active_threads = 0;
1215 mutex_exit(&crypt_data->mutex);
1216
1217 mutex_enter(&fil_crypt_threads_mutex);
1218 fil_crypt_start_converting = false;
1219 mutex_exit(&fil_crypt_threads_mutex);
1220
1221 return recheck;
1222 }
1223
1224 /** State of a rotation thread */
1225 struct rotate_thread_t {
1226 explicit rotate_thread_t(uint no) {
1227 memset(this, 0, sizeof(* this));
1228 thread_no = no;
1229 first = true;
1230 estimated_max_iops = 20;
1231 }
1232
1233 uint thread_no;
1234 bool first; /*!< is position before first space */
1235 fil_space_t* space; /*!< current space or NULL */
1236 ulint offset; /*!< current offset */
1237 ulint batch; /*!< #pages to rotate */
1238 uint min_key_version_found;/*!< min key version found but not rotated */
1239 lsn_t end_lsn; /*!< max lsn when rotating this space */
1240
1241 uint estimated_max_iops; /*!< estimation of max iops */
1242 uint allocated_iops; /*!< allocated iops */
1243 ulint cnt_waited; /*!< #times waited during this slot */
1244 uintmax_t sum_waited_us; /*!< wait time during this slot */
1245
1246 fil_crypt_stat_t crypt_stat; // statistics
1247
1248 btr_scrub_t scrub_data; /* thread local data used by btr_scrub-functions
1249 * when iterating pages of tablespace */
1250
1251 /** @return whether this thread should terminate */
1252 bool should_shutdown() const {
1253 switch (srv_shutdown_state) {
1254 case SRV_SHUTDOWN_NONE:
1255 return thread_no >= srv_n_fil_crypt_threads;
1256 case SRV_SHUTDOWN_EXIT_THREADS:
1257 /* srv_init_abort() must have been invoked */
1258 case SRV_SHUTDOWN_CLEANUP:
1259 case SRV_SHUTDOWN_INITIATED:
1260 return true;
1261 case SRV_SHUTDOWN_FLUSH_PHASE:
1262 case SRV_SHUTDOWN_LAST_PHASE:
1263 break;
1264 }
1265 ut_ad(0);
1266 return true;
1267 }
1268 };
1269
1270 /** Avoid the removal of the tablespace from
1271 default_encrypt_list only when
1272 1) Another active encryption thread working on tablespace
1273 2) Eligible for tablespace key rotation
1274 3) Tablespace is in flushing phase
1275 @return true if tablespace should be removed from
1276 default encrypt */
1277 static bool fil_crypt_must_remove(const fil_space_t &space)
1278 {
1279 ut_ad(space.purpose == FIL_TYPE_TABLESPACE);
1280 fil_space_crypt_t *crypt_data = space.crypt_data;
1281 ut_ad(mutex_own(&fil_system.mutex));
1282 const ulong encrypt_tables= srv_encrypt_tables;
1283 if (!crypt_data)
1284 return !encrypt_tables;
1285 if (!crypt_data->is_key_found())
1286 return true;
1287
1288 mutex_enter(&crypt_data->mutex);
1289 const bool remove= (space.is_stopping() || crypt_data->not_encrypted()) &&
1290 (!crypt_data->rotate_state.flushing &&
1291 !encrypt_tables == !!crypt_data->min_key_version &&
1292 !crypt_data->rotate_state.active_threads);
1293 mutex_exit(&crypt_data->mutex);
1294 return remove;
1295 }
1296
1297 /***********************************************************************
1298 Check if space needs rotation given a key_state
1299 @param[in,out] state Key rotation state
1300 @param[in,out] key_state Key state
1301 @param[in,out] recheck needs recheck ?
1302 @return true if space needs key rotation */
1303 static
1304 bool
1305 fil_crypt_space_needs_rotation(
1306 rotate_thread_t* state,
1307 key_state_t* key_state,
1308 bool* recheck)
1309 {
1310 fil_space_t* space = state->space;
1311
1312 /* Make sure that tablespace is normal tablespace */
1313 if (space->purpose != FIL_TYPE_TABLESPACE) {
1314 return false;
1315 }
1316
1317 ut_ad(space->referenced());
1318
1319 fil_space_crypt_t *crypt_data = space->crypt_data;
1320
1321 if (crypt_data == NULL) {
1322 /**
1323 * space has no crypt data
1324 * start encrypting it...
1325 */
1326 *recheck = fil_crypt_start_encrypting_space(space);
1327 crypt_data = space->crypt_data;
1328
1329 if (crypt_data == NULL) {
1330 return false;
1331 }
1332
1333 crypt_data->key_get_latest_version();
1334 }
1335
1336 /* If used key_id is not found from encryption plugin we can't
1337 continue to rotate the tablespace */
1338 if (!crypt_data->is_key_found()) {
1339 return false;
1340 }
1341
1342 mutex_enter(&crypt_data->mutex);
1343
1344 do {
1345 /* prevent threads from starting to rotate space */
1346 if (crypt_data->rotate_state.starting) {
1347 /* recheck this space later */
1348 *recheck = true;
1349 break;
1350 }
1351
1352 /* prevent threads from starting to rotate space */
1353 if (space->is_stopping()) {
1354 break;
1355 }
1356
1357 if (crypt_data->rotate_state.flushing) {
1358 break;
1359 }
1360
1361 /* No need to rotate space if encryption is disabled */
1362 if (crypt_data->not_encrypted()) {
1363 break;
1364 }
1365
1366 if (crypt_data->key_id != key_state->key_id) {
1367 key_state->key_id= crypt_data->key_id;
1368 fil_crypt_get_key_state(key_state, crypt_data);
1369 }
1370
1371 bool need_key_rotation = fil_crypt_needs_rotation(
1372 crypt_data,
1373 crypt_data->min_key_version,
1374 key_state->key_version,
1375 key_state->rotate_key_age);
1376
1377 crypt_data->rotate_state.scrubbing.is_active =
1378 btr_scrub_start_space(*space, &state->scrub_data);
1379
1380 time_t diff = time(0) - crypt_data->rotate_state.scrubbing.
1381 last_scrub_completed;
1382
1383 bool need_scrubbing =
1384 (srv_background_scrub_data_uncompressed ||
1385 srv_background_scrub_data_compressed) &&
1386 crypt_data->rotate_state.scrubbing.is_active
1387 && diff >= 0
1388 && ulint(diff) >= srv_background_scrub_data_interval;
1389
1390 if (need_key_rotation == false && need_scrubbing == false) {
1391 break;
1392 }
1393
1394 mutex_exit(&crypt_data->mutex);
1395
1396 return true;
1397 } while (0);
1398
1399 mutex_exit(&crypt_data->mutex);
1400
1401
1402 return false;
1403 }
1404
1405 /***********************************************************************
1406 Update global statistics with thread statistics
1407 @param[in,out] state key rotation statistics */
1408 static void
1409 fil_crypt_update_total_stat(
1410 rotate_thread_t *state)
1411 {
1412 mutex_enter(&crypt_stat_mutex);
1413 crypt_stat.pages_read_from_cache +=
1414 state->crypt_stat.pages_read_from_cache;
1415 crypt_stat.pages_read_from_disk +=
1416 state->crypt_stat.pages_read_from_disk;
1417 crypt_stat.pages_modified += state->crypt_stat.pages_modified;
1418 crypt_stat.pages_flushed += state->crypt_stat.pages_flushed;
1419 // remote old estimate
1420 crypt_stat.estimated_iops -= state->crypt_stat.estimated_iops;
1421 // add new estimate
1422 crypt_stat.estimated_iops += state->estimated_max_iops;
1423 mutex_exit(&crypt_stat_mutex);
1424
1425 // make new estimate "current" estimate
1426 memset(&state->crypt_stat, 0, sizeof(state->crypt_stat));
1427 // record our old (current) estimate
1428 state->crypt_stat.estimated_iops = state->estimated_max_iops;
1429 }
1430
1431 /***********************************************************************
1432 Allocate iops to thread from global setting,
1433 used before starting to rotate a space.
1434 @param[in,out] state Rotation state
1435 @return true if allocation succeeded, false if failed */
1436 static
1437 bool
1438 fil_crypt_alloc_iops(
1439 rotate_thread_t *state)
1440 {
1441 ut_ad(state->allocated_iops == 0);
1442
1443 /* We have not yet selected the space to rotate, thus
1444 state might not contain space and we can't check
1445 its status yet. */
1446
1447 uint max_iops = state->estimated_max_iops;
1448 mutex_enter(&fil_crypt_threads_mutex);
1449
1450 if (n_fil_crypt_iops_allocated >= srv_n_fil_crypt_iops) {
1451 /* this can happen when user decreases srv_fil_crypt_iops */
1452 mutex_exit(&fil_crypt_threads_mutex);
1453 return false;
1454 }
1455
1456 uint alloc = srv_n_fil_crypt_iops - n_fil_crypt_iops_allocated;
1457
1458 if (alloc > max_iops) {
1459 alloc = max_iops;
1460 }
1461
1462 n_fil_crypt_iops_allocated += alloc;
1463 mutex_exit(&fil_crypt_threads_mutex);
1464
1465 state->allocated_iops = alloc;
1466
1467 return alloc > 0;
1468 }
1469
1470 /***********************************************************************
1471 Reallocate iops to thread,
1472 used when inside a space
1473 @param[in,out] state Rotation state */
1474 static
1475 void
1476 fil_crypt_realloc_iops(
1477 rotate_thread_t *state)
1478 {
1479 ut_a(state->allocated_iops > 0);
1480
1481 if (10 * state->cnt_waited > state->batch) {
1482 /* if we waited more than 10% re-estimate max_iops */
1483 ulint avg_wait_time_us =
1484 ulint(state->sum_waited_us / state->cnt_waited);
1485
1486 if (avg_wait_time_us == 0) {
1487 avg_wait_time_us = 1; // prevent division by zero
1488 }
1489
1490 DBUG_PRINT("ib_crypt",
1491 ("thr_no: %u - update estimated_max_iops from %u to "
1492 ULINTPF ".",
1493 state->thread_no,
1494 state->estimated_max_iops,
1495 1000000 / avg_wait_time_us));
1496
1497 state->estimated_max_iops = uint(1000000 / avg_wait_time_us);
1498 state->cnt_waited = 0;
1499 state->sum_waited_us = 0;
1500 } else {
1501 DBUG_PRINT("ib_crypt",
1502 ("thr_no: %u only waited " ULINTPF
1503 "%% skip re-estimate.",
1504 state->thread_no,
1505 (100 * state->cnt_waited)
1506 / (state->batch ? state->batch : 1)));
1507 }
1508
1509 if (state->estimated_max_iops <= state->allocated_iops) {
1510 /* return extra iops */
1511 uint extra = state->allocated_iops - state->estimated_max_iops;
1512
1513 if (extra > 0) {
1514 mutex_enter(&fil_crypt_threads_mutex);
1515 if (n_fil_crypt_iops_allocated < extra) {
1516 /* unknown bug!
1517 * crash in debug
1518 * keep n_fil_crypt_iops_allocated unchanged
1519 * in release */
1520 ut_ad(0);
1521 extra = 0;
1522 }
1523 n_fil_crypt_iops_allocated -= extra;
1524 state->allocated_iops -= extra;
1525
1526 if (state->allocated_iops == 0) {
1527 /* no matter how slow io system seems to be
1528 * never decrease allocated_iops to 0... */
1529 state->allocated_iops ++;
1530 n_fil_crypt_iops_allocated ++;
1531 }
1532
1533 os_event_set(fil_crypt_threads_event);
1534 mutex_exit(&fil_crypt_threads_mutex);
1535 }
1536 } else {
1537 /* see if there are more to get */
1538 mutex_enter(&fil_crypt_threads_mutex);
1539 if (n_fil_crypt_iops_allocated < srv_n_fil_crypt_iops) {
1540 /* there are extra iops free */
1541 uint extra = srv_n_fil_crypt_iops -
1542 n_fil_crypt_iops_allocated;
1543 if (state->allocated_iops + extra >
1544 state->estimated_max_iops) {
1545 /* but don't alloc more than our max */
1546 extra = state->estimated_max_iops -
1547 state->allocated_iops;
1548 }
1549 n_fil_crypt_iops_allocated += extra;
1550 state->allocated_iops += extra;
1551
1552 DBUG_PRINT("ib_crypt",
1553 ("thr_no: %u increased iops from %u to %u.",
1554 state->thread_no,
1555 state->allocated_iops - extra,
1556 state->allocated_iops));
1557
1558 }
1559 mutex_exit(&fil_crypt_threads_mutex);
1560 }
1561
1562 fil_crypt_update_total_stat(state);
1563 }
1564
1565 /***********************************************************************
1566 Return allocated iops to global
1567 @param[in,out] state Rotation state */
1568 static
1569 void
1570 fil_crypt_return_iops(
1571 rotate_thread_t *state)
1572 {
1573 if (state->allocated_iops > 0) {
1574 uint iops = state->allocated_iops;
1575 mutex_enter(&fil_crypt_threads_mutex);
1576 if (n_fil_crypt_iops_allocated < iops) {
1577 /* unknown bug!
1578 * crash in debug
1579 * keep n_fil_crypt_iops_allocated unchanged
1580 * in release */
1581 ut_ad(0);
1582 iops = 0;
1583 }
1584
1585 n_fil_crypt_iops_allocated -= iops;
1586 state->allocated_iops = 0;
1587 os_event_set(fil_crypt_threads_event);
1588 mutex_exit(&fil_crypt_threads_mutex);
1589 }
1590
1591 fil_crypt_update_total_stat(state);
1592 }
1593
1594 bool fil_crypt_must_default_encrypt()
1595 {
1596 return !srv_fil_crypt_rotate_key_age || !srv_encrypt_rotate;
1597 }
1598
1599 /** Return the next tablespace from default_encrypt_tables.
1600 @param space previous tablespace (NULL to start from the start)
1601 @param recheck whether the removal condition needs to be rechecked after
1602 the encryption parameters were changed
1603 @param encrypt expected state of innodb_encrypt_tables
1604 @return the next tablespace to process (n_pending_ops incremented)
1605 @retval NULL if this was the last */
1606 inline fil_space_t *fil_system_t::default_encrypt_next(
1607 fil_space_t *space, bool recheck, bool encrypt)
1608 {
1609 ut_ad(mutex_own(&mutex));
1610
1611 sized_ilist<fil_space_t, rotation_list_tag_t>::iterator it=
1612 space && space->is_in_default_encrypt
1613 ? space
1614 : default_encrypt_tables.begin();
1615 const sized_ilist<fil_space_t, rotation_list_tag_t>::iterator end=
1616 default_encrypt_tables.end();
1617
1618 if (space)
1619 {
1620 const bool released= !space->release();
1621
1622 if (space->is_in_default_encrypt)
1623 {
1624 while (++it != end &&
1625 (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));
1626
1627 /* If one of the encryption threads already started
1628 the encryption of the table then don't remove the
1629 unencrypted spaces from default encrypt list.
1630
1631 If there is a change in innodb_encrypt_tables variables
1632 value then don't remove the last processed tablespace
1633 from the default encrypt list. */
1634 if (released && !recheck && fil_crypt_must_remove(*space))
1635 {
1636 ut_a(!default_encrypt_tables.empty());
1637 default_encrypt_tables.remove(*space);
1638 space->is_in_default_encrypt= false;
1639 }
1640 }
1641 }
1642 else while (it != end &&
1643 (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()))
1644 {
1645 /* Find the next suitable default encrypt table if
1646 beginning of default_encrypt_tables list has been scheduled
1647 to be deleted */
1648 it++;
1649 }
1650
1651 while (it != end)
1652 {
1653 space= &*it;
1654 if (space->acquire())
1655 return space;
1656 while (++it != end && (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));
1657 }
1658
1659 return NULL;
1660 }
1661
1662 /** Return the next tablespace.
1663 @param space previous tablespace (NULL to start from the beginning)
1664 @param recheck whether the removal condition needs to be rechecked after
1665 the encryption parameters were changed
1666 @param encrypt expected state of innodb_encrypt_tables
1667 @return pointer to the next tablespace (with n_pending_ops incremented)
1668 @retval NULL if this was the last */
1669 static fil_space_t *fil_space_next(fil_space_t *space, bool recheck,
1670 bool encrypt)
1671 {
1672 mutex_enter(&fil_system.mutex);
1673
1674 if (fil_crypt_must_default_encrypt())
1675 space= fil_system.default_encrypt_next(space, recheck, encrypt);
1676 else if (!space)
1677 {
1678 space= UT_LIST_GET_FIRST(fil_system.space_list);
1679 /* We can trust that space is not NULL because at least the
1680 system tablespace is always present and loaded first. */
1681 if (!space->acquire())
1682 goto next;
1683 }
1684 else
1685 {
1686 /* Move on to the next fil_space_t */
1687 space->release();
1688 next:
1689 space= UT_LIST_GET_NEXT(space_list, space);
1690
1691 /* Skip abnormal tablespaces or those that are being created by
1692 fil_ibd_create(), or being dropped. */
1693 while (space &&
1694 (UT_LIST_GET_LEN(space->chain) == 0 ||
1695 space->is_stopping() || space->purpose != FIL_TYPE_TABLESPACE))
1696 space= UT_LIST_GET_NEXT(space_list, space);
1697
1698 if (space && !space->acquire())
1699 goto next;
1700 }
1701
1702 mutex_exit(&fil_system.mutex);
1703 return space;
1704 }
1705
1706 /** Search for a space needing rotation
1707 @param[in,out] key_state Key state
1708 @param[in,out] state Rotation state
1709 @param[in,out] recheck recheck of the tablespace is needed or
1710 still encryption thread does write page 0 */
1711 static bool fil_crypt_find_space_to_rotate(
1712 key_state_t* key_state,
1713 rotate_thread_t* state,
1714 bool* recheck)
1715 {
1716 /* we need iops to start rotating */
1717 while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) {
1718 if (state->space && state->space->is_stopping()) {
1719 state->space->release();
1720 state->space = NULL;
1721 }
1722
1723 os_event_reset(fil_crypt_threads_event);
1724 os_event_wait_time(fil_crypt_threads_event, 100000);
1725 }
1726
1727 if (state->should_shutdown()) {
1728 if (state->space) {
1729 state->space->release();
1730 state->space = NULL;
1731 }
1732 return false;
1733 }
1734
1735 if (state->first) {
1736 state->first = false;
1737 if (state->space) {
1738 state->space->release();
1739 }
1740 state->space = NULL;
1741 }
1742
1743 state->space = fil_space_next(state->space, *recheck,
1744 key_state->key_version != 0);
1745
1746 while (!state->should_shutdown() && state->space) {
1747 /* If there is no crypt data and we have not yet read
1748 page 0 for this tablespace, we need to read it before
1749 we can continue. */
1750 if (!state->space->crypt_data) {
1751 fil_crypt_read_crypt_data(state->space);
1752 }
1753
1754 if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
1755 ut_ad(key_state->key_id);
1756 /* init state->min_key_version_found before
1757 * starting on a space */
1758 state->min_key_version_found = key_state->key_version;
1759 return true;
1760 }
1761
1762 state->space = fil_space_next(state->space, *recheck,
1763 key_state->key_version != 0);
1764 }
1765
1766 if (state->space) {
1767 state->space->release();
1768 state->space = NULL;
1769 }
1770
1771 /* no work to do; release our allocation of I/O capacity */
1772 fil_crypt_return_iops(state);
1773
1774 return false;
1775
1776 }
1777
1778 /***********************************************************************
1779 Start rotating a space
1780 @param[in] key_state Key state
1781 @param[in,out] state Rotation state */
1782 static
1783 void
1784 fil_crypt_start_rotate_space(
1785 const key_state_t* key_state,
1786 rotate_thread_t* state)
1787 {
1788 fil_space_crypt_t *crypt_data = state->space->crypt_data;
1789
1790 ut_ad(crypt_data);
1791 mutex_enter(&crypt_data->mutex);
1792 ut_ad(key_state->key_id == crypt_data->key_id);
1793
1794 if (crypt_data->rotate_state.active_threads == 0) {
1795 /* only first thread needs to init */
1796 crypt_data->rotate_state.next_offset = 1; // skip page 0
1797 /* no need to rotate beyond current max
1798 * if space extends, it will be encrypted with newer version */
1799 /* FIXME: max_offset could be removed and instead
1800 space->size consulted.*/
1801 crypt_data->rotate_state.max_offset = state->space->size;
1802 crypt_data->rotate_state.end_lsn = 0;
1803 crypt_data->rotate_state.min_key_version_found =
1804 key_state->key_version;
1805
1806 crypt_data->rotate_state.start_time = time(0);
1807
1808 if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
1809 crypt_data->is_encrypted() &&
1810 key_state->key_version != 0) {
1811 /* this is rotation unencrypted => encrypted */
1812 crypt_data->type = CRYPT_SCHEME_1;
1813 }
1814 }
1815
1816 /* count active threads in space */
1817 crypt_data->rotate_state.active_threads++;
1818
1819 /* Initialize thread local state */
1820 state->end_lsn = crypt_data->rotate_state.end_lsn;
1821 state->min_key_version_found =
1822 crypt_data->rotate_state.min_key_version_found;
1823
1824 mutex_exit(&crypt_data->mutex);
1825 }
1826
1827 /***********************************************************************
1828 Search for batch of pages needing rotation
1829 @param[in] key_state Key state
1830 @param[in,out] state Rotation state
1831 @return true if page needing key rotation found, false if not found */
1832 static
1833 bool
1834 fil_crypt_find_page_to_rotate(
1835 const key_state_t* key_state,
1836 rotate_thread_t* state)
1837 {
1838 ulint batch = srv_alloc_time * state->allocated_iops;
1839 fil_space_t* space = state->space;
1840
1841 ut_ad(!space || space->referenced());
1842
1843 /* If space is marked to be dropped stop rotation. */
1844 if (!space || space->is_stopping()) {
1845 return false;
1846 }
1847
1848 fil_space_crypt_t *crypt_data = space->crypt_data;
1849
1850 mutex_enter(&crypt_data->mutex);
1851 ut_ad(key_state->key_id == crypt_data->key_id);
1852
1853 bool found = crypt_data->rotate_state.max_offset >=
1854 crypt_data->rotate_state.next_offset;
1855
1856 if (found) {
1857 state->offset = crypt_data->rotate_state.next_offset;
1858 ulint remaining = crypt_data->rotate_state.max_offset -
1859 crypt_data->rotate_state.next_offset;
1860
1861 if (batch <= remaining) {
1862 state->batch = batch;
1863 } else {
1864 state->batch = remaining;
1865 }
1866 }
1867
1868 crypt_data->rotate_state.next_offset += batch;
1869 mutex_exit(&crypt_data->mutex);
1870 return found;
1871 }
1872
1873 #define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \
1874 fil_crypt_get_page_throttle_func(state, offset, mtr, \
1875 sleeptime_ms, __FILE__, __LINE__)
1876
1877 /***********************************************************************
1878 Get a page and compute sleep time
1879 @param[in,out] state Rotation state
1880 @param[in] offset Page offset
1881 @param[in,out] mtr Minitransaction
1882 @param[out] sleeptime_ms Sleep time
1883 @param[in] file File where called
1884 @param[in] line Line where called
1885 @return page or NULL*/
1886 static
1887 buf_block_t*
1888 fil_crypt_get_page_throttle_func(
1889 rotate_thread_t* state,
1890 ulint offset,
1891 mtr_t* mtr,
1892 ulint* sleeptime_ms,
1893 const char* file,
1894 unsigned line)
1895 {
1896 fil_space_t* space = state->space;
1897 const ulint zip_size = space->zip_size();
1898 const page_id_t page_id(space->id, offset);
1899 ut_ad(space->referenced());
1900
1901 /* Before reading from tablespace we need to make sure that
1902 the tablespace is not about to be dropped. */
1903 if (space->is_stopping()) {
1904 return NULL;
1905 }
1906
1907 dberr_t err = DB_SUCCESS;
1908 buf_block_t* block = buf_page_get_gen(page_id, zip_size, RW_X_LATCH,
1909 NULL,
1910 BUF_PEEK_IF_IN_POOL, file, line,
1911 mtr, &err);
1912 if (block != NULL) {
1913 /* page was in buffer pool */
1914 state->crypt_stat.pages_read_from_cache++;
1915 return block;
1916 }
1917
1918 if (space->is_stopping()) {
1919 return NULL;
1920 }
1921
1922 state->crypt_stat.pages_read_from_disk++;
1923
1924 const ulonglong start = my_interval_timer();
1925 block = buf_page_get_gen(page_id, zip_size,
1926 RW_X_LATCH,
1927 NULL, BUF_GET_POSSIBLY_FREED,
1928 file, line, mtr, &err);
1929 const ulonglong end = my_interval_timer();
1930
1931 state->cnt_waited++;
1932
1933 if (end > start) {
1934 state->sum_waited_us += (end - start) / 1000;
1935 }
1936
1937 /* average page load */
1938 ulint add_sleeptime_ms = 0;
1939 ulint avg_wait_time_us =ulint(state->sum_waited_us / state->cnt_waited);
1940 ulint alloc_wait_us = 1000000 / state->allocated_iops;
1941
1942 if (avg_wait_time_us < alloc_wait_us) {
1943 /* we reading faster than we allocated */
1944 add_sleeptime_ms = (alloc_wait_us - avg_wait_time_us) / 1000;
1945 } else {
1946 /* if page load time is longer than we want, skip sleeping */
1947 }
1948
1949 *sleeptime_ms += add_sleeptime_ms;
1950
1951 return block;
1952 }
1953
1954
1955 /***********************************************************************
1956 Get block and allocation status
1957
1958 note: innodb locks fil_space_latch and then block when allocating page
1959 but locks block and then fil_space_latch when freeing page.
1960
1961 @param[in,out] state Rotation state
1962 @param[in] offset Page offset
1963 @param[in,out] mtr Minitransaction
1964 @param[out] allocation_status Allocation status
1965 @param[out] sleeptime_ms Sleep time
1966 @return block or NULL
1967 */
1968 static
1969 buf_block_t*
1970 btr_scrub_get_block_and_allocation_status(
1971 rotate_thread_t* state,
1972 ulint offset,
1973 mtr_t* mtr,
1974 btr_scrub_page_allocation_status_t *allocation_status,
1975 ulint* sleeptime_ms)
1976 {
1977 mtr_t local_mtr;
1978 buf_block_t *block = NULL;
1979 fil_space_t* space = state->space;
1980
1981 ut_ad(space->referenced());
1982
1983 mtr_start(&local_mtr);
1984
1985 *allocation_status = fseg_page_is_free(space, (uint32_t)offset) ?
1986 BTR_SCRUB_PAGE_FREE :
1987 BTR_SCRUB_PAGE_ALLOCATED;
1988
1989 if (*allocation_status == BTR_SCRUB_PAGE_FREE) {
1990 /* this is easy case, we lock fil_space_latch first and
1991 then block */
1992 block = fil_crypt_get_page_throttle(state,
1993 offset, mtr,
1994 sleeptime_ms);
1995 mtr_commit(&local_mtr);
1996 } else {
1997 /* page is allocated according to xdes */
1998
1999 /* release fil_space_latch *before* fetching block */
2000 mtr_commit(&local_mtr);
2001
2002 /* NOTE: when we have locked dict_index_get_lock(),
2003 * it's safe to release fil_space_latch and then fetch block
2004 * as dict_index_get_lock() is needed to make tree modifications
2005 * such as free-ing a page
2006 */
2007
2008 block = fil_crypt_get_page_throttle(state,
2009 offset, mtr,
2010 sleeptime_ms);
2011 }
2012
2013 return block;
2014 }
2015
2016
2017 /***********************************************************************
2018 Rotate one page
2019 @param[in,out] key_state Key state
2020 @param[in,out] state Rotation state */
2021 static
2022 void
2023 fil_crypt_rotate_page(
2024 const key_state_t* key_state,
2025 rotate_thread_t* state)
2026 {
2027 fil_space_t*space = state->space;
2028 ulint space_id = space->id;
2029 ulint offset = state->offset;
2030 ulint sleeptime_ms = 0;
2031 fil_space_crypt_t *crypt_data = space->crypt_data;
2032
2033 ut_ad(space->referenced());
2034 ut_ad(offset > 0);
2035
2036 /* In fil_crypt_thread where key rotation is done we have
2037 acquired space and checked that this space is not yet
2038 marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate().
2039 Check here also to give DROP TABLE or similar a change. */
2040 if (space->is_stopping()) {
2041 return;
2042 }
2043
2044 if (space_id == TRX_SYS_SPACE && offset == TRX_SYS_PAGE_NO) {
2045 /* don't encrypt this as it contains address to dblwr buffer */
2046 return;
2047 }
2048
2049 mtr_t mtr;
2050 mtr.start();
2051 if (buf_block_t* block = fil_crypt_get_page_throttle(state,
2052 offset, &mtr,
2053 &sleeptime_ms)) {
2054 bool modified = false;
2055 int needs_scrubbing = BTR_SCRUB_SKIP_PAGE;
2056 lsn_t block_lsn = block->page.newest_modification;
2057 byte* frame = buf_block_get_frame(block);
2058 uint kv = buf_page_get_key_version(frame, space->flags);
2059
2060 if (space->is_stopping()) {
2061 /* The tablespace is closing (in DROP TABLE or
2062 TRUNCATE TABLE or similar): avoid further access */
2063 } else if (!kv && !*reinterpret_cast<uint16_t*>
2064 (&frame[FIL_PAGE_TYPE])) {
2065 /* It looks like this page is not
2066 allocated. Because key rotation is accessing
2067 pages in a pattern that is unlike the normal
2068 B-tree and undo log access pattern, we cannot
2069 invoke fseg_page_is_free() here, because that
2070 could result in a deadlock. If we invoked
2071 fseg_page_is_free() and released the
2072 tablespace latch before acquiring block->lock,
2073 then the fseg_page_is_free() information
2074 could be stale already. */
2075
2076 /* If the data file was originally created
2077 before MariaDB 10.0 or MySQL 5.6, some
2078 allocated data pages could carry 0 in
2079 FIL_PAGE_TYPE. The FIL_PAGE_TYPE on those
2080 pages will be updated in
2081 buf_flush_init_for_writing() when the page
2082 is modified the next time.
2083
2084 Also, when the doublewrite buffer pages are
2085 allocated on bootstrap in a non-debug build,
2086 some dummy pages will be allocated, with 0 in
2087 the FIL_PAGE_TYPE. Those pages should be
2088 skipped from key rotation forever. */
2089 } else if (fil_crypt_needs_rotation(
2090 crypt_data,
2091 kv,
2092 key_state->key_version,
2093 key_state->rotate_key_age)) {
2094
2095 mtr.set_named_space(space);
2096 modified = true;
2097
2098 /* force rotation by dummy updating page */
2099 mlog_write_ulint(frame + FIL_PAGE_SPACE_ID,
2100 space_id, MLOG_4BYTES, &mtr);
2101
2102 /* statistics */
2103 state->crypt_stat.pages_modified++;
2104 } else {
2105 if (crypt_data->is_encrypted()) {
2106 if (kv < state->min_key_version_found) {
2107 state->min_key_version_found = kv;
2108 }
2109 }
2110
2111 needs_scrubbing = btr_page_needs_scrubbing(
2112 &state->scrub_data, block,
2113 BTR_SCRUB_PAGE_ALLOCATION_UNKNOWN);
2114 }
2115
2116 mtr.commit();
2117 lsn_t end_lsn = mtr.commit_lsn();
2118
2119 if (needs_scrubbing == BTR_SCRUB_PAGE) {
2120 mtr.start();
2121 /*
2122 * refetch page and allocation status
2123 */
2124 btr_scrub_page_allocation_status_t allocated;
2125
2126 block = btr_scrub_get_block_and_allocation_status(
2127 state, offset, &mtr,
2128 &allocated,
2129 &sleeptime_ms);
2130
2131 if (block) {
2132 mtr.set_named_space(space);
2133
2134 /* get required table/index and index-locks */
2135 needs_scrubbing = btr_scrub_recheck_page(
2136 &state->scrub_data, block, allocated, &mtr);
2137
2138 if (needs_scrubbing == BTR_SCRUB_PAGE) {
2139 /* we need to refetch it once more now that we have
2140 * index locked */
2141 block = btr_scrub_get_block_and_allocation_status(
2142 state, offset, &mtr,
2143 &allocated,
2144 &sleeptime_ms);
2145
2146 needs_scrubbing = btr_scrub_page(&state->scrub_data,
2147 block, allocated,
2148 &mtr);
2149 }
2150
2151 /* NOTE: mtr is committed inside btr_scrub_recheck_page()
2152 * and/or btr_scrub_page. This is to make sure that
2153 * locks & pages are latched in corrected order,
2154 * the mtr is in some circumstances restarted.
2155 * (mtr_commit() + mtr_start())
2156 */
2157 }
2158 }
2159
2160 if (needs_scrubbing != BTR_SCRUB_PAGE) {
2161 /* if page didn't need scrubbing it might be that cleanups
2162 are needed. do those outside of any mtr to prevent deadlocks.
2163
2164 the information what kinds of cleanups that are needed are
2165 encoded inside the needs_scrubbing, but this is opaque to
2166 this function (except the value BTR_SCRUB_PAGE) */
2167 btr_scrub_skip_page(&state->scrub_data, needs_scrubbing);
2168 }
2169
2170 if (needs_scrubbing == BTR_SCRUB_TURNED_OFF) {
2171 /* if we just detected that scrubbing was turned off
2172 * update global state to reflect this */
2173 ut_ad(crypt_data);
2174 mutex_enter(&crypt_data->mutex);
2175 crypt_data->rotate_state.scrubbing.is_active = false;
2176 mutex_exit(&crypt_data->mutex);
2177 }
2178
2179 if (modified) {
2180 /* if we modified page, we take lsn from mtr */
2181 ut_a(end_lsn > state->end_lsn);
2182 ut_a(end_lsn > block_lsn);
2183 state->end_lsn = end_lsn;
2184 } else {
2185 /* if we did not modify page, check for max lsn */
2186 if (block_lsn > state->end_lsn) {
2187 state->end_lsn = block_lsn;
2188 }
2189 }
2190 } else {
2191 /* If block read failed mtr memo and log should be empty. */
2192 ut_ad(!mtr.has_modifications());
2193 ut_ad(!mtr.is_dirty());
2194 ut_ad(mtr.get_memo()->size() == 0);
2195 ut_ad(mtr.get_log()->size() == 0);
2196 mtr.commit();
2197 }
2198
2199 if (sleeptime_ms) {
2200 os_event_reset(fil_crypt_throttle_sleep_event);
2201 os_event_wait_time(fil_crypt_throttle_sleep_event,
2202 1000 * sleeptime_ms);
2203 }
2204 }
2205
2206 /***********************************************************************
2207 Rotate a batch of pages
2208 @param[in,out] key_state Key state
2209 @param[in,out] state Rotation state */
2210 static
2211 void
2212 fil_crypt_rotate_pages(
2213 const key_state_t* key_state,
2214 rotate_thread_t* state)
2215 {
2216 ulint space = state->space->id;
2217 ulint end = std::min(state->offset + state->batch,
2218 state->space->free_limit);
2219
2220 ut_ad(state->space->referenced());
2221
2222 for (; state->offset < end; state->offset++) {
2223
2224 /* we can't rotate pages in dblwr buffer as
2225 * it's not possible to read those due to lots of asserts
2226 * in buffer pool.
2227 *
2228 * However since these are only (short-lived) copies of
2229 * real pages, they will be updated anyway when the
2230 * real page is updated
2231 */
2232 if (space == TRX_SYS_SPACE &&
2233 buf_dblwr_page_inside(state->offset)) {
2234 continue;
2235 }
2236
2237 /* If space is marked as stopping, stop rotating
2238 pages. */
2239 if (state->space->is_stopping()) {
2240 break;
2241 }
2242
2243 fil_crypt_rotate_page(key_state, state);
2244 }
2245 }
2246
2247 /***********************************************************************
2248 Flush rotated pages and then update page 0
2249
2250 @param[in,out] state rotation state */
2251 static
2252 void
2253 fil_crypt_flush_space(
2254 rotate_thread_t* state)
2255 {
2256 fil_space_t* space = state->space;
2257 fil_space_crypt_t *crypt_data = space->crypt_data;
2258
2259 ut_ad(space->referenced());
2260
2261 /* flush tablespace pages so that there are no pages left with old key */
2262 lsn_t end_lsn = crypt_data->rotate_state.end_lsn;
2263
2264 if (end_lsn > 0 && !space->is_stopping()) {
2265 bool success = false;
2266 ulint n_pages = 0;
2267 ulint sum_pages = 0;
2268 const ulonglong start = my_interval_timer();
2269
2270 do {
2271 success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
2272 buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
2273 sum_pages += n_pages;
2274 } while (!success && !space->is_stopping());
2275
2276 const ulonglong end = my_interval_timer();
2277
2278 if (sum_pages && end > start) {
2279 state->cnt_waited += sum_pages;
2280 state->sum_waited_us += (end - start) / 1000;
2281
2282 /* statistics */
2283 state->crypt_stat.pages_flushed += sum_pages;
2284 }
2285 }
2286
2287 if (crypt_data->min_key_version == 0) {
2288 crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
2289 }
2290
2291 if (space->is_stopping()) {
2292 return;
2293 }
2294
2295 /* update page 0 */
2296 mtr_t mtr;
2297 mtr.start();
2298
2299 dberr_t err;
2300
2301 if (buf_block_t* block = buf_page_get_gen(
2302 page_id_t(space->id, 0), space->zip_size(),
2303 RW_X_LATCH, NULL, BUF_GET,
2304 __FILE__, __LINE__, &mtr, &err)) {
2305 mtr.set_named_space(space);
2306 crypt_data->write_page0(space, block->frame, &mtr);
2307 }
2308
2309 mtr.commit();
2310 }
2311
2312 /***********************************************************************
2313 Complete rotating a space
2314 @param[in,out] state Rotation state */
2315 static void fil_crypt_complete_rotate_space(rotate_thread_t* state)
2316 {
2317 fil_space_crypt_t *crypt_data = state->space->crypt_data;
2318
2319 ut_ad(crypt_data);
2320 ut_ad(state->space->referenced());
2321
2322 /* Space might already be dropped */
2323 if (!state->space->is_stopping()) {
2324 mutex_enter(&crypt_data->mutex);
2325
2326 /**
2327 * Update crypt data state with state from thread
2328 */
2329 if (state->min_key_version_found <
2330 crypt_data->rotate_state.min_key_version_found) {
2331 crypt_data->rotate_state.min_key_version_found =
2332 state->min_key_version_found;
2333 }
2334
2335 if (state->end_lsn > crypt_data->rotate_state.end_lsn) {
2336 crypt_data->rotate_state.end_lsn = state->end_lsn;
2337 }
2338
2339 ut_a(crypt_data->rotate_state.active_threads > 0);
2340 crypt_data->rotate_state.active_threads--;
2341 bool last = crypt_data->rotate_state.active_threads == 0;
2342
2343 /**
2344 * check if space is fully done
2345 * this as when threads shutdown, it could be that we "complete"
2346 * iterating before we have scanned the full space.
2347 */
2348 bool done = crypt_data->rotate_state.next_offset >=
2349 crypt_data->rotate_state.max_offset;
2350
2351 /**
2352 * we should flush space if we're last thread AND
2353 * the iteration is done
2354 */
2355 bool should_flush = last && done;
2356
2357 if (should_flush) {
2358 /* we're the last active thread */
2359 crypt_data->rotate_state.flushing = true;
2360 crypt_data->min_key_version =
2361 crypt_data->rotate_state.min_key_version_found;
2362 }
2363
2364 /* inform scrubbing */
2365 crypt_data->rotate_state.scrubbing.is_active = false;
2366 mutex_exit(&crypt_data->mutex);
2367
2368 /* all threads must call btr_scrub_complete_space wo/ mutex held */
2369 if (state->scrub_data.scrubbing) {
2370 btr_scrub_complete_space(&state->scrub_data);
2371 if (should_flush) {
2372 /* only last thread updates last_scrub_completed */
2373 ut_ad(crypt_data);
2374 mutex_enter(&crypt_data->mutex);
2375 crypt_data->rotate_state.scrubbing.
2376 last_scrub_completed = time(0);
2377 mutex_exit(&crypt_data->mutex);
2378 }
2379 }
2380
2381 if (should_flush) {
2382 fil_crypt_flush_space(state);
2383
2384 mutex_enter(&crypt_data->mutex);
2385 crypt_data->rotate_state.flushing = false;
2386 mutex_exit(&crypt_data->mutex);
2387 }
2388 } else {
2389 mutex_enter(&crypt_data->mutex);
2390 ut_a(crypt_data->rotate_state.active_threads > 0);
2391 crypt_data->rotate_state.active_threads--;
2392 mutex_exit(&crypt_data->mutex);
2393 }
2394 }
2395
2396 /*********************************************************************//**
2397 A thread which monitors global key state and rotates tablespaces accordingly
2398 @return a dummy parameter */
2399 extern "C" UNIV_INTERN
2400 os_thread_ret_t
2401 DECLARE_THREAD(fil_crypt_thread)(void*)
2402 {
2403 mutex_enter(&fil_crypt_threads_mutex);
2404 uint thread_no = srv_n_fil_crypt_threads_started;
2405 srv_n_fil_crypt_threads_started++;
2406 os_event_set(fil_crypt_event); /* signal that we started */
2407 mutex_exit(&fil_crypt_threads_mutex);
2408
2409 /* state of this thread */
2410 rotate_thread_t thr(thread_no);
2411
2412 /* if we find a space that is starting, skip over it and recheck it later */
2413 bool recheck = false;
2414
2415 while (!thr.should_shutdown()) {
2416
2417 key_state_t new_state;
2418
2419 time_t wait_start = time(0);
2420
2421 while (!thr.should_shutdown()) {
2422
2423 /* wait for key state changes
2424 * i.e either new key version of change or
2425 * new rotate_key_age */
2426 os_event_reset(fil_crypt_threads_event);
2427
2428 if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) {
2429 break;
2430 }
2431
2432 if (recheck) {
2433 /* check recheck here, after sleep, so
2434 * that we don't busy loop while when one thread is starting
2435 * a space*/
2436 break;
2437 }
2438
2439 time_t waited = time(0) - wait_start;
2440
2441 /* Break if we have waited the background scrub
2442 internal and background scrubbing is enabled */
2443 if (waited >= 0
2444 && ulint(waited) >= srv_background_scrub_data_check_interval
2445 && (srv_background_scrub_data_uncompressed
2446 || srv_background_scrub_data_compressed)) {
2447 break;
2448 }
2449 }
2450
2451 recheck = false;
2452 thr.first = true; // restart from first tablespace
2453
2454 /* iterate all spaces searching for those needing rotation */
2455 while (!thr.should_shutdown() &&
2456 fil_crypt_find_space_to_rotate(&new_state, &thr, &recheck)) {
2457
2458 /* we found a space to rotate */
2459 fil_crypt_start_rotate_space(&new_state, &thr);
2460
2461 /* iterate all pages (cooperativly with other threads) */
2462 while (!thr.should_shutdown() &&
2463 fil_crypt_find_page_to_rotate(&new_state, &thr)) {
2464
2465 if (!thr.space->is_stopping()) {
2466 /* rotate a (set) of pages */
2467 fil_crypt_rotate_pages(&new_state, &thr);
2468 }
2469
2470 /* If space is marked as stopping, release
2471 space and stop rotation. */
2472 if (thr.space->is_stopping()) {
2473 fil_crypt_complete_rotate_space(&thr);
2474 thr.space->release();
2475 thr.space = NULL;
2476 break;
2477 }
2478
2479 /* realloc iops */
2480 fil_crypt_realloc_iops(&thr);
2481 }
2482
2483 /* complete rotation */
2484 if (thr.space) {
2485 fil_crypt_complete_rotate_space(&thr);
2486 }
2487
2488 /* force key state refresh */
2489 new_state.key_id = 0;
2490
2491 /* return iops */
2492 fil_crypt_return_iops(&thr);
2493 }
2494 }
2495
2496 /* return iops if shutting down */
2497 fil_crypt_return_iops(&thr);
2498
2499 /* release current space if shutting down */
2500 if (thr.space) {
2501 thr.space->release();
2502 thr.space = NULL;
2503 }
2504
2505 mutex_enter(&fil_crypt_threads_mutex);
2506 srv_n_fil_crypt_threads_started--;
2507 os_event_set(fil_crypt_event); /* signal that we stopped */
2508 mutex_exit(&fil_crypt_threads_mutex);
2509
2510 /* We count the number of threads in os_thread_exit(). A created
2511 thread should always use that to exit and not use return() to exit. */
2512
2513 os_thread_exit();
2514
2515 OS_THREAD_DUMMY_RETURN;
2516 }
2517
2518 /*********************************************************************
2519 Adjust thread count for key rotation
2520 @param[in] enw_cnt Number of threads to be used */
2521 UNIV_INTERN
2522 void
2523 fil_crypt_set_thread_cnt(
2524 const uint new_cnt)
2525 {
2526 if (!fil_crypt_threads_inited) {
2527 fil_crypt_threads_init();
2528 }
2529
2530 mutex_enter(&fil_crypt_threads_mutex);
2531
2532 if (new_cnt > srv_n_fil_crypt_threads) {
2533 uint add = new_cnt - srv_n_fil_crypt_threads;
2534 srv_n_fil_crypt_threads = new_cnt;
2535 for (uint i = 0; i < add; i++) {
2536 os_thread_id_t rotation_thread_id;
2537 os_thread_create(fil_crypt_thread, NULL, &rotation_thread_id);
2538 ib::info() << "Creating #"
2539 << i+1 << " encryption thread id "
2540 << os_thread_pf(rotation_thread_id)
2541 << " total threads " << new_cnt << ".";
2542 }
2543 } else if (new_cnt < srv_n_fil_crypt_threads) {
2544 srv_n_fil_crypt_threads = new_cnt;
2545 os_event_set(fil_crypt_threads_event);
2546 }
2547
2548 mutex_exit(&fil_crypt_threads_mutex);
2549
2550 while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) {
2551 os_event_reset(fil_crypt_event);
2552 os_event_wait_time(fil_crypt_event, 100000);
2553 }
2554
2555 /* Send a message to encryption threads that there could be
2556 something to do. */
2557 if (srv_n_fil_crypt_threads) {
2558 os_event_set(fil_crypt_threads_event);
2559 }
2560 }
2561
2562 /** Initialize the tablespace default_encrypt_tables
2563 if innodb_encryption_rotate_key_age=0. */
2564 static void fil_crypt_default_encrypt_tables_fill()
2565 {
2566 ut_ad(mutex_own(&fil_system.mutex));
2567
2568 for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list);
2569 space != NULL;
2570 space = UT_LIST_GET_NEXT(space_list, space)) {
2571 if (space->purpose != FIL_TYPE_TABLESPACE
2572 || space->is_in_default_encrypt
2573 || UT_LIST_GET_LEN(space->chain) == 0
2574 || !space->acquire()) {
2575 continue;
2576 }
2577
2578 /* Ensure that crypt_data has been initialized. */
2579 if (!space->size) {
2580 ut_d(const fil_space_t* s=)
2581 fil_system.read_page0(space->id);
2582 ut_ad(!s || s == space);
2583 if (!space->size) {
2584 /* Page 0 was not loaded.
2585 Skip this tablespace. */
2586 goto next;
2587 }
2588 }
2589
2590 /* Skip ENCRYPTION!=DEFAULT tablespaces. */
2591 if (space->crypt_data
2592 && !space->crypt_data->is_default_encryption()) {
2593 goto next;
2594 }
2595
2596 if (srv_encrypt_tables) {
2597 /* Skip encrypted tablespaces if
2598 innodb_encrypt_tables!=OFF */
2599 if (space->crypt_data
2600 && space->crypt_data->min_key_version) {
2601 goto next;
2602 }
2603 } else {
2604 /* Skip unencrypted tablespaces if
2605 innodb_encrypt_tables=OFF */
2606 if (!space->crypt_data
2607 || !space->crypt_data->min_key_version) {
2608 goto next;
2609 }
2610 }
2611
2612 fil_system.default_encrypt_tables.push_back(*space);
2613 space->is_in_default_encrypt = true;
2614 next:
2615 space->release();
2616 }
2617 }
2618
2619 /*********************************************************************
2620 Adjust max key age
2621 @param[in] val New max key age */
2622 UNIV_INTERN
2623 void
2624 fil_crypt_set_rotate_key_age(
2625 uint val)
2626 {
2627 mutex_enter(&fil_system.mutex);
2628 srv_fil_crypt_rotate_key_age = val;
2629 if (val == 0) {
2630 fil_crypt_default_encrypt_tables_fill();
2631 }
2632 mutex_exit(&fil_system.mutex);
2633 os_event_set(fil_crypt_threads_event);
2634 }
2635
2636 /*********************************************************************
2637 Adjust rotation iops
2638 @param[in] val New max roation iops */
2639 UNIV_INTERN
2640 void
2641 fil_crypt_set_rotation_iops(
2642 uint val)
2643 {
2644 srv_n_fil_crypt_iops = val;
2645 os_event_set(fil_crypt_threads_event);
2646 }
2647
2648 /*********************************************************************
2649 Adjust encrypt tables
2650 @param[in] val New setting for innodb-encrypt-tables */
2651 UNIV_INTERN
2652 void
2653 fil_crypt_set_encrypt_tables(
2654 uint val)
2655 {
2656 if (!fil_crypt_threads_inited) {
2657 return;
2658 }
2659
2660 mutex_enter(&fil_system.mutex);
2661
2662 srv_encrypt_tables = val;
2663
2664 if (fil_crypt_must_default_encrypt()) {
2665 fil_crypt_default_encrypt_tables_fill();
2666 }
2667
2668 mutex_exit(&fil_system.mutex);
2669
2670 os_event_set(fil_crypt_threads_event);
2671 }
2672
2673 /*********************************************************************
2674 Init threads for key rotation */
2675 UNIV_INTERN
2676 void
2677 fil_crypt_threads_init()
2678 {
2679 if (!fil_crypt_threads_inited) {
2680 fil_crypt_event = os_event_create(0);
2681 fil_crypt_threads_event = os_event_create(0);
2682 mutex_create(LATCH_ID_FIL_CRYPT_THREADS_MUTEX,
2683 &fil_crypt_threads_mutex);
2684
2685 uint cnt = srv_n_fil_crypt_threads;
2686 srv_n_fil_crypt_threads = 0;
2687 fil_crypt_threads_inited = true;
2688 fil_crypt_set_thread_cnt(cnt);
2689 }
2690 }
2691
2692 /*********************************************************************
2693 Clean up key rotation threads resources */
2694 UNIV_INTERN
2695 void
2696 fil_crypt_threads_cleanup()
2697 {
2698 if (!fil_crypt_threads_inited) {
2699 return;
2700 }
2701 ut_a(!srv_n_fil_crypt_threads_started);
2702 os_event_destroy(fil_crypt_event);
2703 os_event_destroy(fil_crypt_threads_event);
2704 mutex_free(&fil_crypt_threads_mutex);
2705 fil_crypt_threads_inited = false;
2706 }
2707
2708 /*********************************************************************
2709 Wait for crypt threads to stop accessing space
2710 @param[in] space Tablespace */
2711 UNIV_INTERN
2712 void
2713 fil_space_crypt_close_tablespace(
2714 const fil_space_t* space)
2715 {
2716 fil_space_crypt_t* crypt_data = space->crypt_data;
2717
2718 if (!crypt_data || srv_n_fil_crypt_threads == 0
2719 || !fil_crypt_threads_inited) {
2720 return;
2721 }
2722
2723 mutex_enter(&fil_crypt_threads_mutex);
2724
2725 time_t start = time(0);
2726 time_t last = start;
2727
2728 mutex_enter(&crypt_data->mutex);
2729 mutex_exit(&fil_crypt_threads_mutex);
2730
2731 ulint cnt = crypt_data->rotate_state.active_threads;
2732 bool flushing = crypt_data->rotate_state.flushing;
2733
2734 while (cnt > 0 || flushing) {
2735 mutex_exit(&crypt_data->mutex);
2736 /* release dict mutex so that scrub threads can release their
2737 * table references */
2738 dict_mutex_exit_for_mysql();
2739
2740 /* wakeup throttle (all) sleepers */
2741 os_event_set(fil_crypt_throttle_sleep_event);
2742 os_event_set(fil_crypt_threads_event);
2743
2744 os_thread_sleep(20000);
2745 dict_mutex_enter_for_mysql();
2746 mutex_enter(&crypt_data->mutex);
2747 cnt = crypt_data->rotate_state.active_threads;
2748 flushing = crypt_data->rotate_state.flushing;
2749
2750 time_t now = time(0);
2751
2752 if (now >= last + 30) {
2753 ib::warn() << "Waited "
2754 << now - start
2755 << " seconds to drop space: "
2756 << space->name << " ("
2757 << space->id << ") active threads "
2758 << cnt << "flushing="
2759 << flushing << ".";
2760 last = now;
2761 }
2762 }
2763
2764 mutex_exit(&crypt_data->mutex);
2765 }
2766
2767 /*********************************************************************
2768 Get crypt status for a space (used by information_schema)
2769 @param[in] space Tablespace
2770 @param[out] status Crypt status */
2771 UNIV_INTERN
2772 void
2773 fil_space_crypt_get_status(
2774 const fil_space_t* space,
2775 struct fil_space_crypt_status_t* status)
2776 {
2777 memset(status, 0, sizeof(*status));
2778
2779 ut_ad(space->referenced());
2780
2781 /* If there is no crypt data and we have not yet read
2782 page 0 for this tablespace, we need to read it before
2783 we can continue. */
2784 if (!space->crypt_data) {
2785 fil_crypt_read_crypt_data(const_cast<fil_space_t*>(space));
2786 }
2787
2788 status->space = ULINT_UNDEFINED;
2789
2790 if (fil_space_crypt_t* crypt_data = space->crypt_data) {
2791 status->space = space->id;
2792 mutex_enter(&crypt_data->mutex);
2793 status->scheme = crypt_data->type;
2794 status->keyserver_requests = crypt_data->keyserver_requests;
2795 status->min_key_version = crypt_data->min_key_version;
2796 status->key_id = crypt_data->key_id;
2797
2798 if (crypt_data->rotate_state.active_threads > 0 ||
2799 crypt_data->rotate_state.flushing) {
2800 status->rotating = true;
2801 status->flushing =
2802 crypt_data->rotate_state.flushing;
2803 status->rotate_next_page_number =
2804 crypt_data->rotate_state.next_offset;
2805 status->rotate_max_page_number =
2806 crypt_data->rotate_state.max_offset;
2807 }
2808
2809 mutex_exit(&crypt_data->mutex);
2810
2811 if (srv_encrypt_tables || crypt_data->min_key_version) {
2812 status->current_key_version =
2813 fil_crypt_get_latest_key_version(crypt_data);
2814 }
2815 }
2816 }
2817
2818 /*********************************************************************
2819 Return crypt statistics
2820 @param[out] stat Crypt statistics */
2821 UNIV_INTERN
2822 void
2823 fil_crypt_total_stat(
2824 fil_crypt_stat_t *stat)
2825 {
2826 mutex_enter(&crypt_stat_mutex);
2827 *stat = crypt_stat;
2828 mutex_exit(&crypt_stat_mutex);
2829 }
2830
2831 /*********************************************************************
2832 Get scrub status for a space (used by information_schema)
2833
2834 @param[in] space Tablespace
2835 @param[out] status Scrub status */
2836 UNIV_INTERN
2837 void
2838 fil_space_get_scrub_status(
2839 const fil_space_t* space,
2840 struct fil_space_scrub_status_t* status)
2841 {
2842 memset(status, 0, sizeof(*status));
2843
2844 ut_ad(space->referenced());
2845 fil_space_crypt_t* crypt_data = space->crypt_data;
2846
2847 status->space = space->id;
2848
2849 if (crypt_data != NULL) {
2850 status->compressed = FSP_FLAGS_GET_ZIP_SSIZE(space->flags) > 0;
2851 mutex_enter(&crypt_data->mutex);
2852 status->last_scrub_completed =
2853 crypt_data->rotate_state.scrubbing.last_scrub_completed;
2854 if (crypt_data->rotate_state.active_threads > 0 &&
2855 crypt_data->rotate_state.scrubbing.is_active) {
2856 status->scrubbing = true;
2857 status->current_scrub_started =
2858 crypt_data->rotate_state.start_time;
2859 status->current_scrub_active_threads =
2860 crypt_data->rotate_state.active_threads;
2861 status->current_scrub_page_number =
2862 crypt_data->rotate_state.next_offset;
2863 status->current_scrub_max_page_number =
2864 crypt_data->rotate_state.max_offset;
2865 }
2866
2867 mutex_exit(&crypt_data->mutex);
2868 }
2869 }
2870 #endif /* UNIV_INNOCHECKSUM */
2871
2872 /**
2873 Verify that post encryption checksum match calculated checksum.
2874 This function should be called only if tablespace contains crypt_data
2875 metadata (this is strong indication that tablespace is encrypted).
2876 Function also verifies that traditional checksum does not match
2877 calculated checksum as if it does page could be valid unencrypted,
2878 encrypted, or corrupted.
2879
2880 @param[in,out] page page frame (checksum is temporarily modified)
2881 @param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
2882 @return true if page is encrypted AND OK, false otherwise */
2883 bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size)
2884 {
2885 if (ENCRYPTION_KEY_NOT_ENCRYPTED == mach_read_from_4(
2886 page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)) {
2887 return false;
2888 }
2889
2890 /* Compressed and encrypted pages do not have checksum. Assume not
2891 corrupted. Page verification happens after decompression in
2892 buf_page_io_complete() using buf_page_is_corrupted(). */
2893 if (mach_read_from_2(page + FIL_PAGE_TYPE)
2894 == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
2895 return true;
2896 }
2897
2898 /* Read stored post encryption checksum. */
2899 const ib_uint32_t checksum = mach_read_from_4(
2900 page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
2901
2902 /* If stored checksum matches one of the calculated checksums
2903 page is not corrupted. */
2904
2905 switch (srv_checksum_algorithm_t(srv_checksum_algorithm)) {
2906 case SRV_CHECKSUM_ALGORITHM_STRICT_FULL_CRC32:
2907 case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
2908 if (zip_size) {
2909 return checksum == page_zip_calc_checksum(
2910 page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32);
2911 }
2912
2913 return checksum == buf_calc_page_crc32(page);
2914 case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
2915 /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2916 due to MDEV-12114, fil_crypt_calculate_checksum()
2917 is only using CRC32 for the encrypted pages.
2918 Due to this, we must treat "strict_none" as "none". */
2919 case SRV_CHECKSUM_ALGORITHM_NONE:
2920 return true;
2921 case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
2922 /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2923 due to MDEV-12114, fil_crypt_calculate_checksum()
2924 is only using CRC32 for the encrypted pages.
2925 Due to this, we must treat "strict_innodb" as "innodb". */
2926 case SRV_CHECKSUM_ALGORITHM_INNODB:
2927 case SRV_CHECKSUM_ALGORITHM_CRC32:
2928 case SRV_CHECKSUM_ALGORITHM_FULL_CRC32:
2929 if (checksum == BUF_NO_CHECKSUM_MAGIC) {
2930 return true;
2931 }
2932 if (zip_size) {
2933 return checksum == page_zip_calc_checksum(
2934 page, zip_size,
2935 SRV_CHECKSUM_ALGORITHM_CRC32)
2936 || checksum == page_zip_calc_checksum(
2937 page, zip_size,
2938 SRV_CHECKSUM_ALGORITHM_INNODB);
2939 }
2940
2941 return checksum == buf_calc_page_crc32(page)
2942 || checksum == buf_calc_page_new_checksum(page);
2943 }
2944
2945 ut_ad(!"unhandled innodb_checksum_algorithm");
2946 return false;
2947 }
2948