1 /*****************************************************************************
2 Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
3 Copyright (c) 2014, 2021, MariaDB Corporation.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
16
17 *****************************************************************************/
18 /**************************************************//**
19 @file fil0crypt.cc
20 Innodb file space encrypt/decrypt
21
22 Created Jonas Oreland Google
23 Modified Jan Lindström jan.lindstrom@mariadb.com
24 *******************************************************/
25
26 #include "fil0fil.h"
27 #include "mtr0types.h"
28 #include "mach0data.h"
29 #include "page0size.h"
30 #include "page0zip.h"
31 #ifndef UNIV_INNOCHECKSUM
32 #include "fil0crypt.h"
33 #include "srv0srv.h"
34 #include "srv0start.h"
35 #include "log0recv.h"
36 #include "mtr0mtr.h"
37 #include "mtr0log.h"
38 #include "ut0ut.h"
39 #include "btr0scrub.h"
40 #include "fsp0fsp.h"
41 #include "fil0pagecompress.h"
42 #include <my_crypt.h>
43
/** Guards use of the key rotation thread objects: code in this file checks
it before signalling fil_crypt_threads_event and before acquiring
fil_crypt_threads_mutex.
NOTE(review): presumably set by the thread initialization routine, which is
not visible in this part of the file -- confirm. */
static bool fil_crypt_threads_inited = false;

/** Is encryption enabled/disabled */
UNIV_INTERN ulong srv_encrypt_tables = 0;

/** No of key rotation threads requested */
UNIV_INTERN uint srv_n_fil_crypt_threads = 0;

/** No of key rotation threads started */
UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0;

/** At this age or older a space/page will be rotated */
UNIV_INTERN uint srv_fil_crypt_rotate_key_age;

/** Whether the encryption plugin does key rotation.
fil_space_crypt_t::key_get_latest_version() sets this to true the first time
the plugin reports a key version greater than srv_fil_crypt_rotate_key_age. */
static bool srv_encrypt_rotate;

/** Event to signal FROM the key rotation threads. */
static os_event_t fil_crypt_event;

/** Event to signal TO the key rotation threads. */
UNIV_INTERN os_event_t fil_crypt_threads_event;

/** Event for waking up threads throttle. */
static os_event_t fil_crypt_throttle_sleep_event;

/** Mutex for key rotation threads. */
UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex;

/** Variable ensuring only 1 thread at time does initial conversion.
Read and written under fil_crypt_threads_mutex in
fil_crypt_start_encrypting_space(). */
static bool fil_crypt_start_converting = false;

/** Variables for throttling */
UNIV_INTERN uint srv_n_fil_crypt_iops = 100;	 // 10ms per iop
static uint srv_alloc_time = 3;		    // allocate iops for 3s at a time
static uint n_fil_crypt_iops_allocated = 0;

/** Variables for scrubbing */
extern uint srv_background_scrub_data_interval;
extern uint srv_background_scrub_data_check_interval;

/** Compile-time switch for key rotation throttling diagnostics
(0 = disabled). */
#define DEBUG_KEYROTATION_THROTTLING 0

/** Statistics variables; crypt_stat is zeroed and crypt_stat_mutex is
created in fil_space_crypt_init(). */
static fil_crypt_stat_t crypt_stat;
static ib_mutex_t crypt_stat_mutex;

/** Is background scrubbing enabled, defined on btr0scrub.cc */
extern my_bool srv_background_scrub_data_uncompressed;
extern my_bool srv_background_scrub_data_compressed;
94
/***********************************************************************
Check if a key needs rotation given a key_state.
Forward declaration: the definition appears later in this file, but
fil_crypt_get_latest_key_version() needs to call it before that point.
@param[in]	crypt_data		Encryption information
@param[in]	key_version		Current key version
@param[in]	latest_key_version	Latest key version
@param[in]	rotate_key_age		when to rotate
@return true if key needs rotation, false if not */
static bool
fil_crypt_needs_rotation(
	const fil_space_crypt_t*	crypt_data,
	uint				key_version,
	uint				latest_key_version,
	uint				rotate_key_age)
	MY_ATTRIBUTE((warn_unused_result));
109
110 /*********************************************************************
111 Init space crypt */
112 UNIV_INTERN
113 void
fil_space_crypt_init()114 fil_space_crypt_init()
115 {
116 fil_crypt_throttle_sleep_event = os_event_create(0);
117
118 mutex_create(LATCH_ID_FIL_CRYPT_STAT_MUTEX, &crypt_stat_mutex);
119 memset(&crypt_stat, 0, sizeof(crypt_stat));
120 }
121
122 /*********************************************************************
123 Cleanup space crypt */
124 UNIV_INTERN
125 void
fil_space_crypt_cleanup()126 fil_space_crypt_cleanup()
127 {
128 os_event_destroy(fil_crypt_throttle_sleep_event);
129 mutex_free(&crypt_stat_mutex);
130 }
131
132 /**
133 Get latest key version from encryption plugin.
134 @return key version or ENCRYPTION_KEY_VERSION_INVALID */
135 uint
key_get_latest_version(void)136 fil_space_crypt_t::key_get_latest_version(void)
137 {
138 uint key_version = key_found;
139
140 if (is_key_found()) {
141 key_version = encryption_key_get_latest_version(key_id);
142 /* InnoDB does dirty read of srv_fil_crypt_rotate_key_age.
143 It doesn't matter because srv_encrypt_rotate
144 can be set to true only once */
145 if (!srv_encrypt_rotate
146 && key_version > srv_fil_crypt_rotate_key_age) {
147 srv_encrypt_rotate = true;
148 }
149
150 srv_stats.n_key_requests.inc();
151 key_found = key_version;
152 }
153
154 return key_version;
155 }
156
157 /******************************************************************
158 Get the latest(key-version), waking the encrypt thread, if needed
159 @param[in,out] crypt_data Crypt data */
160 static inline
161 uint
fil_crypt_get_latest_key_version(fil_space_crypt_t * crypt_data)162 fil_crypt_get_latest_key_version(
163 fil_space_crypt_t* crypt_data)
164 {
165 ut_ad(crypt_data != NULL);
166
167 uint key_version = crypt_data->key_get_latest_version();
168
169 if (crypt_data->is_key_found()) {
170
171 if (fil_crypt_needs_rotation(
172 crypt_data,
173 crypt_data->min_key_version,
174 key_version,
175 srv_fil_crypt_rotate_key_age)) {
176 /* Below event seen as NULL-pointer at startup
177 when new database was created and we create a
178 checkpoint. Only seen when debugging. */
179 if (fil_crypt_threads_inited) {
180 os_event_set(fil_crypt_threads_event);
181 }
182 }
183 }
184
185 return key_version;
186 }
187
188 /******************************************************************
189 Mutex helper for crypt_data->scheme */
190 void
crypt_data_scheme_locker(st_encryption_scheme * scheme,int exit)191 crypt_data_scheme_locker(
192 /*=====================*/
193 st_encryption_scheme* scheme,
194 int exit)
195 {
196 fil_space_crypt_t* crypt_data =
197 static_cast<fil_space_crypt_t*>(scheme);
198
199 if (exit) {
200 mutex_exit(&crypt_data->mutex);
201 } else {
202 mutex_enter(&crypt_data->mutex);
203 }
204 }
205
206 /******************************************************************
207 Create a fil_space_crypt_t object
208 @param[in] type CRYPT_SCHEME_UNENCRYPTE or
209 CRYPT_SCHEME_1
210 @param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
211 FIL_ENCRYPTION_ON or
212 FIL_ENCRYPTION_OFF
213 @param[in] min_key_version key_version or 0
214 @param[in] key_id Used key id
215 @return crypt object */
216 static
217 fil_space_crypt_t*
fil_space_create_crypt_data(uint type,fil_encryption_t encrypt_mode,uint min_key_version,uint key_id)218 fil_space_create_crypt_data(
219 uint type,
220 fil_encryption_t encrypt_mode,
221 uint min_key_version,
222 uint key_id)
223 {
224 fil_space_crypt_t* crypt_data = NULL;
225 if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
226 crypt_data = new(buf)
227 fil_space_crypt_t(
228 type,
229 min_key_version,
230 key_id,
231 encrypt_mode);
232 }
233
234 return crypt_data;
235 }
236
237 /******************************************************************
238 Create a fil_space_crypt_t object
239 @param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
240 FIL_ENCRYPTION_ON or
241 FIL_ENCRYPTION_OFF
242
243 @param[in] key_id Encryption key id
244 @return crypt object */
245 UNIV_INTERN
246 fil_space_crypt_t*
fil_space_create_crypt_data(fil_encryption_t encrypt_mode,uint key_id)247 fil_space_create_crypt_data(
248 fil_encryption_t encrypt_mode,
249 uint key_id)
250 {
251 return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id));
252 }
253
254 /******************************************************************
255 Merge fil_space_crypt_t object
256 @param[in,out] dst Destination cryp data
257 @param[in] src Source crypt data */
258 UNIV_INTERN
259 void
fil_space_merge_crypt_data(fil_space_crypt_t * dst,const fil_space_crypt_t * src)260 fil_space_merge_crypt_data(
261 fil_space_crypt_t* dst,
262 const fil_space_crypt_t* src)
263 {
264 mutex_enter(&dst->mutex);
265
266 /* validate that they are mergeable */
267 ut_a(src->type == CRYPT_SCHEME_UNENCRYPTED ||
268 src->type == CRYPT_SCHEME_1);
269
270 ut_a(dst->type == CRYPT_SCHEME_UNENCRYPTED ||
271 dst->type == CRYPT_SCHEME_1);
272
273 dst->encryption = src->encryption;
274 dst->type = src->type;
275 dst->min_key_version = src->min_key_version;
276 dst->keyserver_requests += src->keyserver_requests;
277
278 mutex_exit(&dst->mutex);
279 }
280
281 /** Initialize encryption parameters from a tablespace header page.
282 @param[in] page_size page size of the tablespace
283 @param[in] page first page of the tablespace
284 @return crypt data from page 0
285 @retval NULL if not present or not valid */
286 UNIV_INTERN
287 fil_space_crypt_t*
fil_space_read_crypt_data(const page_size_t & page_size,const byte * page)288 fil_space_read_crypt_data(const page_size_t& page_size, const byte* page)
289 {
290 const ulint offset = FSP_HEADER_OFFSET
291 + fsp_header_get_encryption_offset(page_size);
292
293 if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) {
294 /* Crypt data is not stored. */
295 return NULL;
296 }
297
298 uint8_t type = mach_read_from_1(page + offset + MAGIC_SZ + 0);
299 uint8_t iv_length = mach_read_from_1(page + offset + MAGIC_SZ + 1);
300 fil_space_crypt_t* crypt_data;
301
302 if (!(type == CRYPT_SCHEME_UNENCRYPTED ||
303 type == CRYPT_SCHEME_1)
304 || iv_length != sizeof crypt_data->iv) {
305 ib::error() << "Found non sensible crypt scheme: "
306 << type << "," << iv_length
307 << " for space: "
308 << page_get_space_id(page);
309 return NULL;
310 }
311
312 uint min_key_version = mach_read_from_4
313 (page + offset + MAGIC_SZ + 2 + iv_length);
314
315 uint key_id = mach_read_from_4
316 (page + offset + MAGIC_SZ + 2 + iv_length + 4);
317
318 fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(
319 page + offset + MAGIC_SZ + 2 + iv_length + 8);
320
321 crypt_data = fil_space_create_crypt_data(encryption, key_id);
322 /* We need to overwrite these as above function will initialize
323 members */
324 crypt_data->type = type;
325 crypt_data->min_key_version = min_key_version;
326 crypt_data->page0_offset = offset;
327 memcpy(crypt_data->iv, page + offset + MAGIC_SZ + 2, iv_length);
328
329 return crypt_data;
330 }
331
332 /******************************************************************
333 Free a crypt data object
334 @param[in,out] crypt_data crypt data to be freed */
335 UNIV_INTERN
336 void
fil_space_destroy_crypt_data(fil_space_crypt_t ** crypt_data)337 fil_space_destroy_crypt_data(
338 fil_space_crypt_t **crypt_data)
339 {
340 if (crypt_data != NULL && (*crypt_data) != NULL) {
341 fil_space_crypt_t* c;
342 if (UNIV_LIKELY(fil_crypt_threads_inited)) {
343 mutex_enter(&fil_crypt_threads_mutex);
344 c = *crypt_data;
345 *crypt_data = NULL;
346 mutex_exit(&fil_crypt_threads_mutex);
347 } else {
348 ut_ad(srv_read_only_mode || !srv_was_started);
349 c = *crypt_data;
350 *crypt_data = NULL;
351 }
352 if (c) {
353 c->~fil_space_crypt_t();
354 ut_free(c);
355 }
356 }
357 }
358
359 /** Fill crypt data information to the give page.
360 It should be called during ibd file creation.
361 @param[in] flags tablespace flags
362 @param[in,out] page first page of the tablespace */
363 void
fill_page0(ulint flags,byte * page)364 fil_space_crypt_t::fill_page0(
365 ulint flags,
366 byte* page)
367 {
368 const uint len = sizeof(iv);
369 const ulint offset = FSP_HEADER_OFFSET
370 + fsp_header_get_encryption_offset(page_size_t(flags));
371 page0_offset = offset;
372
373 memcpy(page + offset, CRYPT_MAGIC, MAGIC_SZ);
374 mach_write_to_1(page + offset + MAGIC_SZ, type);
375 mach_write_to_1(page + offset + MAGIC_SZ + 1, len);
376 memcpy(page + offset + MAGIC_SZ + 2, &iv, len);
377
378 mach_write_to_4(page + offset + MAGIC_SZ + 2 + len,
379 min_key_version);
380 mach_write_to_4(page + offset + MAGIC_SZ + 2 + len + 4,
381 key_id);
382 mach_write_to_1(page + offset + MAGIC_SZ + 2 + len + 8,
383 encryption);
384 }
385
386 /******************************************************************
387 Write crypt data to a page (0)
388 @param[in] space tablespace
389 @param[in,out] page0 first page of the tablespace
390 @param[in,out] mtr mini-transaction */
391 UNIV_INTERN
392 void
write_page0(const fil_space_t * space,byte * page,mtr_t * mtr)393 fil_space_crypt_t::write_page0(
394 const fil_space_t* space,
395 byte* page,
396 mtr_t* mtr)
397 {
398 ut_ad(this == space->crypt_data);
399 const uint len = sizeof(iv);
400 const ulint offset = FSP_HEADER_OFFSET
401 + fsp_header_get_encryption_offset(page_size_t(space->flags));
402 page0_offset = offset;
403
404 /*
405 redo log this as bytewise updates to page 0
406 followed by an MLOG_FILE_WRITE_CRYPT_DATA
407 (that will during recovery update fil_space_t)
408 */
409 mlog_write_string(page + offset, CRYPT_MAGIC, MAGIC_SZ, mtr);
410 mlog_write_ulint(page + offset + MAGIC_SZ + 0, type, MLOG_1BYTE, mtr);
411 mlog_write_ulint(page + offset + MAGIC_SZ + 1, len, MLOG_1BYTE, mtr);
412 mlog_write_string(page + offset + MAGIC_SZ + 2, iv, len,
413 mtr);
414 mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len, min_key_version,
415 MLOG_4BYTES, mtr);
416 mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 4, key_id,
417 MLOG_4BYTES, mtr);
418 mlog_write_ulint(page + offset + MAGIC_SZ + 2 + len + 8, encryption,
419 MLOG_1BYTE, mtr);
420
421 DBUG_EXECUTE_IF("ib_do_not_log_crypt_data", return;);
422
423 byte* log_ptr = mlog_open(mtr, 11 + 17 + len);
424
425 if (log_ptr != NULL) {
426 log_ptr = mlog_write_initial_log_record_fast(
427 page,
428 MLOG_FILE_WRITE_CRYPT_DATA,
429 log_ptr, mtr);
430 mach_write_to_4(log_ptr, space->id);
431 log_ptr += 4;
432 mach_write_to_2(log_ptr, offset);
433 log_ptr += 2;
434 mach_write_to_1(log_ptr, type);
435 log_ptr += 1;
436 mach_write_to_1(log_ptr, len);
437 log_ptr += 1;
438 mach_write_to_4(log_ptr, min_key_version);
439 log_ptr += 4;
440 mach_write_to_4(log_ptr, key_id);
441 log_ptr += 4;
442 mach_write_to_1(log_ptr, encryption);
443 log_ptr += 1;
444 mlog_close(mtr, log_ptr);
445
446 mlog_catenate_string(mtr, iv, len);
447 }
448 }
449
450 /******************************************************************
451 Parse a MLOG_FILE_WRITE_CRYPT_DATA log entry
452 @param[in] ptr Log entry start
453 @param[in] end_ptr Log entry end
454 @param[in] block buffer block
455 @return position on log buffer */
456 UNIV_INTERN
457 byte*
fil_parse_write_crypt_data(byte * ptr,const byte * end_ptr,dberr_t * err)458 fil_parse_write_crypt_data(
459 byte* ptr,
460 const byte* end_ptr,
461 dberr_t* err)
462 {
463 /* check that redo log entry is complete */
464 uint entry_size =
465 4 + // size of space_id
466 2 + // size of offset
467 1 + // size of type
468 1 + // size of iv-len
469 4 + // size of min_key_version
470 4 + // size of key_id
471 1; // fil_encryption_t
472
473 *err = DB_SUCCESS;
474
475 if (ptr + entry_size > end_ptr) {
476 return NULL;
477 }
478
479 ulint space_id = mach_read_from_4(ptr);
480 ptr += 4;
481 uint offset = mach_read_from_2(ptr);
482 ptr += 2;
483 uint type = mach_read_from_1(ptr);
484 ptr += 1;
485 uint len = mach_read_from_1(ptr);
486 ptr += 1;
487
488 if ((type != CRYPT_SCHEME_1 && type != CRYPT_SCHEME_UNENCRYPTED)
489 || len != CRYPT_SCHEME_1_IV_LEN) {
490 *err = DB_CORRUPTION;
491 return NULL;
492 }
493
494 uint min_key_version = mach_read_from_4(ptr);
495 ptr += 4;
496
497 uint key_id = mach_read_from_4(ptr);
498 ptr += 4;
499
500 fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(ptr);
501 ptr +=1;
502
503 if (ptr + len > end_ptr) {
504 return NULL;
505 }
506
507 mutex_enter(&fil_system.mutex);
508
509 fil_space_t* space = fil_space_get_by_id(space_id);
510
511 if (!space) {
512 mutex_exit(&fil_system.mutex);
513 return ptr + len;
514 }
515
516 fil_space_crypt_t* crypt_data = fil_space_create_crypt_data(
517 encryption, key_id);
518
519 crypt_data->page0_offset = offset;
520 crypt_data->min_key_version = min_key_version;
521 crypt_data->type = type;
522 memcpy(crypt_data->iv, ptr, len);
523 ptr += len;
524
525 if (space->crypt_data) {
526 fil_space_merge_crypt_data(space->crypt_data, crypt_data);
527 fil_space_destroy_crypt_data(&crypt_data);
528 crypt_data = space->crypt_data;
529 } else {
530 space->crypt_data = crypt_data;
531 }
532
533 mutex_exit(&fil_system.mutex);
534
535 if (crypt_data->should_encrypt() && !crypt_data->is_key_found()) {
536 *err = DB_DECRYPTION_FAILED;
537 }
538
539 return ptr;
540 }
541
542 /** Encrypt a buffer.
543 @param[in,out] crypt_data Crypt data
544 @param[in] space space_id
545 @param[in] offset Page offset
546 @param[in] lsn Log sequence number
547 @param[in] src_frame Page to encrypt
548 @param[in] page_size Page size
549 @param[in,out] dst_frame Output buffer
550 @return encrypted buffer or NULL */
551 UNIV_INTERN
552 byte*
fil_encrypt_buf(fil_space_crypt_t * crypt_data,ulint space,ulint offset,lsn_t lsn,const byte * src_frame,const page_size_t & page_size,byte * dst_frame)553 fil_encrypt_buf(
554 fil_space_crypt_t* crypt_data,
555 ulint space,
556 ulint offset,
557 lsn_t lsn,
558 const byte* src_frame,
559 const page_size_t& page_size,
560 byte* dst_frame)
561 {
562 uint size = uint(page_size.physical());
563 uint key_version = fil_crypt_get_latest_key_version(crypt_data);
564
565 ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
566
567 ulint orig_page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
568 ibool page_compressed = (orig_page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
569 uint header_len = FIL_PAGE_DATA;
570
571 if (page_compressed) {
572 header_len += (FIL_PAGE_COMPRESSED_SIZE + FIL_PAGE_COMPRESSION_METHOD_SIZE);
573 }
574
575 /* FIL page header is not encrypted */
576 memcpy(dst_frame, src_frame, header_len);
577
578 /* Store key version */
579 mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, key_version);
580
581 /* Calculate the start offset in a page */
582 uint unencrypted_bytes = header_len + FIL_PAGE_DATA_END;
583 uint srclen = size - unencrypted_bytes;
584 const byte* src = src_frame + header_len;
585 byte* dst = dst_frame + header_len;
586 uint32 dstlen = 0;
587
588 if (page_compressed) {
589 srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
590 }
591
592 int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
593 crypt_data, key_version,
594 (uint32)space, (uint32)offset, lsn);
595 ut_a(rc == MY_AES_OK);
596 ut_a(dstlen == srclen);
597
598 /* For compressed tables we do not store the FIL header because
599 the whole page is not stored to the disk. In compressed tables only
600 the FIL header + compressed (and now encrypted) payload alligned
601 to sector boundary is written. */
602 if (!page_compressed) {
603 /* FIL page trailer is also not encrypted */
604 memcpy(dst_frame + page_size.physical() - FIL_PAGE_DATA_END,
605 src_frame + page_size.physical() - FIL_PAGE_DATA_END,
606 FIL_PAGE_DATA_END);
607 } else {
608 /* Clean up rest of buffer */
609 memset(dst_frame+header_len+srclen, 0,
610 page_size.physical() - (header_len + srclen));
611 }
612
613 /* handle post encryption checksum */
614 ib_uint32_t checksum = 0;
615
616 checksum = fil_crypt_calculate_checksum(page_size, dst_frame);
617
618 // store the post-encryption checksum after the key-version
619 mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4, checksum);
620
621 ut_ad(fil_space_verify_crypt_checksum(dst_frame, page_size));
622
623 srv_stats.pages_encrypted.inc();
624
625 return dst_frame;
626 }
627
628 /******************************************************************
629 Encrypt a page
630
631 @param[in] space Tablespace
632 @param[in] offset Page offset
633 @param[in] lsn Log sequence number
634 @param[in] src_frame Page to encrypt
635 @param[in,out] dst_frame Output buffer
636 @return encrypted buffer or NULL */
637 UNIV_INTERN
638 byte*
fil_space_encrypt(const fil_space_t * space,ulint offset,lsn_t lsn,byte * src_frame,byte * dst_frame)639 fil_space_encrypt(
640 const fil_space_t* space,
641 ulint offset,
642 lsn_t lsn,
643 byte* src_frame,
644 byte* dst_frame)
645 {
646 switch (mach_read_from_2(src_frame+FIL_PAGE_TYPE)) {
647 case FIL_PAGE_TYPE_FSP_HDR:
648 case FIL_PAGE_TYPE_XDES:
649 case FIL_PAGE_RTREE:
650 /* File space header, extent descriptor or spatial index
651 are not encrypted. */
652 return src_frame;
653 }
654
655 if (!space->crypt_data || !space->crypt_data->is_encrypted()) {
656 return (src_frame);
657 }
658
659 ut_ad(space->pending_io());
660 return fil_encrypt_buf(space->crypt_data, space->id, offset, lsn,
661 src_frame, page_size_t(space->flags),
662 dst_frame);
663 }
664
665 /** Decrypt a page.
666 @param[in] crypt_data crypt_data
667 @param[in] tmp_frame Temporary buffer
668 @param[in] page_size Page size
669 @param[in,out] src_frame Page to decrypt
670 @return DB_SUCCESS or error */
671 UNIV_INTERN
672 dberr_t
fil_space_decrypt(fil_space_crypt_t * crypt_data,byte * tmp_frame,const page_size_t & page_size,byte * src_frame)673 fil_space_decrypt(
674 fil_space_crypt_t* crypt_data,
675 byte* tmp_frame,
676 const page_size_t& page_size,
677 byte* src_frame)
678 {
679 ulint page_type = mach_read_from_2(src_frame+FIL_PAGE_TYPE);
680 uint key_version = mach_read_from_4(src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
681 bool page_compressed = (page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
682 uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
683 uint space = mach_read_from_4(src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
684 ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
685
686 ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
687 ut_a(crypt_data != NULL && crypt_data->is_encrypted());
688
689 /* read space & lsn */
690 uint header_len = FIL_PAGE_DATA;
691
692 if (page_compressed) {
693 header_len += (FIL_PAGE_COMPRESSED_SIZE + FIL_PAGE_COMPRESSION_METHOD_SIZE);
694 }
695
696 /* Copy FIL page header, it is not encrypted */
697 memcpy(tmp_frame, src_frame, header_len);
698
699 /* Calculate the offset where decryption starts */
700 const byte* src = src_frame + header_len;
701 byte* dst = tmp_frame + header_len;
702 uint32 dstlen = 0;
703 uint srclen = uint(page_size.physical())
704 - header_len - FIL_PAGE_DATA_END;
705
706 if (page_compressed) {
707 srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
708 }
709
710 int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
711 crypt_data, key_version,
712 space, offset, lsn);
713
714 if (! ((rc == MY_AES_OK) && ((ulint) dstlen == srclen))) {
715
716 if (rc == -1) {
717 return DB_DECRYPTION_FAILED;
718 }
719
720 ib::fatal() << "Unable to decrypt data-block "
721 << " src: " << static_cast<const void*>(src)
722 << "srclen: "
723 << srclen << " buf: "
724 << static_cast<const void*>(dst) << "buflen: "
725 << dstlen << " return-code: " << rc
726 << " Can't continue!";
727 }
728
729 /* For compressed tables we do not store the FIL header because
730 the whole page is not stored to the disk. In compressed tables only
731 the FIL header + compressed (and now encrypted) payload alligned
732 to sector boundary is written. */
733 if (!page_compressed) {
734 /* Copy FIL trailer */
735 memcpy(tmp_frame + page_size.physical() - FIL_PAGE_DATA_END,
736 src_frame + page_size.physical() - FIL_PAGE_DATA_END,
737 FIL_PAGE_DATA_END);
738 }
739
740 srv_stats.pages_decrypted.inc();
741
742 return DB_SUCCESS; /* page was decrypted */
743 }
744
745 /**
746 Decrypt a page.
747 @param[in] space Tablespace
748 @param[in] tmp_frame Temporary buffer used for decrypting
749 @param[in,out] src_frame Page to decrypt
750 @return decrypted page, or original not encrypted page if decryption is
751 not needed.*/
752 UNIV_INTERN
753 byte*
fil_space_decrypt(const fil_space_t * space,byte * tmp_frame,byte * src_frame)754 fil_space_decrypt(
755 const fil_space_t* space,
756 byte* tmp_frame,
757 byte* src_frame)
758 {
759 const page_size_t page_size(space->flags);
760
761 ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
762 ut_ad(space->pending_io());
763
764 if (DB_SUCCESS != fil_space_decrypt(space->crypt_data, tmp_frame,
765 page_size, src_frame)) {
766 return NULL;
767 }
768
769 /* Copy the decrypted page back to page buffer, not
770 really any other options. */
771 memcpy(src_frame, tmp_frame, page_size.physical());
772
773 return src_frame;
774 }
775
776 /******************************************************************
777 Calculate post encryption checksum
778 @param[in] page_size page size
779 @param[in] dst_frame Block where checksum is calculated
780 @return page checksum
781 not needed. */
782 UNIV_INTERN
783 uint32_t
fil_crypt_calculate_checksum(const page_size_t & page_size,const byte * dst_frame)784 fil_crypt_calculate_checksum(
785 const page_size_t& page_size,
786 const byte* dst_frame)
787 {
788 /* For encrypted tables we use only crc32 and strict_crc32 */
789 return page_size.is_compressed()
790 ? page_zip_calc_checksum(dst_frame, page_size.physical(),
791 SRV_CHECKSUM_ALGORITHM_CRC32)
792 : buf_calc_page_crc32(dst_frame);
793 }
794
795 /***********************************************************************/
796
797 /** A copy of global key state */
798 struct key_state_t {
key_state_tkey_state_t799 key_state_t() : key_id(0), key_version(0),
800 rotate_key_age(srv_fil_crypt_rotate_key_age) {}
operator ==key_state_t801 bool operator==(const key_state_t& other) const {
802 return key_version == other.key_version &&
803 rotate_key_age == other.rotate_key_age;
804 }
805 uint key_id;
806 uint key_version;
807 uint rotate_key_age;
808 };
809
810 /***********************************************************************
811 Copy global key state
812 @param[in,out] new_state key state
813 @param[in] crypt_data crypt data */
814 static void
fil_crypt_get_key_state(key_state_t * new_state,fil_space_crypt_t * crypt_data)815 fil_crypt_get_key_state(
816 key_state_t* new_state,
817 fil_space_crypt_t* crypt_data)
818 {
819 if (srv_encrypt_tables) {
820 new_state->key_version = crypt_data->key_get_latest_version();
821 new_state->rotate_key_age = srv_fil_crypt_rotate_key_age;
822
823 ut_a(new_state->key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
824 } else {
825 new_state->key_version = 0;
826 new_state->rotate_key_age = 0;
827 }
828 }
829
830 /***********************************************************************
831 Check if a key needs rotation given a key_state
832 @param[in] crypt_data Encryption information
833 @param[in] key_version Current key version
834 @param[in] latest_key_version Latest key version
835 @param[in] rotate_key_age when to rotate
836 @return true if key needs rotation, false if not */
837 static bool
fil_crypt_needs_rotation(const fil_space_crypt_t * crypt_data,uint key_version,uint latest_key_version,uint rotate_key_age)838 fil_crypt_needs_rotation(
839 const fil_space_crypt_t* crypt_data,
840 uint key_version,
841 uint latest_key_version,
842 uint rotate_key_age)
843 {
844 if (key_version == ENCRYPTION_KEY_VERSION_INVALID) {
845 return false;
846 }
847
848 if (key_version == 0 && latest_key_version != 0) {
849 /* this is rotation unencrypted => encrypted
850 * ignore rotate_key_age */
851 return true;
852 }
853
854 if (latest_key_version == 0 && key_version != 0) {
855 if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT) {
856 /* this is rotation encrypted => unencrypted */
857 return true;
858 }
859 return false;
860 }
861
862 if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT
863 && crypt_data->type == CRYPT_SCHEME_1
864 && !srv_encrypt_tables) {
865 /* This is rotation encrypted => unencrypted */
866 return true;
867 }
868
869 if (rotate_key_age == 0) {
870 return false;
871 }
872
873 /* this is rotation encrypted => encrypted,
874 * only reencrypt if key is sufficiently old */
875 if (key_version + rotate_key_age < latest_key_version) {
876 return true;
877 }
878
879 return false;
880 }
881
882 /** Read page 0 and possible crypt data from there.
883 @param[in,out] space Tablespace */
884 static inline
885 void
fil_crypt_read_crypt_data(fil_space_t * space)886 fil_crypt_read_crypt_data(fil_space_t* space)
887 {
888 if (space->crypt_data || space->size
889 || !fil_space_get_size(space->id)) {
890 /* The encryption metadata has already been read, or
891 the tablespace is not encrypted and the file has been
892 opened already, or the file cannot be accessed,
893 likely due to a concurrent DROP
894 (possibly as part of TRUNCATE or ALTER TABLE).
895 FIXME: The file can become unaccessible any time
896 after this check! We should really remove this
897 function and instead make crypt_data an integral
898 part of fil_space_t. */
899 return;
900 }
901
902 const page_size_t page_size(space->flags);
903 mtr_t mtr;
904 mtr.start();
905 if (buf_block_t* block = buf_page_get(page_id_t(space->id, 0),
906 page_size, RW_S_LATCH, &mtr)) {
907 mutex_enter(&fil_system.mutex);
908 if (!space->crypt_data) {
909 space->crypt_data = fil_space_read_crypt_data(
910 page_size, block->frame);
911 }
912 mutex_exit(&fil_system.mutex);
913 }
914 mtr.commit();
915 }
916
917 /** Start encrypting a space
918 @param[in,out] space Tablespace
919 @return true if a recheck of tablespace is needed by encryption thread. */
fil_crypt_start_encrypting_space(fil_space_t * space)920 static bool fil_crypt_start_encrypting_space(fil_space_t* space)
921 {
922 bool recheck = false;
923
924 mutex_enter(&fil_crypt_threads_mutex);
925
926 fil_space_crypt_t *crypt_data = space->crypt_data;
927
928 /* If space is not encrypted and encryption is not enabled, then
929 do not continue encrypting the space. */
930 if (!crypt_data && !srv_encrypt_tables) {
931 mutex_exit(&fil_crypt_threads_mutex);
932 return false;
933 }
934
935 if (crypt_data != NULL || fil_crypt_start_converting) {
936 /* someone beat us to it */
937 if (fil_crypt_start_converting) {
938 recheck = true;
939 }
940
941 mutex_exit(&fil_crypt_threads_mutex);
942 return recheck;
943 }
944
945 /* NOTE: we need to write and flush page 0 before publishing
946 * the crypt data. This so that after restart there is no
947 * risk of finding encrypted pages without having
948 * crypt data in page 0 */
949
950 /* 1 - create crypt data */
951 crypt_data = fil_space_create_crypt_data(
952 FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);
953
954 if (crypt_data == NULL) {
955 mutex_exit(&fil_crypt_threads_mutex);
956 return false;
957 }
958
959 crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
960 crypt_data->min_key_version = 0; // all pages are unencrypted
961 crypt_data->rotate_state.start_time = time(0);
962 crypt_data->rotate_state.starting = true;
963 crypt_data->rotate_state.active_threads = 1;
964
965 mutex_enter(&fil_system.mutex);
966 space->crypt_data = crypt_data;
967 mutex_exit(&fil_system.mutex);
968
969 fil_crypt_start_converting = true;
970 mutex_exit(&fil_crypt_threads_mutex);
971
972 do
973 {
974 mtr_t mtr;
975 mtr.start();
976 mtr.set_named_space(space);
977
978 /* 2 - get page 0 */
979 dberr_t err = DB_SUCCESS;
980 buf_block_t* block = buf_page_get_gen(
981 page_id_t(space->id, 0), page_size_t(space->flags),
982 RW_X_LATCH, NULL, BUF_GET,
983 __FILE__, __LINE__,
984 &mtr, &err);
985
986
987 /* 3 - write crypt data to page 0 */
988 byte* frame = buf_block_get_frame(block);
989 crypt_data->type = CRYPT_SCHEME_1;
990 crypt_data->write_page0(space, frame, &mtr);
991
992 mtr.commit();
993
994 /* record lsn of update */
995 lsn_t end_lsn = mtr.commit_lsn();
996
997 /* 4 - sync tablespace before publishing crypt data */
998
999 bool success = false;
1000
1001 do {
1002 ulint n_pages = 0;
1003 success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
1004 buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
1005 } while (!success);
1006
1007 /* 5 - publish crypt data */
1008 mutex_enter(&fil_crypt_threads_mutex);
1009 mutex_enter(&crypt_data->mutex);
1010 crypt_data->type = CRYPT_SCHEME_1;
1011 ut_a(crypt_data->rotate_state.active_threads == 1);
1012 crypt_data->rotate_state.active_threads = 0;
1013 crypt_data->rotate_state.starting = false;
1014
1015 fil_crypt_start_converting = false;
1016 mutex_exit(&crypt_data->mutex);
1017 mutex_exit(&fil_crypt_threads_mutex);
1018
1019 return recheck;
1020 } while (0);
1021
1022 mutex_enter(&crypt_data->mutex);
1023 ut_a(crypt_data->rotate_state.active_threads == 1);
1024 crypt_data->rotate_state.active_threads = 0;
1025 mutex_exit(&crypt_data->mutex);
1026
1027 mutex_enter(&fil_crypt_threads_mutex);
1028 fil_crypt_start_converting = false;
1029 mutex_exit(&fil_crypt_threads_mutex);
1030
1031 return recheck;
1032 }
1033
1034 /** State of a rotation thread */
1035 struct rotate_thread_t {
rotate_thread_trotate_thread_t1036 explicit rotate_thread_t(uint no) {
1037 memset(this, 0, sizeof(* this));
1038 thread_no = no;
1039 first = true;
1040 estimated_max_iops = 20;
1041 }
1042
1043 uint thread_no;
1044 bool first; /*!< is position before first space */
1045 fil_space_t* space; /*!< current space or NULL */
1046 ulint offset; /*!< current offset */
1047 ulint batch; /*!< #pages to rotate */
1048 uint min_key_version_found;/*!< min key version found but not rotated */
1049 lsn_t end_lsn; /*!< max lsn when rotating this space */
1050
1051 uint estimated_max_iops; /*!< estimation of max iops */
1052 uint allocated_iops; /*!< allocated iops */
1053 ulint cnt_waited; /*!< #times waited during this slot */
1054 uintmax_t sum_waited_us; /*!< wait time during this slot */
1055
1056 fil_crypt_stat_t crypt_stat; // statistics
1057
1058 btr_scrub_t scrub_data; /* thread local data used by btr_scrub-functions
1059 * when iterating pages of tablespace */
1060
1061 /** @return whether this thread should terminate */
should_shutdownrotate_thread_t1062 bool should_shutdown() const {
1063 switch (srv_shutdown_state) {
1064 case SRV_SHUTDOWN_NONE:
1065 return thread_no >= srv_n_fil_crypt_threads;
1066 case SRV_SHUTDOWN_EXIT_THREADS:
1067 /* srv_init_abort() must have been invoked */
1068 case SRV_SHUTDOWN_CLEANUP:
1069 case SRV_SHUTDOWN_INITIATED:
1070 return true;
1071 case SRV_SHUTDOWN_FLUSH_PHASE:
1072 case SRV_SHUTDOWN_LAST_PHASE:
1073 break;
1074 }
1075 ut_ad(0);
1076 return true;
1077 }
1078 };
1079
1080 /** Avoid the removal of the tablespace from
1081 default_encrypt_list only when
1082 1) Another active encryption thread working on tablespace
1083 2) Eligible for tablespace key rotation
1084 3) Tablespace is in flushing phase
1085 @return true if tablespace should be removed from
1086 default encrypt */
fil_crypt_must_remove(const fil_space_t & space)1087 static bool fil_crypt_must_remove(const fil_space_t &space)
1088 {
1089 ut_ad(space.purpose == FIL_TYPE_TABLESPACE);
1090 fil_space_crypt_t *crypt_data = space.crypt_data;
1091 ut_ad(mutex_own(&fil_system.mutex));
1092 const ulong encrypt_tables= srv_encrypt_tables;
1093 if (!crypt_data)
1094 return !encrypt_tables;
1095 if (!crypt_data->is_key_found())
1096 return true;
1097
1098 mutex_enter(&crypt_data->mutex);
1099 const bool remove= (space.is_stopping() || crypt_data->not_encrypted()) &&
1100 (!crypt_data->rotate_state.flushing &&
1101 !encrypt_tables == !!crypt_data->min_key_version &&
1102 !crypt_data->rotate_state.active_threads);
1103 mutex_exit(&crypt_data->mutex);
1104 return remove;
1105 }
1106
1107 /***********************************************************************
1108 Check if space needs rotation given a key_state
1109 @param[in,out] state Key rotation state
1110 @param[in,out] key_state Key state
1111 @param[in,out] recheck needs recheck ?
1112 @return true if space needs key rotation */
1113 static
1114 bool
fil_crypt_space_needs_rotation(rotate_thread_t * state,key_state_t * key_state,bool * recheck)1115 fil_crypt_space_needs_rotation(
1116 rotate_thread_t* state,
1117 key_state_t* key_state,
1118 bool* recheck)
1119 {
1120 fil_space_t* space = state->space;
1121
1122 /* Make sure that tablespace is normal tablespace */
1123 if (space->purpose != FIL_TYPE_TABLESPACE) {
1124 return false;
1125 }
1126
1127 ut_ad(space->referenced());
1128
1129 fil_space_crypt_t *crypt_data = space->crypt_data;
1130
1131 if (crypt_data == NULL) {
1132 /**
1133 * space has no crypt data
1134 * start encrypting it...
1135 */
1136 *recheck = fil_crypt_start_encrypting_space(space);
1137 crypt_data = space->crypt_data;
1138
1139 if (crypt_data == NULL) {
1140 return false;
1141 }
1142
1143 crypt_data->key_get_latest_version();
1144 }
1145
1146 /* If used key_id is not found from encryption plugin we can't
1147 continue to rotate the tablespace */
1148 if (!crypt_data->is_key_found()) {
1149 return false;
1150 }
1151
1152 mutex_enter(&crypt_data->mutex);
1153
1154 do {
1155 /* prevent threads from starting to rotate space */
1156 if (crypt_data->rotate_state.starting) {
1157 /* recheck this space later */
1158 *recheck = true;
1159 break;
1160 }
1161
1162 /* prevent threads from starting to rotate space */
1163 if (space->is_stopping()) {
1164 break;
1165 }
1166
1167 if (crypt_data->rotate_state.flushing) {
1168 break;
1169 }
1170
1171 /* No need to rotate space if encryption is disabled */
1172 if (crypt_data->not_encrypted()) {
1173 break;
1174 }
1175
1176 if (crypt_data->key_id != key_state->key_id) {
1177 key_state->key_id= crypt_data->key_id;
1178 fil_crypt_get_key_state(key_state, crypt_data);
1179 }
1180
1181 bool need_key_rotation = fil_crypt_needs_rotation(
1182 crypt_data,
1183 crypt_data->min_key_version,
1184 key_state->key_version,
1185 key_state->rotate_key_age);
1186
1187 crypt_data->rotate_state.scrubbing.is_active =
1188 btr_scrub_start_space(*space, &state->scrub_data);
1189
1190 time_t diff = time(0) - crypt_data->rotate_state.scrubbing.
1191 last_scrub_completed;
1192
1193 bool need_scrubbing =
1194 (srv_background_scrub_data_uncompressed ||
1195 srv_background_scrub_data_compressed) &&
1196 crypt_data->rotate_state.scrubbing.is_active
1197 && diff >= 0
1198 && ulint(diff) >= srv_background_scrub_data_interval;
1199
1200 if (need_key_rotation == false && need_scrubbing == false) {
1201 break;
1202 }
1203
1204 mutex_exit(&crypt_data->mutex);
1205
1206 return true;
1207 } while (0);
1208
1209 mutex_exit(&crypt_data->mutex);
1210
1211
1212 return false;
1213 }
1214
1215 /***********************************************************************
1216 Update global statistics with thread statistics
1217 @param[in,out] state key rotation statistics */
1218 static void
fil_crypt_update_total_stat(rotate_thread_t * state)1219 fil_crypt_update_total_stat(
1220 rotate_thread_t *state)
1221 {
1222 mutex_enter(&crypt_stat_mutex);
1223 crypt_stat.pages_read_from_cache +=
1224 state->crypt_stat.pages_read_from_cache;
1225 crypt_stat.pages_read_from_disk +=
1226 state->crypt_stat.pages_read_from_disk;
1227 crypt_stat.pages_modified += state->crypt_stat.pages_modified;
1228 crypt_stat.pages_flushed += state->crypt_stat.pages_flushed;
1229 // remote old estimate
1230 crypt_stat.estimated_iops -= state->crypt_stat.estimated_iops;
1231 // add new estimate
1232 crypt_stat.estimated_iops += state->estimated_max_iops;
1233 mutex_exit(&crypt_stat_mutex);
1234
1235 // make new estimate "current" estimate
1236 memset(&state->crypt_stat, 0, sizeof(state->crypt_stat));
1237 // record our old (current) estimate
1238 state->crypt_stat.estimated_iops = state->estimated_max_iops;
1239 }
1240
1241 /***********************************************************************
1242 Allocate iops to thread from global setting,
1243 used before starting to rotate a space.
1244 @param[in,out] state Rotation state
1245 @return true if allocation succeeded, false if failed */
1246 static
1247 bool
fil_crypt_alloc_iops(rotate_thread_t * state)1248 fil_crypt_alloc_iops(
1249 rotate_thread_t *state)
1250 {
1251 ut_ad(state->allocated_iops == 0);
1252
1253 /* We have not yet selected the space to rotate, thus
1254 state might not contain space and we can't check
1255 its status yet. */
1256
1257 uint max_iops = state->estimated_max_iops;
1258 mutex_enter(&fil_crypt_threads_mutex);
1259
1260 if (n_fil_crypt_iops_allocated >= srv_n_fil_crypt_iops) {
1261 /* this can happen when user decreases srv_fil_crypt_iops */
1262 mutex_exit(&fil_crypt_threads_mutex);
1263 return false;
1264 }
1265
1266 uint alloc = srv_n_fil_crypt_iops - n_fil_crypt_iops_allocated;
1267
1268 if (alloc > max_iops) {
1269 alloc = max_iops;
1270 }
1271
1272 n_fil_crypt_iops_allocated += alloc;
1273 mutex_exit(&fil_crypt_threads_mutex);
1274
1275 state->allocated_iops = alloc;
1276
1277 return alloc > 0;
1278 }
1279
1280 /***********************************************************************
1281 Reallocate iops to thread,
1282 used when inside a space
1283 @param[in,out] state Rotation state */
1284 static
1285 void
fil_crypt_realloc_iops(rotate_thread_t * state)1286 fil_crypt_realloc_iops(
1287 rotate_thread_t *state)
1288 {
1289 ut_a(state->allocated_iops > 0);
1290
1291 if (10 * state->cnt_waited > state->batch) {
1292 /* if we waited more than 10% re-estimate max_iops */
1293 ulint avg_wait_time_us =
1294 ulint(state->sum_waited_us / state->cnt_waited);
1295
1296 if (avg_wait_time_us == 0) {
1297 avg_wait_time_us = 1; // prevent division by zero
1298 }
1299
1300 DBUG_PRINT("ib_crypt",
1301 ("thr_no: %u - update estimated_max_iops from %u to "
1302 ULINTPF ".",
1303 state->thread_no,
1304 state->estimated_max_iops,
1305 1000000 / avg_wait_time_us));
1306
1307 state->estimated_max_iops = uint(1000000 / avg_wait_time_us);
1308 state->cnt_waited = 0;
1309 state->sum_waited_us = 0;
1310 } else {
1311 DBUG_PRINT("ib_crypt",
1312 ("thr_no: %u only waited " ULINTPF
1313 "%% skip re-estimate.",
1314 state->thread_no,
1315 (100 * state->cnt_waited)
1316 / (state->batch ? state->batch : 1)));
1317 }
1318
1319 if (state->estimated_max_iops <= state->allocated_iops) {
1320 /* return extra iops */
1321 uint extra = state->allocated_iops - state->estimated_max_iops;
1322
1323 if (extra > 0) {
1324 mutex_enter(&fil_crypt_threads_mutex);
1325 if (n_fil_crypt_iops_allocated < extra) {
1326 /* unknown bug!
1327 * crash in debug
1328 * keep n_fil_crypt_iops_allocated unchanged
1329 * in release */
1330 ut_ad(0);
1331 extra = 0;
1332 }
1333 n_fil_crypt_iops_allocated -= extra;
1334 state->allocated_iops -= extra;
1335
1336 if (state->allocated_iops == 0) {
1337 /* no matter how slow io system seems to be
1338 * never decrease allocated_iops to 0... */
1339 state->allocated_iops ++;
1340 n_fil_crypt_iops_allocated ++;
1341 }
1342
1343 os_event_set(fil_crypt_threads_event);
1344 mutex_exit(&fil_crypt_threads_mutex);
1345 }
1346 } else {
1347 /* see if there are more to get */
1348 mutex_enter(&fil_crypt_threads_mutex);
1349 if (n_fil_crypt_iops_allocated < srv_n_fil_crypt_iops) {
1350 /* there are extra iops free */
1351 uint extra = srv_n_fil_crypt_iops -
1352 n_fil_crypt_iops_allocated;
1353 if (state->allocated_iops + extra >
1354 state->estimated_max_iops) {
1355 /* but don't alloc more than our max */
1356 extra = state->estimated_max_iops -
1357 state->allocated_iops;
1358 }
1359 n_fil_crypt_iops_allocated += extra;
1360 state->allocated_iops += extra;
1361
1362 DBUG_PRINT("ib_crypt",
1363 ("thr_no: %u increased iops from %u to %u.",
1364 state->thread_no,
1365 state->allocated_iops - extra,
1366 state->allocated_iops));
1367
1368 }
1369 mutex_exit(&fil_crypt_threads_mutex);
1370 }
1371
1372 fil_crypt_update_total_stat(state);
1373 }
1374
1375 /***********************************************************************
1376 Return allocated iops to global
1377 @param[in,out] state Rotation state */
1378 static
1379 void
fil_crypt_return_iops(rotate_thread_t * state)1380 fil_crypt_return_iops(
1381 rotate_thread_t *state)
1382 {
1383 if (state->allocated_iops > 0) {
1384 uint iops = state->allocated_iops;
1385 mutex_enter(&fil_crypt_threads_mutex);
1386 if (n_fil_crypt_iops_allocated < iops) {
1387 /* unknown bug!
1388 * crash in debug
1389 * keep n_fil_crypt_iops_allocated unchanged
1390 * in release */
1391 ut_ad(0);
1392 iops = 0;
1393 }
1394
1395 n_fil_crypt_iops_allocated -= iops;
1396 state->allocated_iops = 0;
1397 os_event_set(fil_crypt_threads_event);
1398 mutex_exit(&fil_crypt_threads_mutex);
1399 }
1400
1401 fil_crypt_update_total_stat(state);
1402 }
1403
fil_crypt_must_default_encrypt()1404 bool fil_crypt_must_default_encrypt()
1405 {
1406 return !srv_fil_crypt_rotate_key_age || !srv_encrypt_rotate;
1407 }
1408
1409 /** Return the next tablespace from default_encrypt_tables.
1410 @param space previous tablespace (NULL to start from the start)
1411 @param recheck whether the removal condition needs to be rechecked after
1412 the encryption parameters were changed
1413 @param encrypt expected state of innodb_encrypt_tables
1414 @return the next tablespace to process (n_pending_ops incremented)
1415 @retval NULL if this was the last */
default_encrypt_next(fil_space_t * space,bool recheck,bool encrypt)1416 inline fil_space_t *fil_system_t::default_encrypt_next(
1417 fil_space_t *space, bool recheck, bool encrypt)
1418 {
1419 ut_ad(mutex_own(&mutex));
1420
1421 sized_ilist<fil_space_t, rotation_list_tag_t>::iterator it=
1422 space && space->is_in_default_encrypt
1423 ? space
1424 : default_encrypt_tables.begin();
1425 const sized_ilist<fil_space_t, rotation_list_tag_t>::iterator end=
1426 default_encrypt_tables.end();
1427
1428 if (space)
1429 {
1430 const bool released= !space->release();
1431
1432 if (space->is_in_default_encrypt)
1433 {
1434 while (++it != end &&
1435 (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));
1436
1437 /* If one of the encryption threads already started
1438 the encryption of the table then don't remove the
1439 unencrypted spaces from default encrypt list.
1440
1441 If there is a change in innodb_encrypt_tables variables
1442 value then don't remove the last processed tablespace
1443 from the default encrypt list. */
1444 if (released && !recheck && fil_crypt_must_remove(*space))
1445 {
1446 ut_a(!default_encrypt_tables.empty());
1447 default_encrypt_tables.remove(*space);
1448 space->is_in_default_encrypt= false;
1449 }
1450 }
1451 }
1452 else while (it != end &&
1453 (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()))
1454 {
1455 /* Find the next suitable default encrypt table if
1456 beginning of default_encrypt_tables list has been scheduled
1457 to be deleted */
1458 it++;
1459 }
1460
1461 while (it != end)
1462 {
1463 space= &*it;
1464 if (space->acquire())
1465 return space;
1466 while (++it != end && (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));
1467 }
1468
1469 return NULL;
1470 }
1471
1472 /** Return the next tablespace.
1473 @param space previous tablespace (NULL to start from the beginning)
1474 @param recheck whether the removal condition needs to be rechecked after
1475 the encryption parameters were changed
1476 @param encrypt expected state of innodb_encrypt_tables
1477 @return pointer to the next tablespace (with n_pending_ops incremented)
1478 @retval NULL if this was the last */
fil_space_next(fil_space_t * space,bool recheck,bool encrypt)1479 static fil_space_t *fil_space_next(fil_space_t *space, bool recheck,
1480 bool encrypt)
1481 {
1482 mutex_enter(&fil_system.mutex);
1483
1484 if (fil_crypt_must_default_encrypt())
1485 space= fil_system.default_encrypt_next(space, recheck, encrypt);
1486 else if (!space)
1487 {
1488 space= UT_LIST_GET_FIRST(fil_system.space_list);
1489 /* We can trust that space is not NULL because at least the
1490 system tablespace is always present and loaded first. */
1491 if (!space->acquire())
1492 goto next;
1493 }
1494 else
1495 {
1496 /* Move on to the next fil_space_t */
1497 space->release();
1498 next:
1499 space= UT_LIST_GET_NEXT(space_list, space);
1500
1501 /* Skip abnormal tablespaces or those that are being created by
1502 fil_ibd_create(), or being dropped. */
1503 while (space &&
1504 (UT_LIST_GET_LEN(space->chain) == 0 ||
1505 space->is_stopping() || space->purpose != FIL_TYPE_TABLESPACE))
1506 space= UT_LIST_GET_NEXT(space_list, space);
1507
1508 if (space && !space->acquire())
1509 goto next;
1510 }
1511
1512 mutex_exit(&fil_system.mutex);
1513 return space;
1514 }
1515
1516 /** Search for a space needing rotation
1517 @param[in,out] key_state Key state
1518 @param[in,out] state Rotation state
1519 @param[in,out] recheck recheck of the tablespace is needed or
1520 still encryption thread does write page 0 */
fil_crypt_find_space_to_rotate(key_state_t * key_state,rotate_thread_t * state,bool * recheck)1521 static bool fil_crypt_find_space_to_rotate(
1522 key_state_t* key_state,
1523 rotate_thread_t* state,
1524 bool* recheck)
1525 {
1526 /* we need iops to start rotating */
1527 while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) {
1528 if (state->space && state->space->is_stopping()) {
1529 state->space->release();
1530 state->space = NULL;
1531 }
1532
1533 os_event_reset(fil_crypt_threads_event);
1534 os_event_wait_time(fil_crypt_threads_event, 100000);
1535 }
1536
1537 if (state->should_shutdown()) {
1538 if (state->space) {
1539 state->space->release();
1540 state->space = NULL;
1541 }
1542 return false;
1543 }
1544
1545 if (state->first) {
1546 state->first = false;
1547 if (state->space) {
1548 state->space->release();
1549 }
1550 state->space = NULL;
1551 }
1552
1553 state->space = fil_space_next(state->space, *recheck,
1554 key_state->key_version != 0);
1555
1556 while (!state->should_shutdown() && state->space) {
1557 /* If there is no crypt data and we have not yet read
1558 page 0 for this tablespace, we need to read it before
1559 we can continue. */
1560 if (!state->space->crypt_data) {
1561 fil_crypt_read_crypt_data(state->space);
1562 }
1563
1564 if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
1565 ut_ad(key_state->key_id);
1566 /* init state->min_key_version_found before
1567 * starting on a space */
1568 state->min_key_version_found = key_state->key_version;
1569 return true;
1570 }
1571
1572 state->space = fil_space_next(state->space, *recheck,
1573 key_state->key_version != 0);
1574 }
1575
1576 if (state->space) {
1577 state->space->release();
1578 state->space = NULL;
1579 }
1580
1581 /* no work to do; release our allocation of I/O capacity */
1582 fil_crypt_return_iops(state);
1583
1584 return false;
1585
1586 }
1587
1588 /***********************************************************************
1589 Start rotating a space
1590 @param[in] key_state Key state
1591 @param[in,out] state Rotation state */
1592 static
1593 void
fil_crypt_start_rotate_space(const key_state_t * key_state,rotate_thread_t * state)1594 fil_crypt_start_rotate_space(
1595 const key_state_t* key_state,
1596 rotate_thread_t* state)
1597 {
1598 fil_space_crypt_t *crypt_data = state->space->crypt_data;
1599
1600 ut_ad(crypt_data);
1601 mutex_enter(&crypt_data->mutex);
1602 ut_ad(key_state->key_id == crypt_data->key_id);
1603
1604 if (crypt_data->rotate_state.active_threads == 0) {
1605 /* only first thread needs to init */
1606 crypt_data->rotate_state.next_offset = 1; // skip page 0
1607 /* no need to rotate beyond current max
1608 * if space extends, it will be encrypted with newer version */
1609 /* FIXME: max_offset could be removed and instead
1610 space->size consulted.*/
1611 crypt_data->rotate_state.max_offset = state->space->size;
1612 crypt_data->rotate_state.end_lsn = 0;
1613 crypt_data->rotate_state.min_key_version_found =
1614 key_state->key_version;
1615
1616 crypt_data->rotate_state.start_time = time(0);
1617
1618 if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
1619 crypt_data->is_encrypted() &&
1620 key_state->key_version != 0) {
1621 /* this is rotation unencrypted => encrypted */
1622 crypt_data->type = CRYPT_SCHEME_1;
1623 }
1624 }
1625
1626 /* count active threads in space */
1627 crypt_data->rotate_state.active_threads++;
1628
1629 /* Initialize thread local state */
1630 state->end_lsn = crypt_data->rotate_state.end_lsn;
1631 state->min_key_version_found =
1632 crypt_data->rotate_state.min_key_version_found;
1633
1634 mutex_exit(&crypt_data->mutex);
1635 }
1636
1637 /***********************************************************************
1638 Search for batch of pages needing rotation
1639 @param[in] key_state Key state
1640 @param[in,out] state Rotation state
1641 @return true if page needing key rotation found, false if not found */
1642 static
1643 bool
fil_crypt_find_page_to_rotate(const key_state_t * key_state,rotate_thread_t * state)1644 fil_crypt_find_page_to_rotate(
1645 const key_state_t* key_state,
1646 rotate_thread_t* state)
1647 {
1648 ulint batch = srv_alloc_time * state->allocated_iops;
1649 fil_space_t* space = state->space;
1650
1651 ut_ad(!space || space->referenced());
1652
1653 /* If space is marked to be dropped stop rotation. */
1654 if (!space || space->is_stopping()) {
1655 return false;
1656 }
1657
1658 fil_space_crypt_t *crypt_data = space->crypt_data;
1659
1660 mutex_enter(&crypt_data->mutex);
1661 ut_ad(key_state->key_id == crypt_data->key_id);
1662
1663 bool found = crypt_data->rotate_state.max_offset >=
1664 crypt_data->rotate_state.next_offset;
1665
1666 if (found) {
1667 state->offset = crypt_data->rotate_state.next_offset;
1668 ulint remaining = crypt_data->rotate_state.max_offset -
1669 crypt_data->rotate_state.next_offset;
1670
1671 if (batch <= remaining) {
1672 state->batch = batch;
1673 } else {
1674 state->batch = remaining;
1675 }
1676 }
1677
1678 crypt_data->rotate_state.next_offset += batch;
1679 mutex_exit(&crypt_data->mutex);
1680 return found;
1681 }
1682
1683 #define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \
1684 fil_crypt_get_page_throttle_func(state, offset, mtr, \
1685 sleeptime_ms, __FILE__, __LINE__)
1686
1687 /***********************************************************************
1688 Get a page and compute sleep time
1689 @param[in,out] state Rotation state
1690 @param[in] offset Page offset
1691 @param[in,out] mtr Minitransaction
1692 @param[out] sleeptime_ms Sleep time
1693 @param[in] file File where called
1694 @param[in] line Line where called
1695 @return page or NULL*/
1696 static
1697 buf_block_t*
fil_crypt_get_page_throttle_func(rotate_thread_t * state,ulint offset,mtr_t * mtr,ulint * sleeptime_ms,const char * file,unsigned line)1698 fil_crypt_get_page_throttle_func(
1699 rotate_thread_t* state,
1700 ulint offset,
1701 mtr_t* mtr,
1702 ulint* sleeptime_ms,
1703 const char* file,
1704 unsigned line)
1705 {
1706 fil_space_t* space = state->space;
1707 const page_size_t page_size = page_size_t(space->flags);
1708 const page_id_t page_id(space->id, offset);
1709 ut_ad(space->referenced());
1710
1711 /* Before reading from tablespace we need to make sure that
1712 the tablespace is not about to be dropped. */
1713 if (space->is_stopping()) {
1714 return NULL;
1715 }
1716
1717 dberr_t err = DB_SUCCESS;
1718 buf_block_t* block = buf_page_get_gen(page_id, page_size, RW_X_LATCH,
1719 NULL,
1720 BUF_PEEK_IF_IN_POOL, file, line,
1721 mtr, &err);
1722 if (block != NULL) {
1723 /* page was in buffer pool */
1724 state->crypt_stat.pages_read_from_cache++;
1725 return block;
1726 }
1727
1728 if (space->is_stopping()) {
1729 return NULL;
1730 }
1731
1732 state->crypt_stat.pages_read_from_disk++;
1733
1734 const ulonglong start = my_interval_timer();
1735 block = buf_page_get_gen(page_id, page_size,
1736 RW_X_LATCH,
1737 NULL, BUF_GET_POSSIBLY_FREED,
1738 file, line, mtr, &err);
1739 const ulonglong end = my_interval_timer();
1740
1741 state->cnt_waited++;
1742
1743 if (end > start) {
1744 state->sum_waited_us += (end - start) / 1000;
1745 }
1746
1747 /* average page load */
1748 ulint add_sleeptime_ms = 0;
1749 ulint avg_wait_time_us =ulint(state->sum_waited_us / state->cnt_waited);
1750 ulint alloc_wait_us = 1000000 / state->allocated_iops;
1751
1752 if (avg_wait_time_us < alloc_wait_us) {
1753 /* we reading faster than we allocated */
1754 add_sleeptime_ms = (alloc_wait_us - avg_wait_time_us) / 1000;
1755 } else {
1756 /* if page load time is longer than we want, skip sleeping */
1757 }
1758
1759 *sleeptime_ms += add_sleeptime_ms;
1760
1761 return block;
1762 }
1763
1764
1765 /***********************************************************************
1766 Get block and allocation status
1767
1768 note: innodb locks fil_space_latch and then block when allocating page
1769 but locks block and then fil_space_latch when freeing page.
1770
1771 @param[in,out] state Rotation state
1772 @param[in] offset Page offset
1773 @param[in,out] mtr Minitransaction
1774 @param[out] allocation_status Allocation status
1775 @param[out] sleeptime_ms Sleep time
1776 @return block or NULL
1777 */
1778 static
1779 buf_block_t*
btr_scrub_get_block_and_allocation_status(rotate_thread_t * state,ulint offset,mtr_t * mtr,btr_scrub_page_allocation_status_t * allocation_status,ulint * sleeptime_ms)1780 btr_scrub_get_block_and_allocation_status(
1781 rotate_thread_t* state,
1782 ulint offset,
1783 mtr_t* mtr,
1784 btr_scrub_page_allocation_status_t *allocation_status,
1785 ulint* sleeptime_ms)
1786 {
1787 mtr_t local_mtr;
1788 buf_block_t *block = NULL;
1789 fil_space_t* space = state->space;
1790
1791 ut_ad(space->referenced());
1792
1793 mtr_start(&local_mtr);
1794
1795 *allocation_status = fseg_page_is_free(space, (uint32_t)offset) ?
1796 BTR_SCRUB_PAGE_FREE :
1797 BTR_SCRUB_PAGE_ALLOCATED;
1798
1799 if (*allocation_status == BTR_SCRUB_PAGE_FREE) {
1800 /* this is easy case, we lock fil_space_latch first and
1801 then block */
1802 block = fil_crypt_get_page_throttle(state,
1803 offset, mtr,
1804 sleeptime_ms);
1805 mtr_commit(&local_mtr);
1806 } else {
1807 /* page is allocated according to xdes */
1808
1809 /* release fil_space_latch *before* fetching block */
1810 mtr_commit(&local_mtr);
1811
1812 /* NOTE: when we have locked dict_index_get_lock(),
1813 * it's safe to release fil_space_latch and then fetch block
1814 * as dict_index_get_lock() is needed to make tree modifications
1815 * such as free-ing a page
1816 */
1817
1818 block = fil_crypt_get_page_throttle(state,
1819 offset, mtr,
1820 sleeptime_ms);
1821 }
1822
1823 return block;
1824 }
1825
1826
1827 /***********************************************************************
1828 Rotate one page
1829 @param[in,out] key_state Key state
1830 @param[in,out] state Rotation state */
1831 static
1832 void
fil_crypt_rotate_page(const key_state_t * key_state,rotate_thread_t * state)1833 fil_crypt_rotate_page(
1834 const key_state_t* key_state,
1835 rotate_thread_t* state)
1836 {
1837 fil_space_t*space = state->space;
1838 ulint space_id = space->id;
1839 ulint offset = state->offset;
1840 ulint sleeptime_ms = 0;
1841 fil_space_crypt_t *crypt_data = space->crypt_data;
1842
1843 ut_ad(space->referenced());
1844 ut_ad(offset > 0);
1845
1846 /* In fil_crypt_thread where key rotation is done we have
1847 acquired space and checked that this space is not yet
1848 marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate().
1849 Check here also to give DROP TABLE or similar a change. */
1850 if (space->is_stopping()) {
1851 return;
1852 }
1853
1854 if (space_id == TRX_SYS_SPACE && offset == TRX_SYS_PAGE_NO) {
1855 /* don't encrypt this as it contains address to dblwr buffer */
1856 return;
1857 }
1858
1859 mtr_t mtr;
1860 mtr.start();
1861 if (buf_block_t* block = fil_crypt_get_page_throttle(state,
1862 offset, &mtr,
1863 &sleeptime_ms)) {
1864 bool modified = false;
1865 int needs_scrubbing = BTR_SCRUB_SKIP_PAGE;
1866 lsn_t block_lsn = block->page.newest_modification;
1867 byte* frame = buf_block_get_frame(block);
1868 uint kv = mach_read_from_4(frame+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
1869
1870 if (space->is_stopping()) {
1871 /* The tablespace is closing (in DROP TABLE or
1872 TRUNCATE TABLE or similar): avoid further access */
1873 } else if (!kv && !*reinterpret_cast<uint16_t*>
1874 (&frame[FIL_PAGE_TYPE])) {
1875 /* It looks like this page is not
1876 allocated. Because key rotation is accessing
1877 pages in a pattern that is unlike the normal
1878 B-tree and undo log access pattern, we cannot
1879 invoke fseg_page_is_free() here, because that
1880 could result in a deadlock. If we invoked
1881 fseg_page_is_free() and released the
1882 tablespace latch before acquiring block->lock,
1883 then the fseg_page_is_free() information
1884 could be stale already. */
1885
1886 /* If the data file was originally created
1887 before MariaDB 10.0 or MySQL 5.6, some
1888 allocated data pages could carry 0 in
1889 FIL_PAGE_TYPE. The FIL_PAGE_TYPE on those
1890 pages will be updated in
1891 buf_flush_init_for_writing() when the page
1892 is modified the next time.
1893
1894 Also, when the doublewrite buffer pages are
1895 allocated on bootstrap in a non-debug build,
1896 some dummy pages will be allocated, with 0 in
1897 the FIL_PAGE_TYPE. Those pages should be
1898 skipped from key rotation forever. */
1899 } else if (fil_crypt_needs_rotation(
1900 crypt_data,
1901 kv,
1902 key_state->key_version,
1903 key_state->rotate_key_age)) {
1904
1905 mtr.set_named_space(space);
1906 modified = true;
1907
1908 /* force rotation by dummy updating page */
1909 mlog_write_ulint(frame + FIL_PAGE_SPACE_ID,
1910 space_id, MLOG_4BYTES, &mtr);
1911
1912 /* statistics */
1913 state->crypt_stat.pages_modified++;
1914 } else {
1915 if (crypt_data->is_encrypted()) {
1916 if (kv < state->min_key_version_found) {
1917 state->min_key_version_found = kv;
1918 }
1919 }
1920
1921 needs_scrubbing = btr_page_needs_scrubbing(
1922 &state->scrub_data, block,
1923 BTR_SCRUB_PAGE_ALLOCATION_UNKNOWN);
1924 }
1925
1926 mtr.commit();
1927 lsn_t end_lsn = mtr.commit_lsn();
1928
1929 if (needs_scrubbing == BTR_SCRUB_PAGE) {
1930 mtr.start();
1931 /*
1932 * refetch page and allocation status
1933 */
1934 btr_scrub_page_allocation_status_t allocated;
1935
1936 block = btr_scrub_get_block_and_allocation_status(
1937 state, offset, &mtr,
1938 &allocated,
1939 &sleeptime_ms);
1940
1941 if (block) {
1942 mtr.set_named_space(space);
1943
1944 /* get required table/index and index-locks */
1945 needs_scrubbing = btr_scrub_recheck_page(
1946 &state->scrub_data, block, allocated, &mtr);
1947
1948 if (needs_scrubbing == BTR_SCRUB_PAGE) {
1949 /* we need to refetch it once more now that we have
1950 * index locked */
1951 block = btr_scrub_get_block_and_allocation_status(
1952 state, offset, &mtr,
1953 &allocated,
1954 &sleeptime_ms);
1955
1956 needs_scrubbing = btr_scrub_page(&state->scrub_data,
1957 block, allocated,
1958 &mtr);
1959 }
1960
1961 /* NOTE: mtr is committed inside btr_scrub_recheck_page()
1962 * and/or btr_scrub_page. This is to make sure that
1963 * locks & pages are latched in corrected order,
1964 * the mtr is in some circumstances restarted.
1965 * (mtr_commit() + mtr_start())
1966 */
1967 }
1968 }
1969
1970 if (needs_scrubbing != BTR_SCRUB_PAGE) {
1971 /* if page didn't need scrubbing it might be that cleanups
1972 are needed. do those outside of any mtr to prevent deadlocks.
1973
1974 the information what kinds of cleanups that are needed are
1975 encoded inside the needs_scrubbing, but this is opaque to
1976 this function (except the value BTR_SCRUB_PAGE) */
1977 btr_scrub_skip_page(&state->scrub_data, needs_scrubbing);
1978 }
1979
1980 if (needs_scrubbing == BTR_SCRUB_TURNED_OFF) {
1981 /* if we just detected that scrubbing was turned off
1982 * update global state to reflect this */
1983 ut_ad(crypt_data);
1984 mutex_enter(&crypt_data->mutex);
1985 crypt_data->rotate_state.scrubbing.is_active = false;
1986 mutex_exit(&crypt_data->mutex);
1987 }
1988
1989 if (modified) {
1990 /* if we modified page, we take lsn from mtr */
1991 ut_a(end_lsn > state->end_lsn);
1992 ut_a(end_lsn > block_lsn);
1993 state->end_lsn = end_lsn;
1994 } else {
1995 /* if we did not modify page, check for max lsn */
1996 if (block_lsn > state->end_lsn) {
1997 state->end_lsn = block_lsn;
1998 }
1999 }
2000 } else {
2001 /* If block read failed mtr memo and log should be empty. */
2002 ut_ad(!mtr.has_modifications());
2003 ut_ad(!mtr.is_dirty());
2004 ut_ad(mtr.get_memo()->size() == 0);
2005 ut_ad(mtr.get_log()->size() == 0);
2006 mtr.commit();
2007 }
2008
2009 if (sleeptime_ms) {
2010 os_event_reset(fil_crypt_throttle_sleep_event);
2011 os_event_wait_time(fil_crypt_throttle_sleep_event,
2012 1000 * sleeptime_ms);
2013 }
2014 }
2015
2016 /***********************************************************************
2017 Rotate a batch of pages
2018 @param[in,out] key_state Key state
2019 @param[in,out] state Rotation state */
2020 static
2021 void
fil_crypt_rotate_pages(const key_state_t * key_state,rotate_thread_t * state)2022 fil_crypt_rotate_pages(
2023 const key_state_t* key_state,
2024 rotate_thread_t* state)
2025 {
2026 ulint space = state->space->id;
2027 ulint end = std::min(state->offset + state->batch,
2028 state->space->free_limit);
2029
2030 ut_ad(state->space->referenced());
2031
2032 for (; state->offset < end; state->offset++) {
2033
2034 /* we can't rotate pages in dblwr buffer as
2035 * it's not possible to read those due to lots of asserts
2036 * in buffer pool.
2037 *
2038 * However since these are only (short-lived) copies of
2039 * real pages, they will be updated anyway when the
2040 * real page is updated
2041 */
2042 if (space == TRX_SYS_SPACE &&
2043 buf_dblwr_page_inside(state->offset)) {
2044 continue;
2045 }
2046
2047 /* If space is marked as stopping, stop rotating
2048 pages. */
2049 if (state->space->is_stopping()) {
2050 break;
2051 }
2052
2053 fil_crypt_rotate_page(key_state, state);
2054 }
2055 }
2056
2057 /***********************************************************************
2058 Flush rotated pages and then update page 0
2059
2060 @param[in,out] state rotation state */
2061 static
2062 void
fil_crypt_flush_space(rotate_thread_t * state)2063 fil_crypt_flush_space(
2064 rotate_thread_t* state)
2065 {
2066 fil_space_t* space = state->space;
2067 fil_space_crypt_t *crypt_data = space->crypt_data;
2068
2069 ut_ad(space->referenced());
2070
2071 /* flush tablespace pages so that there are no pages left with old key */
2072 lsn_t end_lsn = crypt_data->rotate_state.end_lsn;
2073
2074 if (end_lsn > 0 && !space->is_stopping()) {
2075 bool success = false;
2076 ulint n_pages = 0;
2077 ulint sum_pages = 0;
2078 const ulonglong start = my_interval_timer();
2079
2080 do {
2081 success = buf_flush_lists(ULINT_MAX, end_lsn, &n_pages);
2082 buf_flush_wait_batch_end(NULL, BUF_FLUSH_LIST);
2083 sum_pages += n_pages;
2084 } while (!success && !space->is_stopping());
2085
2086 const ulonglong end = my_interval_timer();
2087
2088 if (sum_pages && end > start) {
2089 state->cnt_waited += sum_pages;
2090 state->sum_waited_us += (end - start) / 1000;
2091
2092 /* statistics */
2093 state->crypt_stat.pages_flushed += sum_pages;
2094 }
2095 }
2096
2097 if (crypt_data->min_key_version == 0) {
2098 crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
2099 }
2100
2101 if (space->is_stopping()) {
2102 return;
2103 }
2104
2105 /* update page 0 */
2106 mtr_t mtr;
2107 mtr.start();
2108
2109 dberr_t err;
2110
2111 if (buf_block_t* block = buf_page_get_gen(
2112 page_id_t(space->id, 0), page_size_t(space->flags),
2113 RW_X_LATCH, NULL, BUF_GET,
2114 __FILE__, __LINE__, &mtr, &err)) {
2115 mtr.set_named_space(space);
2116 crypt_data->write_page0(space, block->frame, &mtr);
2117 }
2118
2119 mtr.commit();
2120 }
2121
2122 /***********************************************************************
2123 Complete rotating a space
2124 @param[in,out] state Rotation state */
fil_crypt_complete_rotate_space(rotate_thread_t * state)2125 static void fil_crypt_complete_rotate_space(rotate_thread_t* state)
2126 {
2127 fil_space_crypt_t *crypt_data = state->space->crypt_data;
2128
2129 ut_ad(crypt_data);
2130 ut_ad(state->space->referenced());
2131
2132 /* Space might already be dropped */
2133 if (!state->space->is_stopping()) {
2134 mutex_enter(&crypt_data->mutex);
2135
2136 /**
2137 * Update crypt data state with state from thread
2138 */
2139 if (state->min_key_version_found <
2140 crypt_data->rotate_state.min_key_version_found) {
2141 crypt_data->rotate_state.min_key_version_found =
2142 state->min_key_version_found;
2143 }
2144
2145 if (state->end_lsn > crypt_data->rotate_state.end_lsn) {
2146 crypt_data->rotate_state.end_lsn = state->end_lsn;
2147 }
2148
2149 ut_a(crypt_data->rotate_state.active_threads > 0);
2150 crypt_data->rotate_state.active_threads--;
2151 bool last = crypt_data->rotate_state.active_threads == 0;
2152
2153 /**
2154 * check if space is fully done
2155 * this as when threads shutdown, it could be that we "complete"
2156 * iterating before we have scanned the full space.
2157 */
2158 bool done = crypt_data->rotate_state.next_offset >=
2159 crypt_data->rotate_state.max_offset;
2160
2161 /**
2162 * we should flush space if we're last thread AND
2163 * the iteration is done
2164 */
2165 bool should_flush = last && done;
2166
2167 if (should_flush) {
2168 /* we're the last active thread */
2169 crypt_data->rotate_state.flushing = true;
2170 crypt_data->min_key_version =
2171 crypt_data->rotate_state.min_key_version_found;
2172 }
2173
2174 /* inform scrubbing */
2175 crypt_data->rotate_state.scrubbing.is_active = false;
2176 mutex_exit(&crypt_data->mutex);
2177
2178 /* all threads must call btr_scrub_complete_space wo/ mutex held */
2179 if (state->scrub_data.scrubbing) {
2180 btr_scrub_complete_space(&state->scrub_data);
2181 if (should_flush) {
2182 /* only last thread updates last_scrub_completed */
2183 ut_ad(crypt_data);
2184 mutex_enter(&crypt_data->mutex);
2185 crypt_data->rotate_state.scrubbing.
2186 last_scrub_completed = time(0);
2187 mutex_exit(&crypt_data->mutex);
2188 }
2189 }
2190
2191 if (should_flush) {
2192 fil_crypt_flush_space(state);
2193
2194 mutex_enter(&crypt_data->mutex);
2195 crypt_data->rotate_state.flushing = false;
2196 mutex_exit(&crypt_data->mutex);
2197 }
2198 } else {
2199 mutex_enter(&crypt_data->mutex);
2200 ut_a(crypt_data->rotate_state.active_threads > 0);
2201 crypt_data->rotate_state.active_threads--;
2202 mutex_exit(&crypt_data->mutex);
2203 }
2204 }
2205
2206 /*********************************************************************//**
2207 A thread which monitors global key state and rotates tablespaces accordingly
2208 @return a dummy parameter */
2209 extern "C" UNIV_INTERN
2210 os_thread_ret_t
DECLARE_THREAD(fil_crypt_thread)2211 DECLARE_THREAD(fil_crypt_thread)(void*)
2212 {
2213 mutex_enter(&fil_crypt_threads_mutex);
2214 uint thread_no = srv_n_fil_crypt_threads_started;
2215 srv_n_fil_crypt_threads_started++;
2216 os_event_set(fil_crypt_event); /* signal that we started */
2217 mutex_exit(&fil_crypt_threads_mutex);
2218
2219 /* state of this thread */
2220 rotate_thread_t thr(thread_no);
2221
2222 /* if we find a space that is starting, skip over it and recheck it later */
2223 bool recheck = false;
2224
2225 while (!thr.should_shutdown()) {
2226
2227 key_state_t new_state;
2228
2229 time_t wait_start = time(0);
2230
2231 while (!thr.should_shutdown()) {
2232
2233 /* wait for key state changes
2234 * i.e either new key version of change or
2235 * new rotate_key_age */
2236 os_event_reset(fil_crypt_threads_event);
2237
2238 if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) {
2239 break;
2240 }
2241
2242 if (recheck) {
2243 /* check recheck here, after sleep, so
2244 * that we don't busy loop while when one thread is starting
2245 * a space*/
2246 break;
2247 }
2248
2249 time_t waited = time(0) - wait_start;
2250
2251 /* Break if we have waited the background scrub
2252 internal and background scrubbing is enabled */
2253 if (waited >= 0
2254 && ulint(waited) >= srv_background_scrub_data_check_interval
2255 && (srv_background_scrub_data_uncompressed
2256 || srv_background_scrub_data_compressed)) {
2257 break;
2258 }
2259 }
2260
2261 recheck = false;
2262 thr.first = true; // restart from first tablespace
2263
2264 /* iterate all spaces searching for those needing rotation */
2265 while (!thr.should_shutdown() &&
2266 fil_crypt_find_space_to_rotate(&new_state, &thr, &recheck)) {
2267
2268 /* we found a space to rotate */
2269 fil_crypt_start_rotate_space(&new_state, &thr);
2270
2271 /* iterate all pages (cooperativly with other threads) */
2272 while (!thr.should_shutdown() &&
2273 fil_crypt_find_page_to_rotate(&new_state, &thr)) {
2274
2275 if (!thr.space->is_stopping()) {
2276 /* rotate a (set) of pages */
2277 fil_crypt_rotate_pages(&new_state, &thr);
2278 }
2279
2280 /* If space is marked as stopping, release
2281 space and stop rotation. */
2282 if (thr.space->is_stopping()) {
2283 fil_crypt_complete_rotate_space(&thr);
2284 thr.space->release();
2285 thr.space = NULL;
2286 break;
2287 }
2288
2289 /* realloc iops */
2290 fil_crypt_realloc_iops(&thr);
2291 }
2292
2293 /* complete rotation */
2294 if (thr.space) {
2295 fil_crypt_complete_rotate_space(&thr);
2296 }
2297
2298 /* force key state refresh */
2299 new_state.key_id = 0;
2300
2301 /* return iops */
2302 fil_crypt_return_iops(&thr);
2303 }
2304 }
2305
2306 /* return iops if shutting down */
2307 fil_crypt_return_iops(&thr);
2308
2309 /* release current space if shutting down */
2310 if (thr.space) {
2311 thr.space->release();
2312 thr.space = NULL;
2313 }
2314
2315 mutex_enter(&fil_crypt_threads_mutex);
2316 srv_n_fil_crypt_threads_started--;
2317 os_event_set(fil_crypt_event); /* signal that we stopped */
2318 mutex_exit(&fil_crypt_threads_mutex);
2319
2320 /* We count the number of threads in os_thread_exit(). A created
2321 thread should always use that to exit and not use return() to exit. */
2322
2323 os_thread_exit();
2324
2325 OS_THREAD_DUMMY_RETURN;
2326 }
2327
2328 /*********************************************************************
2329 Adjust thread count for key rotation
2330 @param[in] enw_cnt Number of threads to be used */
2331 UNIV_INTERN
2332 void
fil_crypt_set_thread_cnt(const uint new_cnt)2333 fil_crypt_set_thread_cnt(
2334 const uint new_cnt)
2335 {
2336 if (!fil_crypt_threads_inited) {
2337 fil_crypt_threads_init();
2338 }
2339
2340 mutex_enter(&fil_crypt_threads_mutex);
2341
2342 if (new_cnt > srv_n_fil_crypt_threads) {
2343 uint add = new_cnt - srv_n_fil_crypt_threads;
2344 srv_n_fil_crypt_threads = new_cnt;
2345 for (uint i = 0; i < add; i++) {
2346 os_thread_id_t rotation_thread_id;
2347 os_thread_create(fil_crypt_thread, NULL, &rotation_thread_id);
2348 ib::info() << "Creating #"
2349 << i+1 << " encryption thread id "
2350 << os_thread_pf(rotation_thread_id)
2351 << " total threads " << new_cnt << ".";
2352 }
2353 } else if (new_cnt < srv_n_fil_crypt_threads) {
2354 srv_n_fil_crypt_threads = new_cnt;
2355 os_event_set(fil_crypt_threads_event);
2356 }
2357
2358 mutex_exit(&fil_crypt_threads_mutex);
2359
2360 while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) {
2361 os_event_reset(fil_crypt_event);
2362 os_event_wait_time(fil_crypt_event, 100000);
2363 }
2364
2365 /* Send a message to encryption threads that there could be
2366 something to do. */
2367 if (srv_n_fil_crypt_threads) {
2368 os_event_set(fil_crypt_threads_event);
2369 }
2370 }
2371
2372 /** Initialize the tablespace default_encrypt_tables
2373 if innodb_encryption_rotate_key_age=0. */
fil_crypt_default_encrypt_tables_fill()2374 static void fil_crypt_default_encrypt_tables_fill()
2375 {
2376 ut_ad(mutex_own(&fil_system.mutex));
2377
2378 for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list);
2379 space != NULL;
2380 space = UT_LIST_GET_NEXT(space_list, space)) {
2381 if (space->purpose != FIL_TYPE_TABLESPACE
2382 || space->is_in_default_encrypt
2383 || UT_LIST_GET_LEN(space->chain) == 0
2384 || !space->acquire()) {
2385 continue;
2386 }
2387
2388 /* Ensure that crypt_data has been initialized. */
2389 if (!space->size) {
2390 ut_d(const fil_space_t* s=)
2391 fil_system.read_page0(space->id);
2392 ut_ad(!s || s == space);
2393 if (!space->size) {
2394 /* Page 0 was not loaded.
2395 Skip this tablespace. */
2396 goto next;
2397 }
2398 }
2399
2400 /* Skip ENCRYPTION!=DEFAULT tablespaces. */
2401 if (space->crypt_data
2402 && !space->crypt_data->is_default_encryption()) {
2403 goto next;
2404 }
2405
2406 if (srv_encrypt_tables) {
2407 /* Skip encrypted tablespaces if
2408 innodb_encrypt_tables!=OFF */
2409 if (space->crypt_data
2410 && space->crypt_data->min_key_version) {
2411 goto next;
2412 }
2413 } else {
2414 /* Skip unencrypted tablespaces if
2415 innodb_encrypt_tables=OFF */
2416 if (!space->crypt_data
2417 || !space->crypt_data->min_key_version) {
2418 goto next;
2419 }
2420 }
2421
2422 fil_system.default_encrypt_tables.push_back(*space);
2423 space->is_in_default_encrypt = true;
2424 next:
2425 space->release();
2426 }
2427 }
2428
2429 /*********************************************************************
2430 Adjust max key age
2431 @param[in] val New max key age */
2432 UNIV_INTERN
2433 void
fil_crypt_set_rotate_key_age(uint val)2434 fil_crypt_set_rotate_key_age(
2435 uint val)
2436 {
2437 mutex_enter(&fil_system.mutex);
2438 srv_fil_crypt_rotate_key_age = val;
2439 if (val == 0) {
2440 fil_crypt_default_encrypt_tables_fill();
2441 }
2442 mutex_exit(&fil_system.mutex);
2443 os_event_set(fil_crypt_threads_event);
2444 }
2445
2446 /*********************************************************************
2447 Adjust rotation iops
2448 @param[in] val New max roation iops */
2449 UNIV_INTERN
2450 void
fil_crypt_set_rotation_iops(uint val)2451 fil_crypt_set_rotation_iops(
2452 uint val)
2453 {
2454 srv_n_fil_crypt_iops = val;
2455 os_event_set(fil_crypt_threads_event);
2456 }
2457
2458 /*********************************************************************
2459 Adjust encrypt tables
2460 @param[in] val New setting for innodb-encrypt-tables */
2461 UNIV_INTERN
2462 void
fil_crypt_set_encrypt_tables(uint val)2463 fil_crypt_set_encrypt_tables(
2464 uint val)
2465 {
2466 if (!fil_crypt_threads_inited) {
2467 return;
2468 }
2469
2470 mutex_enter(&fil_system.mutex);
2471
2472 srv_encrypt_tables = val;
2473
2474 if (fil_crypt_must_default_encrypt()) {
2475 fil_crypt_default_encrypt_tables_fill();
2476 }
2477
2478 mutex_exit(&fil_system.mutex);
2479
2480 os_event_set(fil_crypt_threads_event);
2481 }
2482
2483 /*********************************************************************
2484 Init threads for key rotation */
2485 UNIV_INTERN
2486 void
fil_crypt_threads_init()2487 fil_crypt_threads_init()
2488 {
2489 if (!fil_crypt_threads_inited) {
2490 fil_crypt_event = os_event_create(0);
2491 fil_crypt_threads_event = os_event_create(0);
2492 mutex_create(LATCH_ID_FIL_CRYPT_THREADS_MUTEX,
2493 &fil_crypt_threads_mutex);
2494
2495 uint cnt = srv_n_fil_crypt_threads;
2496 srv_n_fil_crypt_threads = 0;
2497 fil_crypt_threads_inited = true;
2498 fil_crypt_set_thread_cnt(cnt);
2499 }
2500 }
2501
2502 /*********************************************************************
2503 Clean up key rotation threads resources */
2504 UNIV_INTERN
2505 void
fil_crypt_threads_cleanup()2506 fil_crypt_threads_cleanup()
2507 {
2508 if (!fil_crypt_threads_inited) {
2509 return;
2510 }
2511 ut_a(!srv_n_fil_crypt_threads_started);
2512 os_event_destroy(fil_crypt_event);
2513 os_event_destroy(fil_crypt_threads_event);
2514 mutex_free(&fil_crypt_threads_mutex);
2515 fil_crypt_threads_inited = false;
2516 }
2517
2518 /*********************************************************************
2519 Wait for crypt threads to stop accessing space
2520 @param[in] space Tablespace */
2521 UNIV_INTERN
2522 void
fil_space_crypt_close_tablespace(const fil_space_t * space)2523 fil_space_crypt_close_tablespace(
2524 const fil_space_t* space)
2525 {
2526 fil_space_crypt_t* crypt_data = space->crypt_data;
2527
2528 if (!crypt_data || srv_n_fil_crypt_threads == 0
2529 || !fil_crypt_threads_inited) {
2530 return;
2531 }
2532
2533 mutex_enter(&fil_crypt_threads_mutex);
2534
2535 time_t start = time(0);
2536 time_t last = start;
2537
2538 mutex_enter(&crypt_data->mutex);
2539 mutex_exit(&fil_crypt_threads_mutex);
2540
2541 ulint cnt = crypt_data->rotate_state.active_threads;
2542 bool flushing = crypt_data->rotate_state.flushing;
2543
2544 while (cnt > 0 || flushing) {
2545 mutex_exit(&crypt_data->mutex);
2546 /* release dict mutex so that scrub threads can release their
2547 * table references */
2548 dict_mutex_exit_for_mysql();
2549
2550 /* wakeup throttle (all) sleepers */
2551 os_event_set(fil_crypt_throttle_sleep_event);
2552 os_event_set(fil_crypt_threads_event);
2553
2554 os_thread_sleep(20000);
2555 dict_mutex_enter_for_mysql();
2556 mutex_enter(&crypt_data->mutex);
2557 cnt = crypt_data->rotate_state.active_threads;
2558 flushing = crypt_data->rotate_state.flushing;
2559
2560 time_t now = time(0);
2561
2562 if (now >= last + 30) {
2563 ib::warn() << "Waited "
2564 << now - start
2565 << " seconds to drop space: "
2566 << space->name << " ("
2567 << space->id << ") active threads "
2568 << cnt << "flushing="
2569 << flushing << ".";
2570 last = now;
2571 }
2572 }
2573
2574 mutex_exit(&crypt_data->mutex);
2575 }
2576
2577 /*********************************************************************
2578 Get crypt status for a space (used by information_schema)
2579 @param[in] space Tablespace
2580 @param[out] status Crypt status */
2581 UNIV_INTERN
2582 void
fil_space_crypt_get_status(const fil_space_t * space,struct fil_space_crypt_status_t * status)2583 fil_space_crypt_get_status(
2584 const fil_space_t* space,
2585 struct fil_space_crypt_status_t* status)
2586 {
2587 memset(status, 0, sizeof(*status));
2588
2589 ut_ad(space->referenced());
2590
2591 /* If there is no crypt data and we have not yet read
2592 page 0 for this tablespace, we need to read it before
2593 we can continue. */
2594 if (!space->crypt_data) {
2595 fil_crypt_read_crypt_data(const_cast<fil_space_t*>(space));
2596 }
2597
2598 status->space = ULINT_UNDEFINED;
2599
2600 if (fil_space_crypt_t* crypt_data = space->crypt_data) {
2601 status->space = space->id;
2602 mutex_enter(&crypt_data->mutex);
2603 status->scheme = crypt_data->type;
2604 status->keyserver_requests = crypt_data->keyserver_requests;
2605 status->min_key_version = crypt_data->min_key_version;
2606 status->key_id = crypt_data->key_id;
2607
2608 if (crypt_data->rotate_state.active_threads > 0 ||
2609 crypt_data->rotate_state.flushing) {
2610 status->rotating = true;
2611 status->flushing =
2612 crypt_data->rotate_state.flushing;
2613 status->rotate_next_page_number =
2614 crypt_data->rotate_state.next_offset;
2615 status->rotate_max_page_number =
2616 crypt_data->rotate_state.max_offset;
2617 }
2618
2619 mutex_exit(&crypt_data->mutex);
2620
2621 if (srv_encrypt_tables || crypt_data->min_key_version) {
2622 status->current_key_version =
2623 fil_crypt_get_latest_key_version(crypt_data);
2624 }
2625 }
2626 }
2627
2628 /*********************************************************************
2629 Return crypt statistics
2630 @param[out] stat Crypt statistics */
2631 UNIV_INTERN
2632 void
fil_crypt_total_stat(fil_crypt_stat_t * stat)2633 fil_crypt_total_stat(
2634 fil_crypt_stat_t *stat)
2635 {
2636 mutex_enter(&crypt_stat_mutex);
2637 *stat = crypt_stat;
2638 mutex_exit(&crypt_stat_mutex);
2639 }
2640
2641 /*********************************************************************
2642 Get scrub status for a space (used by information_schema)
2643
2644 @param[in] space Tablespace
2645 @param[out] status Scrub status */
2646 UNIV_INTERN
2647 void
fil_space_get_scrub_status(const fil_space_t * space,struct fil_space_scrub_status_t * status)2648 fil_space_get_scrub_status(
2649 const fil_space_t* space,
2650 struct fil_space_scrub_status_t* status)
2651 {
2652 memset(status, 0, sizeof(*status));
2653
2654 ut_ad(space->referenced());
2655 fil_space_crypt_t* crypt_data = space->crypt_data;
2656
2657 status->space = space->id;
2658
2659 if (crypt_data != NULL) {
2660 status->compressed = FSP_FLAGS_GET_ZIP_SSIZE(space->flags) > 0;
2661 mutex_enter(&crypt_data->mutex);
2662 status->last_scrub_completed =
2663 crypt_data->rotate_state.scrubbing.last_scrub_completed;
2664 if (crypt_data->rotate_state.active_threads > 0 &&
2665 crypt_data->rotate_state.scrubbing.is_active) {
2666 status->scrubbing = true;
2667 status->current_scrub_started =
2668 crypt_data->rotate_state.start_time;
2669 status->current_scrub_active_threads =
2670 crypt_data->rotate_state.active_threads;
2671 status->current_scrub_page_number =
2672 crypt_data->rotate_state.next_offset;
2673 status->current_scrub_max_page_number =
2674 crypt_data->rotate_state.max_offset;
2675 }
2676
2677 mutex_exit(&crypt_data->mutex);
2678 }
2679 }
2680 #endif /* UNIV_INNOCHECKSUM */
2681
2682 /**
2683 Verify that post encryption checksum match calculated checksum.
2684 This function should be called only if tablespace contains crypt_data
2685 metadata (this is strong indication that tablespace is encrypted).
2686 Function also verifies that traditional checksum does not match
2687 calculated checksum as if it does page could be valid unencrypted,
2688 encrypted, or corrupted.
2689
2690 @param[in,out] page page frame (checksum is temporarily modified)
2691 @param[in] page_size page size
2692 @return whether the encrypted page is OK */
2693 bool
fil_space_verify_crypt_checksum(const byte * page,const page_size_t & page_size)2694 fil_space_verify_crypt_checksum(const byte* page, const page_size_t& page_size)
2695 {
2696 ut_ad(mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION));
2697
2698 /* Compressed and encrypted pages do not have checksum. Assume not
2699 corrupted. Page verification happens after decompression in
2700 buf_page_io_complete() using buf_page_is_corrupted(). */
2701 if (mach_read_from_2(page + FIL_PAGE_TYPE)
2702 == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
2703 return true;
2704 }
2705
2706 /* Read stored post encryption checksum. */
2707 const ib_uint32_t checksum = mach_read_from_4(
2708 page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
2709
2710 /* If stored checksum matches one of the calculated checksums
2711 page is not corrupted. */
2712
2713 switch (srv_checksum_algorithm_t(srv_checksum_algorithm)) {
2714 case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
2715 if (page_size.is_compressed()) {
2716 return checksum == page_zip_calc_checksum(
2717 page, page_size.physical(),
2718 SRV_CHECKSUM_ALGORITHM_CRC32)
2719 #ifdef INNODB_BUG_ENDIAN_CRC32
2720 || checksum == page_zip_calc_checksum(
2721 page, page_size.physical(),
2722 SRV_CHECKSUM_ALGORITHM_CRC32, true)
2723 #endif
2724 ;
2725 }
2726
2727 return checksum == buf_calc_page_crc32(page)
2728 #ifdef INNODB_BUG_ENDIAN_CRC32
2729 || checksum == buf_calc_page_crc32(page, true)
2730 #endif
2731 ;
2732 case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
2733 /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2734 due to MDEV-12114, fil_crypt_calculate_checksum()
2735 is only using CRC32 for the encrypted pages.
2736 Due to this, we must treat "strict_none" as "none". */
2737 case SRV_CHECKSUM_ALGORITHM_NONE:
2738 return true;
2739 case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
2740 /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2741 due to MDEV-12114, fil_crypt_calculate_checksum()
2742 is only using CRC32 for the encrypted pages.
2743 Due to this, we must treat "strict_innodb" as "innodb". */
2744 case SRV_CHECKSUM_ALGORITHM_INNODB:
2745 case SRV_CHECKSUM_ALGORITHM_CRC32:
2746 if (checksum == BUF_NO_CHECKSUM_MAGIC) {
2747 return true;
2748 }
2749 if (page_size.is_compressed()) {
2750 return checksum == page_zip_calc_checksum(
2751 page, page_size.physical(),
2752 SRV_CHECKSUM_ALGORITHM_CRC32)
2753 #ifdef INNODB_BUG_ENDIAN_CRC32
2754 || checksum == page_zip_calc_checksum(
2755 page, page_size.physical(),
2756 SRV_CHECKSUM_ALGORITHM_CRC32, true)
2757 #endif
2758 || checksum == page_zip_calc_checksum(
2759 page, page_size.physical(),
2760 SRV_CHECKSUM_ALGORITHM_INNODB);
2761 }
2762
2763 return checksum == buf_calc_page_crc32(page)
2764 #ifdef INNODB_BUG_ENDIAN_CRC32
2765 || checksum == buf_calc_page_crc32(page, true)
2766 #endif
2767 || checksum == buf_calc_page_new_checksum(page);
2768 }
2769
2770 ut_ad(!"unhandled innodb_checksum_algorithm");
2771 return false;
2772 }
2773