1 /*****************************************************************************
2 Copyright (C) 2013, 2015, Google Inc. All Rights Reserved.
3 Copyright (c) 2014, 2021, MariaDB Corporation.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
16
17 *****************************************************************************/
18 /**************************************************//**
19 @file fil0crypt.cc
20 Innodb file space encrypt/decrypt
21
22 Created Jonas Oreland Google
23 Modified Jan Lindström jan.lindstrom@mariadb.com
24 *******************************************************/
25
26 #include "fil0crypt.h"
27 #include "mtr0types.h"
28 #include "mach0data.h"
29 #include "page0zip.h"
30 #include "buf0checksum.h"
31 #ifdef UNIV_INNOCHECKSUM
32 # include "buf0buf.h"
33 #else
34 #include "buf0dblwr.h"
35 #include "srv0srv.h"
36 #include "srv0start.h"
37 #include "mtr0mtr.h"
38 #include "mtr0log.h"
39 #include "ut0ut.h"
40 #include "fsp0fsp.h"
41 #include "fil0pagecompress.h"
42 #include <my_crypt.h>
43
/** Guards use of the key rotation thread primitives in this file:
fil_crypt_threads_event is only signalled and fil_crypt_threads_mutex is
only acquired while this flag is set. */
static bool fil_crypt_threads_inited = false;

/** Is encryption enabled/disabled (0 = disabled; any nonzero value is
treated as enabled by fil_crypt_get_key_state()) */
UNIV_INTERN ulong srv_encrypt_tables = 0;

/** Number of key rotation threads requested */
UNIV_INTERN uint srv_n_fil_crypt_threads = 0;

/** Number of key rotation threads started */
UNIV_INTERN uint srv_n_fil_crypt_threads_started = 0;

/** At this age or older a space/page will be rotated. The age is the
distance between a key version in use and the latest key version;
0 disables age based re-encryption (see fil_crypt_needs_rotation()). */
UNIV_INTERN uint srv_fil_crypt_rotate_key_age;

/** Whether the encryption plugin does key rotation. Set to true by
fil_space_crypt_t::key_get_latest_version() the first time the plugin
reports a key version greater than srv_fil_crypt_rotate_key_age; never
reset in the code visible here. */
static bool srv_encrypt_rotate;

/** Event to signal FROM the key rotation threads. */
static os_event_t fil_crypt_event;

/** Event to signal TO the key rotation threads. */
UNIV_INTERN os_event_t fil_crypt_threads_event;

/** Event for waking up threads that sleep because of throttling.
Created in fil_space_crypt_init(), destroyed in
fil_space_crypt_cleanup(). */
static os_event_t fil_crypt_throttle_sleep_event;

/** Mutex for key rotation threads. Also taken while detaching a
crypt_data object in fil_space_destroy_crypt_data(). */
UNIV_INTERN ib_mutex_t fil_crypt_threads_mutex;

/** Variable ensuring only 1 thread at time does initial conversion */
static bool fil_crypt_start_converting = false;

/** Variables for throttling */
UNIV_INTERN uint srv_n_fil_crypt_iops = 100; // 10ms per iop
static uint srv_alloc_time = 3; // allocate iops for 3s at a time
static uint n_fil_crypt_iops_allocated = 0;

/** Compile-time switch for throttling diagnostics (0 = off) */
#define DEBUG_KEYROTATION_THROTTLING 0

/** Statistics variables; crypt_stat is zeroed and crypt_stat_mutex is
created in fil_space_crypt_init(). */
static fil_crypt_stat_t crypt_stat;
static ib_mutex_t crypt_stat_mutex;
86
/***********************************************************************
Check if a key needs rotation given a key_state.
Forward declaration; the definition follows later in this file.
@param[in]	crypt_data		Encryption information
@param[in]	key_version		Current key version
@param[in]	latest_key_version	Latest key version
@param[in]	rotate_key_age		when to rotate (0 = never rotate
					because of key age alone)
@return true if key needs rotation, false if not */
static bool
fil_crypt_needs_rotation(
const fil_space_crypt_t* crypt_data,
uint key_version,
uint latest_key_version,
uint rotate_key_age)
MY_ATTRIBUTE((warn_unused_result));
101
102 /*********************************************************************
103 Init space crypt */
104 UNIV_INTERN
105 void
fil_space_crypt_init()106 fil_space_crypt_init()
107 {
108 fil_crypt_throttle_sleep_event = os_event_create(0);
109
110 mutex_create(LATCH_ID_FIL_CRYPT_STAT_MUTEX, &crypt_stat_mutex);
111 memset(&crypt_stat, 0, sizeof(crypt_stat));
112 }
113
114 /*********************************************************************
115 Cleanup space crypt */
116 UNIV_INTERN
117 void
fil_space_crypt_cleanup()118 fil_space_crypt_cleanup()
119 {
120 os_event_destroy(fil_crypt_throttle_sleep_event);
121 mutex_free(&crypt_stat_mutex);
122 }
123
124 /**
125 Get latest key version from encryption plugin.
126 @return key version or ENCRYPTION_KEY_VERSION_INVALID */
127 uint
key_get_latest_version(void)128 fil_space_crypt_t::key_get_latest_version(void)
129 {
130 uint key_version = key_found;
131
132 if (is_key_found()) {
133 key_version = encryption_key_get_latest_version(key_id);
134 /* InnoDB does dirty read of srv_fil_crypt_rotate_key_age.
135 It doesn't matter because srv_encrypt_rotate
136 can be set to true only once */
137 if (!srv_encrypt_rotate
138 && key_version > srv_fil_crypt_rotate_key_age) {
139 srv_encrypt_rotate = true;
140 }
141
142 srv_stats.n_key_requests.inc();
143 key_found = key_version;
144 }
145
146 return key_version;
147 }
148
149 /******************************************************************
150 Get the latest(key-version), waking the encrypt thread, if needed
151 @param[in,out] crypt_data Crypt data */
152 static inline
153 uint
fil_crypt_get_latest_key_version(fil_space_crypt_t * crypt_data)154 fil_crypt_get_latest_key_version(
155 fil_space_crypt_t* crypt_data)
156 {
157 ut_ad(crypt_data != NULL);
158
159 uint key_version = crypt_data->key_get_latest_version();
160
161 if (crypt_data->is_key_found()) {
162
163 if (fil_crypt_needs_rotation(
164 crypt_data,
165 crypt_data->min_key_version,
166 key_version,
167 srv_fil_crypt_rotate_key_age)) {
168 /* Below event seen as NULL-pointer at startup
169 when new database was created and we create a
170 checkpoint. Only seen when debugging. */
171 if (fil_crypt_threads_inited) {
172 os_event_set(fil_crypt_threads_event);
173 }
174 }
175 }
176
177 return key_version;
178 }
179
180 /******************************************************************
181 Mutex helper for crypt_data->scheme */
182 void
crypt_data_scheme_locker(st_encryption_scheme * scheme,int exit)183 crypt_data_scheme_locker(
184 /*=====================*/
185 st_encryption_scheme* scheme,
186 int exit)
187 {
188 fil_space_crypt_t* crypt_data =
189 static_cast<fil_space_crypt_t*>(scheme);
190
191 if (exit) {
192 mutex_exit(&crypt_data->mutex);
193 } else {
194 mutex_enter(&crypt_data->mutex);
195 }
196 }
197
198 /******************************************************************
199 Create a fil_space_crypt_t object
200 @param[in] type CRYPT_SCHEME_UNENCRYPTE or
201 CRYPT_SCHEME_1
202 @param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
203 FIL_ENCRYPTION_ON or
204 FIL_ENCRYPTION_OFF
205 @param[in] min_key_version key_version or 0
206 @param[in] key_id Used key id
207 @return crypt object */
208 static
209 fil_space_crypt_t*
fil_space_create_crypt_data(uint type,fil_encryption_t encrypt_mode,uint min_key_version,uint key_id)210 fil_space_create_crypt_data(
211 uint type,
212 fil_encryption_t encrypt_mode,
213 uint min_key_version,
214 uint key_id)
215 {
216 fil_space_crypt_t* crypt_data = NULL;
217 if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
218 crypt_data = new(buf)
219 fil_space_crypt_t(
220 type,
221 min_key_version,
222 key_id,
223 encrypt_mode);
224 }
225
226 return crypt_data;
227 }
228
229 /******************************************************************
230 Create a fil_space_crypt_t object
231 @param[in] encrypt_mode FIL_ENCRYPTION_DEFAULT or
232 FIL_ENCRYPTION_ON or
233 FIL_ENCRYPTION_OFF
234
235 @param[in] key_id Encryption key id
236 @return crypt object */
237 UNIV_INTERN
238 fil_space_crypt_t*
fil_space_create_crypt_data(fil_encryption_t encrypt_mode,uint key_id)239 fil_space_create_crypt_data(
240 fil_encryption_t encrypt_mode,
241 uint key_id)
242 {
243 return (fil_space_create_crypt_data(0, encrypt_mode, 0, key_id));
244 }
245
246 /******************************************************************
247 Merge fil_space_crypt_t object
248 @param[in,out] dst Destination cryp data
249 @param[in] src Source crypt data */
250 UNIV_INTERN
251 void
fil_space_merge_crypt_data(fil_space_crypt_t * dst,const fil_space_crypt_t * src)252 fil_space_merge_crypt_data(
253 fil_space_crypt_t* dst,
254 const fil_space_crypt_t* src)
255 {
256 mutex_enter(&dst->mutex);
257
258 /* validate that they are mergeable */
259 ut_a(src->type == CRYPT_SCHEME_UNENCRYPTED ||
260 src->type == CRYPT_SCHEME_1);
261
262 ut_a(dst->type == CRYPT_SCHEME_UNENCRYPTED ||
263 dst->type == CRYPT_SCHEME_1);
264
265 dst->encryption = src->encryption;
266 dst->type = src->type;
267 dst->min_key_version = src->min_key_version;
268 dst->keyserver_requests += src->keyserver_requests;
269
270 mutex_exit(&dst->mutex);
271 }
272
273 /** Initialize encryption parameters from a tablespace header page.
274 @param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
275 @param[in] page first page of the tablespace
276 @return crypt data from page 0
277 @retval NULL if not present or not valid */
fil_space_read_crypt_data(ulint zip_size,const byte * page)278 fil_space_crypt_t* fil_space_read_crypt_data(ulint zip_size, const byte* page)
279 {
280 const ulint offset = FSP_HEADER_OFFSET
281 + fsp_header_get_encryption_offset(zip_size);
282
283 if (memcmp(page + offset, CRYPT_MAGIC, MAGIC_SZ) != 0) {
284 /* Crypt data is not stored. */
285 return NULL;
286 }
287
288 uint8_t type = mach_read_from_1(page + offset + MAGIC_SZ + 0);
289 uint8_t iv_length = mach_read_from_1(page + offset + MAGIC_SZ + 1);
290 fil_space_crypt_t* crypt_data;
291
292 if (!(type == CRYPT_SCHEME_UNENCRYPTED ||
293 type == CRYPT_SCHEME_1)
294 || iv_length != sizeof crypt_data->iv) {
295 ib::error() << "Found non sensible crypt scheme: "
296 << type << "," << iv_length
297 << " for space: "
298 << page_get_space_id(page);
299 return NULL;
300 }
301
302 uint min_key_version = mach_read_from_4
303 (page + offset + MAGIC_SZ + 2 + iv_length);
304
305 uint key_id = mach_read_from_4
306 (page + offset + MAGIC_SZ + 2 + iv_length + 4);
307
308 fil_encryption_t encryption = (fil_encryption_t)mach_read_from_1(
309 page + offset + MAGIC_SZ + 2 + iv_length + 8);
310
311 crypt_data = fil_space_create_crypt_data(encryption, key_id);
312 /* We need to overwrite these as above function will initialize
313 members */
314 crypt_data->type = type;
315 crypt_data->min_key_version = min_key_version;
316 memcpy(crypt_data->iv, page + offset + MAGIC_SZ + 2, iv_length);
317
318 return crypt_data;
319 }
320
321 /******************************************************************
322 Free a crypt data object
323 @param[in,out] crypt_data crypt data to be freed */
324 UNIV_INTERN
325 void
fil_space_destroy_crypt_data(fil_space_crypt_t ** crypt_data)326 fil_space_destroy_crypt_data(
327 fil_space_crypt_t **crypt_data)
328 {
329 if (crypt_data != NULL && (*crypt_data) != NULL) {
330 fil_space_crypt_t* c;
331 if (UNIV_LIKELY(fil_crypt_threads_inited)) {
332 mutex_enter(&fil_crypt_threads_mutex);
333 c = *crypt_data;
334 *crypt_data = NULL;
335 mutex_exit(&fil_crypt_threads_mutex);
336 } else {
337 ut_ad(srv_read_only_mode || !srv_was_started);
338 c = *crypt_data;
339 *crypt_data = NULL;
340 }
341 if (c) {
342 c->~fil_space_crypt_t();
343 ut_free(c);
344 }
345 }
346 }
347
348 /** Amend encryption information from redo log.
349 @param[in] space tablespace
350 @param[in] data encryption metadata */
fil_crypt_parse(fil_space_t * space,const byte * data)351 void fil_crypt_parse(fil_space_t* space, const byte* data)
352 {
353 ut_ad(data[1] == MY_AES_BLOCK_SIZE);
354 if (void* buf = ut_zalloc_nokey(sizeof(fil_space_crypt_t))) {
355 fil_space_crypt_t* crypt_data = new(buf)
356 fil_space_crypt_t(
357 data[0],
358 mach_read_from_4(&data[2 + MY_AES_BLOCK_SIZE]),
359 mach_read_from_4(&data[6 + MY_AES_BLOCK_SIZE]),
360 static_cast<fil_encryption_t>
361 (data[10 + MY_AES_BLOCK_SIZE]));
362 memcpy(crypt_data->iv, data + 2, MY_AES_BLOCK_SIZE);
363 mutex_enter(&fil_system.mutex);
364 if (space->crypt_data) {
365 fil_space_merge_crypt_data(space->crypt_data,
366 crypt_data);
367 fil_space_destroy_crypt_data(&crypt_data);
368 crypt_data = space->crypt_data;
369 } else {
370 space->crypt_data = crypt_data;
371 }
372 mutex_exit(&fil_system.mutex);
373 }
374 }
375
376 /** Fill crypt data information to the give page.
377 It should be called during ibd file creation.
378 @param[in] flags tablespace flags
379 @param[in,out] page first page of the tablespace */
380 void
fill_page0(ulint flags,byte * page)381 fil_space_crypt_t::fill_page0(
382 ulint flags,
383 byte* page)
384 {
385 const uint len = sizeof(iv);
386 const ulint offset = FSP_HEADER_OFFSET
387 + fsp_header_get_encryption_offset(
388 fil_space_t::zip_size(flags));
389
390 memcpy(page + offset, CRYPT_MAGIC, MAGIC_SZ);
391 mach_write_to_1(page + offset + MAGIC_SZ, type);
392 mach_write_to_1(page + offset + MAGIC_SZ + 1, len);
393 memcpy(page + offset + MAGIC_SZ + 2, &iv, len);
394
395 mach_write_to_4(page + offset + MAGIC_SZ + 2 + len,
396 min_key_version);
397 mach_write_to_4(page + offset + MAGIC_SZ + 2 + len + 4,
398 key_id);
399 mach_write_to_1(page + offset + MAGIC_SZ + 2 + len + 8,
400 encryption);
401 }
402
403 /** Write encryption metadata to the first page.
404 @param[in,out] block first page of the tablespace
405 @param[in,out] mtr mini-transaction */
write_page0(buf_block_t * block,mtr_t * mtr)406 void fil_space_crypt_t::write_page0(buf_block_t* block, mtr_t* mtr)
407 {
408 const ulint offset = FSP_HEADER_OFFSET
409 + fsp_header_get_encryption_offset(block->zip_size());
410 byte* b = block->frame + offset;
411
412 mtr->memcpy<mtr_t::MAYBE_NOP>(*block, b, CRYPT_MAGIC, MAGIC_SZ);
413
414 b += MAGIC_SZ;
415 byte* const start = b;
416 *b++ = static_cast<byte>(type);
417 compile_time_assert(sizeof iv == MY_AES_BLOCK_SIZE);
418 compile_time_assert(sizeof iv == CRYPT_SCHEME_1_IV_LEN);
419 *b++ = sizeof iv;
420 memcpy(b, iv, sizeof iv);
421 b += sizeof iv;
422 mach_write_to_4(b, min_key_version);
423 b += 4;
424 mach_write_to_4(b, key_id);
425 b += 4;
426 *b++ = byte(encryption);
427 ut_ad(b - start == 11 + MY_AES_BLOCK_SIZE);
428 /* We must log also any unchanged bytes, because recovery will
429 invoke fil_crypt_parse() based on this log record. */
430 mtr->memcpy(*block, offset + MAGIC_SZ, b - start);
431 }
432
433 /** Encrypt a buffer for non full checksum.
434 @param[in,out] crypt_data Crypt data
435 @param[in] space space_id
436 @param[in] offset Page offset
437 @param[in] lsn Log sequence number
438 @param[in] src_frame Page to encrypt
439 @param[in] zip_size ROW_FORMAT=COMPRESSED
440 page size, or 0
441 @param[in,out] dst_frame Output buffer
442 @return encrypted buffer or NULL */
fil_encrypt_buf_for_non_full_checksum(fil_space_crypt_t * crypt_data,ulint space,ulint offset,lsn_t lsn,const byte * src_frame,ulint zip_size,byte * dst_frame)443 static byte* fil_encrypt_buf_for_non_full_checksum(
444 fil_space_crypt_t* crypt_data,
445 ulint space,
446 ulint offset,
447 lsn_t lsn,
448 const byte* src_frame,
449 ulint zip_size,
450 byte* dst_frame)
451 {
452 uint size = uint(zip_size ? zip_size : srv_page_size);
453 uint key_version = fil_crypt_get_latest_key_version(crypt_data);
454 ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
455 ut_ad(!ut_align_offset(src_frame, 8));
456 ut_ad(!ut_align_offset(dst_frame, 8));
457
458 const bool page_compressed = fil_page_get_type(src_frame)
459 == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED;
460 uint header_len = FIL_PAGE_DATA;
461
462 if (page_compressed) {
463 header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
464 }
465
466 /* FIL page header is not encrypted */
467 memcpy(dst_frame, src_frame, header_len);
468 mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
469 key_version);
470
471 /* Calculate the start offset in a page */
472 uint unencrypted_bytes = header_len + FIL_PAGE_DATA_END;
473 uint srclen = size - unencrypted_bytes;
474 const byte* src = src_frame + header_len;
475 byte* dst = dst_frame + header_len;
476 uint32 dstlen = 0;
477 ib_uint32_t checksum = 0;
478
479 if (page_compressed) {
480 srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
481 }
482
483 int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
484 crypt_data, key_version,
485 (uint32)space, (uint32)offset, lsn);
486 ut_a(rc == MY_AES_OK);
487 ut_a(dstlen == srclen);
488
489 /* For compressed tables we do not store the FIL header because
490 the whole page is not stored to the disk. In compressed tables only
491 the FIL header + compressed (and now encrypted) payload alligned
492 to sector boundary is written. */
493 if (!page_compressed) {
494 /* FIL page trailer is also not encrypted */
495 static_assert(FIL_PAGE_DATA_END == 8, "alignment");
496 memcpy_aligned<8>(dst_frame + size - FIL_PAGE_DATA_END,
497 src_frame + size - FIL_PAGE_DATA_END, 8);
498 } else {
499 /* Clean up rest of buffer */
500 memset(dst_frame+header_len+srclen, 0,
501 size - (header_len + srclen));
502 }
503
504 checksum = fil_crypt_calculate_checksum(zip_size, dst_frame);
505
506 /* store the post-encryption checksum after the key-version */
507 mach_write_to_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4,
508 checksum);
509
510 ut_ad(fil_space_verify_crypt_checksum(dst_frame, zip_size));
511
512 srv_stats.pages_encrypted.inc();
513
514 return dst_frame;
515 }
516
517 /** Encrypt a buffer for full checksum format.
518 @param[in,out] crypt_data Crypt data
519 @param[in] space space_id
520 @param[in] offset Page offset
521 @param[in] lsn Log sequence number
522 @param[in] src_frame Page to encrypt
523 @param[in,out] dst_frame Output buffer
524 @return encrypted buffer or NULL */
fil_encrypt_buf_for_full_crc32(fil_space_crypt_t * crypt_data,ulint space,ulint offset,lsn_t lsn,const byte * src_frame,byte * dst_frame)525 static byte* fil_encrypt_buf_for_full_crc32(
526 fil_space_crypt_t* crypt_data,
527 ulint space,
528 ulint offset,
529 lsn_t lsn,
530 const byte* src_frame,
531 byte* dst_frame)
532 {
533 uint key_version = fil_crypt_get_latest_key_version(crypt_data);
534 ut_d(bool corrupted = false);
535 const uint size = buf_page_full_crc32_size(src_frame, NULL,
536 #ifdef UNIV_DEBUG
537 &corrupted
538 #else
539 NULL
540 #endif
541 );
542 ut_ad(!corrupted);
543 uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
544 + FIL_PAGE_FCRC32_CHECKSUM);
545 const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
546 byte* dst = dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
547 uint dstlen = 0;
548
549 ut_a(key_version != ENCRYPTION_KEY_VERSION_INVALID);
550
551 /* Till FIL_PAGE_LSN, page is not encrypted */
552 memcpy(dst_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
553
554 /* Write key version to the page. */
555 mach_write_to_4(dst_frame + FIL_PAGE_FCRC32_KEY_VERSION, key_version);
556
557 int rc = encryption_scheme_encrypt(src, srclen, dst, &dstlen,
558 crypt_data, key_version,
559 uint(space), uint(offset), lsn);
560 ut_a(rc == MY_AES_OK);
561 ut_a(dstlen == srclen);
562
563 const ulint payload = size - FIL_PAGE_FCRC32_CHECKSUM;
564 mach_write_to_4(dst_frame + payload, ut_crc32(dst_frame, payload));
565 /* Clean the rest of the buffer. FIXME: Punch holes when writing! */
566 memset(dst_frame + (payload + 4), 0, srv_page_size - (payload + 4));
567
568 srv_stats.pages_encrypted.inc();
569
570 return dst_frame;
571 }
572
573 /** Encrypt a buffer.
574 @param[in,out] crypt_data Crypt data
575 @param[in] space space_id
576 @param[in] offset Page offset
577 @param[in] src_frame Page to encrypt
578 @param[in] zip_size ROW_FORMAT=COMPRESSED
579 page size, or 0
580 @param[in,out] dst_frame Output buffer
581 @param[in] use_full_checksum full crc32 algo is used
582 @return encrypted buffer or NULL */
fil_encrypt_buf(fil_space_crypt_t * crypt_data,ulint space,ulint offset,const byte * src_frame,ulint zip_size,byte * dst_frame,bool use_full_checksum)583 byte* fil_encrypt_buf(
584 fil_space_crypt_t* crypt_data,
585 ulint space,
586 ulint offset,
587 const byte* src_frame,
588 ulint zip_size,
589 byte* dst_frame,
590 bool use_full_checksum)
591 {
592 const lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
593
594 if (use_full_checksum) {
595 ut_ad(!zip_size);
596 return fil_encrypt_buf_for_full_crc32(
597 crypt_data, space, offset,
598 lsn, src_frame, dst_frame);
599 }
600
601 return fil_encrypt_buf_for_non_full_checksum(
602 crypt_data, space, offset, lsn,
603 src_frame, zip_size, dst_frame);
604 }
605
606 /** Check whether these page types are allowed to encrypt.
607 @param[in] space tablespace object
608 @param[in] src_frame source page
609 @return true if it is valid page type */
fil_space_encrypt_valid_page_type(const fil_space_t * space,const byte * src_frame)610 static bool fil_space_encrypt_valid_page_type(
611 const fil_space_t* space,
612 const byte* src_frame)
613 {
614 switch (fil_page_get_type(src_frame)) {
615 case FIL_PAGE_RTREE:
616 return space->full_crc32();
617 case FIL_PAGE_TYPE_FSP_HDR:
618 case FIL_PAGE_TYPE_XDES:
619 return false;
620 }
621
622 return true;
623 }
624
625 /******************************************************************
626 Encrypt a page
627
628 @param[in] space Tablespace
629 @param[in] offset Page offset
630 @param[in] src_frame Page to encrypt
631 @param[in,out] dst_frame Output buffer
632 @return encrypted buffer or NULL */
fil_space_encrypt(const fil_space_t * space,ulint offset,byte * src_frame,byte * dst_frame)633 byte* fil_space_encrypt(
634 const fil_space_t* space,
635 ulint offset,
636 byte* src_frame,
637 byte* dst_frame)
638 {
639 if (!fil_space_encrypt_valid_page_type(space, src_frame)) {
640 return src_frame;
641 }
642
643 if (!space->crypt_data || !space->crypt_data->is_encrypted()) {
644 return (src_frame);
645 }
646
647 ut_ad(space->referenced());
648
649 return fil_encrypt_buf(space->crypt_data, space->id, offset,
650 src_frame, space->zip_size(),
651 dst_frame, space->full_crc32());
652 }
653
654 /** Decrypt a page for full checksum format.
655 @param[in] space space id
656 @param[in] crypt_data crypt_data
657 @param[in] tmp_frame Temporary buffer
658 @param[in,out] src_frame Page to decrypt
659 @return DB_SUCCESS or error */
fil_space_decrypt_full_crc32(ulint space,fil_space_crypt_t * crypt_data,byte * tmp_frame,byte * src_frame)660 static dberr_t fil_space_decrypt_full_crc32(
661 ulint space,
662 fil_space_crypt_t* crypt_data,
663 byte* tmp_frame,
664 byte* src_frame)
665 {
666 uint key_version = mach_read_from_4(
667 src_frame + FIL_PAGE_FCRC32_KEY_VERSION);
668 lsn_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
669 uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
670
671 ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
672
673 ut_ad(crypt_data);
674 ut_ad(crypt_data->is_encrypted());
675
676 memcpy(tmp_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
677
678 /* Calculate the offset where decryption starts */
679 const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
680 byte* dst = tmp_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
681 uint dstlen = 0;
682 bool corrupted = false;
683 uint size = buf_page_full_crc32_size(src_frame, NULL, &corrupted);
684 if (UNIV_UNLIKELY(corrupted)) {
685 return DB_DECRYPTION_FAILED;
686 }
687
688 uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
689 + FIL_PAGE_FCRC32_CHECKSUM);
690
691 int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
692 crypt_data, key_version,
693 (uint) space, offset, lsn);
694
695 if (rc != MY_AES_OK || dstlen != srclen) {
696 if (rc == -1) {
697 return DB_DECRYPTION_FAILED;
698 }
699
700 ib::fatal() << "Unable to decrypt data-block "
701 << " src: " << src << "srclen: "
702 << srclen << " buf: " << dst << "buflen: "
703 << dstlen << " return-code: " << rc
704 << " Can't continue!";
705 }
706
707 /* Copy only checksum part in the trailer */
708 memcpy(tmp_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
709 src_frame + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
710 FIL_PAGE_FCRC32_CHECKSUM);
711
712 srv_stats.pages_decrypted.inc();
713
714 return DB_SUCCESS; /* page was decrypted */
715 }
716
717 /** Decrypt a page for non full checksum format.
718 @param[in] crypt_data crypt_data
719 @param[in] tmp_frame Temporary buffer
720 @param[in] physical_size page size
721 @param[in,out] src_frame Page to decrypt
722 @return DB_SUCCESS or error */
fil_space_decrypt_for_non_full_checksum(fil_space_crypt_t * crypt_data,byte * tmp_frame,ulint physical_size,byte * src_frame)723 static dberr_t fil_space_decrypt_for_non_full_checksum(
724 fil_space_crypt_t* crypt_data,
725 byte* tmp_frame,
726 ulint physical_size,
727 byte* src_frame)
728 {
729 uint key_version = mach_read_from_4(
730 src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
731 bool page_compressed = (fil_page_get_type(src_frame)
732 == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
733 uint offset = mach_read_from_4(src_frame + FIL_PAGE_OFFSET);
734 uint space = mach_read_from_4(
735 src_frame + FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID);
736 ib_uint64_t lsn = mach_read_from_8(src_frame + FIL_PAGE_LSN);
737
738 ut_a(key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
739 ut_a(crypt_data != NULL && crypt_data->is_encrypted());
740
741 /* read space & lsn */
742 uint header_len = FIL_PAGE_DATA;
743
744 if (page_compressed) {
745 header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
746 }
747
748 /* Copy FIL page header, it is not encrypted */
749 memcpy(tmp_frame, src_frame, header_len);
750
751 /* Calculate the offset where decryption starts */
752 const byte* src = src_frame + header_len;
753 byte* dst = tmp_frame + header_len;
754 uint32 dstlen = 0;
755 uint srclen = uint(physical_size) - header_len - FIL_PAGE_DATA_END;
756
757 if (page_compressed) {
758 srclen = mach_read_from_2(src_frame + FIL_PAGE_DATA);
759 }
760
761 int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
762 crypt_data, key_version,
763 space, offset, lsn);
764
765 if (! ((rc == MY_AES_OK) && ((ulint) dstlen == srclen))) {
766
767 if (rc == -1) {
768 return DB_DECRYPTION_FAILED;
769 }
770
771 ib::fatal() << "Unable to decrypt data-block "
772 << " src: " << static_cast<const void*>(src)
773 << "srclen: "
774 << srclen << " buf: "
775 << static_cast<const void*>(dst) << "buflen: "
776 << dstlen << " return-code: " << rc
777 << " Can't continue!";
778 }
779
780 /* For compressed tables we do not store the FIL header because
781 the whole page is not stored to the disk. In compressed tables only
782 the FIL header + compressed (and now encrypted) payload alligned
783 to sector boundary is written. */
784 if (!page_compressed) {
785 /* Copy FIL trailer */
786 memcpy(tmp_frame + physical_size - FIL_PAGE_DATA_END,
787 src_frame + physical_size - FIL_PAGE_DATA_END,
788 FIL_PAGE_DATA_END);
789 }
790
791 srv_stats.pages_decrypted.inc();
792
793 return DB_SUCCESS; /* page was decrypted */
794 }
795
796 /** Decrypt a page.
797 @param[in] space_id tablespace id
798 @param[in] crypt_data crypt_data
799 @param[in] tmp_frame Temporary buffer
800 @param[in] physical_size page size
801 @param[in] fsp_flags Tablespace flags
802 @param[in,out] src_frame Page to decrypt
803 @param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED
804 @return DB_SUCCESS or error */
805 UNIV_INTERN
806 dberr_t
fil_space_decrypt(ulint space_id,fil_space_crypt_t * crypt_data,byte * tmp_frame,ulint physical_size,ulint fsp_flags,byte * src_frame)807 fil_space_decrypt(
808 ulint space_id,
809 fil_space_crypt_t* crypt_data,
810 byte* tmp_frame,
811 ulint physical_size,
812 ulint fsp_flags,
813 byte* src_frame)
814 {
815 if (fil_space_t::full_crc32(fsp_flags)) {
816 return fil_space_decrypt_full_crc32(
817 space_id, crypt_data, tmp_frame, src_frame);
818 }
819
820 return fil_space_decrypt_for_non_full_checksum(crypt_data, tmp_frame,
821 physical_size,
822 src_frame);
823 }
824
825 /**
826 Decrypt a page.
827 @param[in] space Tablespace
828 @param[in] tmp_frame Temporary buffer used for decrypting
829 @param[in,out] src_frame Page to decrypt
830 @return decrypted page, or original not encrypted page if decryption is
831 not needed.*/
832 UNIV_INTERN
833 byte*
fil_space_decrypt(const fil_space_t * space,byte * tmp_frame,byte * src_frame)834 fil_space_decrypt(
835 const fil_space_t* space,
836 byte* tmp_frame,
837 byte* src_frame)
838 {
839 const ulint physical_size = space->physical_size();
840
841 ut_ad(space->crypt_data != NULL && space->crypt_data->is_encrypted());
842 ut_ad(space->referenced());
843
844 if (DB_SUCCESS != fil_space_decrypt(space->id, space->crypt_data,
845 tmp_frame, physical_size,
846 space->flags, src_frame)) {
847 return nullptr;
848 }
849
850 /* Copy the decrypted page back to page buffer, not
851 really any other options. */
852 memcpy(src_frame, tmp_frame, physical_size);
853
854 return src_frame;
855 }
856
857 /**
858 Calculate post encryption checksum
859 @param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
860 @param[in] dst_frame Block where checksum is calculated
861 @return page checksum
862 not needed. */
863 uint32_t
fil_crypt_calculate_checksum(ulint zip_size,const byte * dst_frame)864 fil_crypt_calculate_checksum(ulint zip_size, const byte* dst_frame)
865 {
866 /* For encrypted tables we use only crc32 and strict_crc32 */
867 return zip_size
868 ? page_zip_calc_checksum(dst_frame, zip_size,
869 SRV_CHECKSUM_ALGORITHM_CRC32)
870 : buf_calc_page_crc32(dst_frame);
871 }
872
873 /***********************************************************************/
874
875 /** A copy of global key state */
876 struct key_state_t {
key_state_tkey_state_t877 key_state_t() : key_id(0), key_version(0),
878 rotate_key_age(srv_fil_crypt_rotate_key_age) {}
operator ==key_state_t879 bool operator==(const key_state_t& other) const {
880 return key_version == other.key_version &&
881 rotate_key_age == other.rotate_key_age;
882 }
883 uint key_id;
884 uint key_version;
885 uint rotate_key_age;
886 };
887
888 /***********************************************************************
889 Copy global key state
890 @param[in,out] new_state key state
891 @param[in] crypt_data crypt data */
892 static void
fil_crypt_get_key_state(key_state_t * new_state,fil_space_crypt_t * crypt_data)893 fil_crypt_get_key_state(
894 key_state_t* new_state,
895 fil_space_crypt_t* crypt_data)
896 {
897 if (srv_encrypt_tables) {
898 new_state->key_version = crypt_data->key_get_latest_version();
899 new_state->rotate_key_age = srv_fil_crypt_rotate_key_age;
900
901 ut_a(new_state->key_version != ENCRYPTION_KEY_NOT_ENCRYPTED);
902 } else {
903 new_state->key_version = 0;
904 new_state->rotate_key_age = 0;
905 }
906 }
907
908 /***********************************************************************
909 Check if a key needs rotation given a key_state
910 @param[in] crypt_data Encryption information
911 @param[in] key_version Current key version
912 @param[in] latest_key_version Latest key version
913 @param[in] rotate_key_age when to rotate
914 @return true if key needs rotation, false if not */
915 static bool
fil_crypt_needs_rotation(const fil_space_crypt_t * crypt_data,uint key_version,uint latest_key_version,uint rotate_key_age)916 fil_crypt_needs_rotation(
917 const fil_space_crypt_t* crypt_data,
918 uint key_version,
919 uint latest_key_version,
920 uint rotate_key_age)
921 {
922 if (key_version == ENCRYPTION_KEY_VERSION_INVALID) {
923 return false;
924 }
925
926 if (key_version == 0 && latest_key_version != 0) {
927 /* this is rotation unencrypted => encrypted
928 * ignore rotate_key_age */
929 return true;
930 }
931
932 if (latest_key_version == 0 && key_version != 0) {
933 if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT) {
934 /* this is rotation encrypted => unencrypted */
935 return true;
936 }
937 return false;
938 }
939
940 if (crypt_data->encryption == FIL_ENCRYPTION_DEFAULT
941 && crypt_data->type == CRYPT_SCHEME_1
942 && !srv_encrypt_tables) {
943 /* This is rotation encrypted => unencrypted */
944 return true;
945 }
946
947 if (rotate_key_age == 0) {
948 return false;
949 }
950
951 /* this is rotation encrypted => encrypted,
952 * only reencrypt if key is sufficiently old */
953 if (key_version + rotate_key_age < latest_key_version) {
954 return true;
955 }
956
957 return false;
958 }
959
960 /** Read page 0 and possible crypt data from there.
961 @param[in,out] space Tablespace */
962 static inline
963 void
fil_crypt_read_crypt_data(fil_space_t * space)964 fil_crypt_read_crypt_data(fil_space_t* space)
965 {
966 if (space->crypt_data || space->size || !space->get_size()) {
967 /* The encryption metadata has already been read, or
968 the tablespace is not encrypted and the file has been
969 opened already, or the file cannot be accessed,
970 likely due to a concurrent DROP
971 (possibly as part of TRUNCATE or ALTER TABLE).
972 FIXME: The file can become unaccessible any time
973 after this check! We should really remove this
974 function and instead make crypt_data an integral
975 part of fil_space_t. */
976 return;
977 }
978
979 const ulint zip_size = space->zip_size();
980 mtr_t mtr;
981 mtr.start();
982 if (buf_block_t* block = buf_page_get_gen(page_id_t(space->id, 0),
983 zip_size, RW_S_LATCH,
984 nullptr,
985 BUF_GET_POSSIBLY_FREED,
986 __FILE__, __LINE__, &mtr)) {
987 if (block->page.status == buf_page_t::FREED) {
988 goto func_exit;
989 }
990 mutex_enter(&fil_system.mutex);
991 if (!space->crypt_data && !space->is_stopping()) {
992 space->crypt_data = fil_space_read_crypt_data(
993 zip_size, block->frame);
994 }
995 mutex_exit(&fil_system.mutex);
996 }
997 func_exit:
998 mtr.commit();
999 }
1000
1001 /** Start encrypting a space
1002 @param[in,out] space Tablespace
1003 @return true if a recheck of tablespace is needed by encryption thread. */
fil_crypt_start_encrypting_space(fil_space_t * space)1004 static bool fil_crypt_start_encrypting_space(fil_space_t* space)
1005 {
1006 mutex_enter(&fil_crypt_threads_mutex);
1007
1008 fil_space_crypt_t *crypt_data = space->crypt_data;
1009
1010 /* If space is not encrypted and encryption is not enabled, then
1011 do not continue encrypting the space. */
1012 if (!crypt_data && !srv_encrypt_tables) {
1013 mutex_exit(&fil_crypt_threads_mutex);
1014 return false;
1015 }
1016
1017 const bool recheck = fil_crypt_start_converting;
1018
1019 if (recheck || crypt_data || space->is_stopping()) {
1020 mutex_exit(&fil_crypt_threads_mutex);
1021 return recheck;
1022 }
1023
1024 /* NOTE: we need to write and flush page 0 before publishing
1025 * the crypt data. This so that after restart there is no
1026 * risk of finding encrypted pages without having
1027 * crypt data in page 0 */
1028
1029 /* 1 - create crypt data */
1030 crypt_data = fil_space_create_crypt_data(
1031 FIL_ENCRYPTION_DEFAULT, FIL_DEFAULT_ENCRYPTION_KEY);
1032
1033 if (crypt_data == NULL) {
1034 mutex_exit(&fil_crypt_threads_mutex);
1035 return false;
1036 }
1037
1038 fil_crypt_start_converting = true;
1039 mutex_exit(&fil_crypt_threads_mutex);
1040
1041 mtr_t mtr;
1042 mtr.start();
1043
1044 /* 2 - get page 0 */
1045 dberr_t err = DB_SUCCESS;
1046 if (buf_block_t* block = buf_page_get_gen(
1047 page_id_t(space->id, 0), space->zip_size(),
1048 RW_X_LATCH, NULL, BUF_GET_POSSIBLY_FREED,
1049 __FILE__, __LINE__, &mtr, &err)) {
1050 if (block->page.status == buf_page_t::FREED) {
1051 goto abort;
1052 }
1053
1054 crypt_data->type = CRYPT_SCHEME_1;
1055 crypt_data->min_key_version = 0; // all pages are unencrypted
1056 crypt_data->rotate_state.start_time = time(0);
1057 crypt_data->rotate_state.starting = true;
1058 crypt_data->rotate_state.active_threads = 1;
1059
1060 mutex_enter(&fil_system.mutex);
1061 const bool stopping = space->is_stopping();
1062 if (!stopping) {
1063 space->crypt_data = crypt_data;
1064 }
1065 mutex_exit(&fil_system.mutex);
1066
1067 if (stopping) {
1068 goto abort;
1069 }
1070
1071 /* 3 - write crypt data to page 0 */
1072 mtr.set_named_space(space);
1073 crypt_data->write_page0(block, &mtr);
1074
1075 mtr.commit();
1076
1077 /* 4 - sync tablespace before publishing crypt data */
1078 while (buf_flush_list_space(space));
1079
1080 /* 5 - publish crypt data */
1081 mutex_enter(&fil_crypt_threads_mutex);
1082 mutex_enter(&crypt_data->mutex);
1083 crypt_data->type = CRYPT_SCHEME_1;
1084 ut_a(crypt_data->rotate_state.active_threads == 1);
1085 crypt_data->rotate_state.active_threads = 0;
1086 crypt_data->rotate_state.starting = false;
1087
1088 fil_crypt_start_converting = false;
1089 mutex_exit(&crypt_data->mutex);
1090 mutex_exit(&fil_crypt_threads_mutex);
1091
1092 return false;
1093 }
1094
1095 abort:
1096 mtr.commit();
1097 mutex_enter(&fil_crypt_threads_mutex);
1098 fil_crypt_start_converting = false;
1099 mutex_exit(&fil_crypt_threads_mutex);
1100
1101 crypt_data->~fil_space_crypt_t();
1102 ut_free(crypt_data);
1103 return false;
1104 }
1105
1106 /** State of a rotation thread */
1107 struct rotate_thread_t {
rotate_thread_trotate_thread_t1108 explicit rotate_thread_t(uint no) {
1109 memset(this, 0, sizeof(* this));
1110 thread_no = no;
1111 first = true;
1112 estimated_max_iops = 20;
1113 }
1114
1115 uint thread_no;
1116 bool first; /*!< is position before first space */
1117 fil_space_t* space; /*!< current space or NULL */
1118 uint32_t offset; /*!< current page number */
1119 ulint batch; /*!< #pages to rotate */
1120 uint min_key_version_found;/*!< min key version found but not rotated */
1121 lsn_t end_lsn; /*!< max lsn when rotating this space */
1122
1123 uint estimated_max_iops; /*!< estimation of max iops */
1124 uint allocated_iops; /*!< allocated iops */
1125 ulint cnt_waited; /*!< #times waited during this slot */
1126 uintmax_t sum_waited_us; /*!< wait time during this slot */
1127
1128 fil_crypt_stat_t crypt_stat; // statistics
1129
1130 /** @return whether this thread should terminate */
should_shutdownrotate_thread_t1131 bool should_shutdown() const {
1132 switch (srv_shutdown_state) {
1133 case SRV_SHUTDOWN_NONE:
1134 return thread_no >= srv_n_fil_crypt_threads;
1135 case SRV_SHUTDOWN_EXIT_THREADS:
1136 /* srv_init_abort() must have been invoked */
1137 case SRV_SHUTDOWN_CLEANUP:
1138 case SRV_SHUTDOWN_INITIATED:
1139 return true;
1140 case SRV_SHUTDOWN_LAST_PHASE:
1141 break;
1142 }
1143 ut_ad(0);
1144 return true;
1145 }
1146 };
1147
1148 /** Avoid the removal of the tablespace from
1149 default_encrypt_list only when
1150 1) Another active encryption thread working on tablespace
1151 2) Eligible for tablespace key rotation
1152 3) Tablespace is in flushing phase
1153 @return true if tablespace should be removed from
1154 default encrypt */
fil_crypt_must_remove(const fil_space_t & space)1155 static bool fil_crypt_must_remove(const fil_space_t &space)
1156 {
1157 ut_ad(space.purpose == FIL_TYPE_TABLESPACE);
1158 fil_space_crypt_t *crypt_data = space.crypt_data;
1159 ut_ad(mutex_own(&fil_system.mutex));
1160 const ulong encrypt_tables= srv_encrypt_tables;
1161 if (!crypt_data)
1162 return !encrypt_tables;
1163 if (!crypt_data->is_key_found())
1164 return true;
1165
1166 mutex_enter(&crypt_data->mutex);
1167 const bool remove= (space.is_stopping() || crypt_data->not_encrypted()) &&
1168 (!crypt_data->rotate_state.flushing &&
1169 !encrypt_tables == !!crypt_data->min_key_version &&
1170 !crypt_data->rotate_state.active_threads);
1171 mutex_exit(&crypt_data->mutex);
1172 return remove;
1173 }
1174
1175 /***********************************************************************
1176 Check if space needs rotation given a key_state
1177 @param[in,out] state Key rotation state
1178 @param[in,out] key_state Key state
1179 @param[in,out] recheck needs recheck ?
1180 @return true if space needs key rotation */
1181 static
1182 bool
fil_crypt_space_needs_rotation(rotate_thread_t * state,key_state_t * key_state,bool * recheck)1183 fil_crypt_space_needs_rotation(
1184 rotate_thread_t* state,
1185 key_state_t* key_state,
1186 bool* recheck)
1187 {
1188 fil_space_t* space = state->space;
1189
1190 /* Make sure that tablespace is normal tablespace */
1191 if (space->purpose != FIL_TYPE_TABLESPACE) {
1192 return false;
1193 }
1194
1195 ut_ad(space->referenced());
1196
1197 fil_space_crypt_t *crypt_data = space->crypt_data;
1198
1199 if (crypt_data == NULL) {
1200 /**
1201 * space has no crypt data
1202 * start encrypting it...
1203 */
1204 *recheck = fil_crypt_start_encrypting_space(space);
1205 crypt_data = space->crypt_data;
1206
1207 if (crypt_data == NULL) {
1208 return false;
1209 }
1210
1211 crypt_data->key_get_latest_version();
1212 }
1213
1214 /* If used key_id is not found from encryption plugin we can't
1215 continue to rotate the tablespace */
1216 if (!crypt_data->is_key_found()) {
1217 return false;
1218 }
1219
1220 bool need_key_rotation = false;
1221 mutex_enter(&crypt_data->mutex);
1222
1223 do {
1224 /* prevent threads from starting to rotate space */
1225 if (crypt_data->rotate_state.starting) {
1226 /* recheck this space later */
1227 *recheck = true;
1228 break;
1229 }
1230
1231 /* prevent threads from starting to rotate space */
1232 if (space->is_stopping()) {
1233 break;
1234 }
1235
1236 if (crypt_data->rotate_state.flushing) {
1237 break;
1238 }
1239
1240 /* No need to rotate space if encryption is disabled */
1241 if (crypt_data->not_encrypted()) {
1242 break;
1243 }
1244
1245 if (crypt_data->key_id != key_state->key_id) {
1246 key_state->key_id= crypt_data->key_id;
1247 fil_crypt_get_key_state(key_state, crypt_data);
1248 }
1249
1250 need_key_rotation = fil_crypt_needs_rotation(
1251 crypt_data,
1252 crypt_data->min_key_version,
1253 key_state->key_version,
1254 key_state->rotate_key_age);
1255 } while (0);
1256
1257 mutex_exit(&crypt_data->mutex);
1258 return need_key_rotation;
1259 }
1260
1261 /***********************************************************************
1262 Update global statistics with thread statistics
1263 @param[in,out] state key rotation statistics */
1264 static void
fil_crypt_update_total_stat(rotate_thread_t * state)1265 fil_crypt_update_total_stat(
1266 rotate_thread_t *state)
1267 {
1268 mutex_enter(&crypt_stat_mutex);
1269 crypt_stat.pages_read_from_cache +=
1270 state->crypt_stat.pages_read_from_cache;
1271 crypt_stat.pages_read_from_disk +=
1272 state->crypt_stat.pages_read_from_disk;
1273 crypt_stat.pages_modified += state->crypt_stat.pages_modified;
1274 crypt_stat.pages_flushed += state->crypt_stat.pages_flushed;
1275 // remote old estimate
1276 crypt_stat.estimated_iops -= state->crypt_stat.estimated_iops;
1277 // add new estimate
1278 crypt_stat.estimated_iops += state->estimated_max_iops;
1279 mutex_exit(&crypt_stat_mutex);
1280
1281 // make new estimate "current" estimate
1282 memset(&state->crypt_stat, 0, sizeof(state->crypt_stat));
1283 // record our old (current) estimate
1284 state->crypt_stat.estimated_iops = state->estimated_max_iops;
1285 }
1286
1287 /***********************************************************************
1288 Allocate iops to thread from global setting,
1289 used before starting to rotate a space.
1290 @param[in,out] state Rotation state
1291 @return true if allocation succeeded, false if failed */
1292 static
1293 bool
fil_crypt_alloc_iops(rotate_thread_t * state)1294 fil_crypt_alloc_iops(
1295 rotate_thread_t *state)
1296 {
1297 ut_ad(state->allocated_iops == 0);
1298
1299 /* We have not yet selected the space to rotate, thus
1300 state might not contain space and we can't check
1301 its status yet. */
1302
1303 uint max_iops = state->estimated_max_iops;
1304 mutex_enter(&fil_crypt_threads_mutex);
1305
1306 if (n_fil_crypt_iops_allocated >= srv_n_fil_crypt_iops) {
1307 /* this can happen when user decreases srv_fil_crypt_iops */
1308 mutex_exit(&fil_crypt_threads_mutex);
1309 return false;
1310 }
1311
1312 uint alloc = srv_n_fil_crypt_iops - n_fil_crypt_iops_allocated;
1313
1314 if (alloc > max_iops) {
1315 alloc = max_iops;
1316 }
1317
1318 n_fil_crypt_iops_allocated += alloc;
1319 mutex_exit(&fil_crypt_threads_mutex);
1320
1321 state->allocated_iops = alloc;
1322
1323 return alloc > 0;
1324 }
1325
1326 /***********************************************************************
1327 Reallocate iops to thread,
1328 used when inside a space
1329 @param[in,out] state Rotation state */
1330 static
1331 void
fil_crypt_realloc_iops(rotate_thread_t * state)1332 fil_crypt_realloc_iops(
1333 rotate_thread_t *state)
1334 {
1335 ut_a(state->allocated_iops > 0);
1336
1337 if (10 * state->cnt_waited > state->batch) {
1338 /* if we waited more than 10% re-estimate max_iops */
1339 ulint avg_wait_time_us =
1340 ulint(state->sum_waited_us / state->cnt_waited);
1341
1342 if (avg_wait_time_us == 0) {
1343 avg_wait_time_us = 1; // prevent division by zero
1344 }
1345
1346 DBUG_PRINT("ib_crypt",
1347 ("thr_no: %u - update estimated_max_iops from %u to "
1348 ULINTPF ".",
1349 state->thread_no,
1350 state->estimated_max_iops,
1351 1000000 / avg_wait_time_us));
1352
1353 state->estimated_max_iops = uint(1000000 / avg_wait_time_us);
1354 state->cnt_waited = 0;
1355 state->sum_waited_us = 0;
1356 } else {
1357 DBUG_PRINT("ib_crypt",
1358 ("thr_no: %u only waited " ULINTPF
1359 "%% skip re-estimate.",
1360 state->thread_no,
1361 (100 * state->cnt_waited)
1362 / (state->batch ? state->batch : 1)));
1363 }
1364
1365 if (state->estimated_max_iops <= state->allocated_iops) {
1366 /* return extra iops */
1367 uint extra = state->allocated_iops - state->estimated_max_iops;
1368
1369 if (extra > 0) {
1370 mutex_enter(&fil_crypt_threads_mutex);
1371 if (n_fil_crypt_iops_allocated < extra) {
1372 /* unknown bug!
1373 * crash in debug
1374 * keep n_fil_crypt_iops_allocated unchanged
1375 * in release */
1376 ut_ad(0);
1377 extra = 0;
1378 }
1379 n_fil_crypt_iops_allocated -= extra;
1380 state->allocated_iops -= extra;
1381
1382 if (state->allocated_iops == 0) {
1383 /* no matter how slow io system seems to be
1384 * never decrease allocated_iops to 0... */
1385 state->allocated_iops ++;
1386 n_fil_crypt_iops_allocated ++;
1387 }
1388
1389 os_event_set(fil_crypt_threads_event);
1390 mutex_exit(&fil_crypt_threads_mutex);
1391 }
1392 } else {
1393 /* see if there are more to get */
1394 mutex_enter(&fil_crypt_threads_mutex);
1395 if (n_fil_crypt_iops_allocated < srv_n_fil_crypt_iops) {
1396 /* there are extra iops free */
1397 uint extra = srv_n_fil_crypt_iops -
1398 n_fil_crypt_iops_allocated;
1399 if (state->allocated_iops + extra >
1400 state->estimated_max_iops) {
1401 /* but don't alloc more than our max */
1402 extra = state->estimated_max_iops -
1403 state->allocated_iops;
1404 }
1405 n_fil_crypt_iops_allocated += extra;
1406 state->allocated_iops += extra;
1407
1408 DBUG_PRINT("ib_crypt",
1409 ("thr_no: %u increased iops from %u to %u.",
1410 state->thread_no,
1411 state->allocated_iops - extra,
1412 state->allocated_iops));
1413
1414 }
1415 mutex_exit(&fil_crypt_threads_mutex);
1416 }
1417
1418 fil_crypt_update_total_stat(state);
1419 }
1420
1421 /** Release excess allocated iops
1422 @param state rotation state
1423 @param wake whether to wake up other threads */
fil_crypt_return_iops(rotate_thread_t * state,bool wake=true)1424 static void fil_crypt_return_iops(rotate_thread_t *state, bool wake= true)
1425 {
1426 if (state->allocated_iops > 0) {
1427 uint iops = state->allocated_iops;
1428 mutex_enter(&fil_crypt_threads_mutex);
1429 if (n_fil_crypt_iops_allocated < iops) {
1430 /* unknown bug!
1431 * crash in debug
1432 * keep n_fil_crypt_iops_allocated unchanged
1433 * in release */
1434 ut_ad(0);
1435 iops = 0;
1436 }
1437
1438 n_fil_crypt_iops_allocated -= iops;
1439 state->allocated_iops = 0;
1440 if (wake) {
1441 os_event_set(fil_crypt_threads_event);
1442 }
1443 mutex_exit(&fil_crypt_threads_mutex);
1444 }
1445
1446 fil_crypt_update_total_stat(state);
1447 }
1448
1449 /** Acquire a tablespace reference.
1450 @return whether a tablespace reference was successfully acquired */
acquire_if_not_stopped()1451 inline bool fil_space_t::acquire_if_not_stopped()
1452 {
1453 ut_ad(mutex_own(&fil_system.mutex));
1454 const uint32_t n= acquire_low();
1455 if (UNIV_LIKELY(!(n & (STOPPING | CLOSING))))
1456 return true;
1457 if (UNIV_UNLIKELY(n & STOPPING))
1458 return false;
1459 return UNIV_LIKELY(!(n & CLOSING)) || prepare(true);
1460 }
1461
fil_crypt_must_default_encrypt()1462 bool fil_crypt_must_default_encrypt()
1463 {
1464 return !srv_fil_crypt_rotate_key_age || !srv_encrypt_rotate;
1465 }
1466
1467 /** Return the next tablespace from default_encrypt_tables list.
1468 @param space previous tablespace (nullptr to start from the start)
1469 @param recheck whether the removal condition needs to be rechecked after
1470 the encryption parameters were changed
1471 @param encrypt expected state of innodb_encrypt_tables
1472 @return the next tablespace to process (n_pending_ops incremented)
1473 @retval fil_system.temp_space if there is no work to do
1474 @retval nullptr upon reaching the end of the iteration */
default_encrypt_next(fil_space_t * space,bool recheck,bool encrypt)1475 inline fil_space_t *fil_system_t::default_encrypt_next(fil_space_t *space,
1476 bool recheck,
1477 bool encrypt)
1478 {
1479 ut_ad(mutex_own(&mutex));
1480
1481 sized_ilist<fil_space_t, rotation_list_tag_t>::iterator it=
1482 space && space->is_in_default_encrypt
1483 ? space
1484 : default_encrypt_tables.begin();
1485 const sized_ilist<fil_space_t, rotation_list_tag_t>::iterator end=
1486 default_encrypt_tables.end();
1487
1488 if (space)
1489 {
1490 const bool released= !space->release();
1491
1492 if (space->is_in_default_encrypt)
1493 {
1494 while (++it != end &&
1495 (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()));
1496
1497 /* If one of the encryption threads already started
1498 the encryption of the table then don't remove the
1499 unencrypted spaces from default encrypt list.
1500
1501 If there is a change in innodb_encrypt_tables variables
1502 value then don't remove the last processed tablespace
1503 from the default encrypt list. */
1504 if (released && !recheck && fil_crypt_must_remove(*space))
1505 {
1506 ut_a(!default_encrypt_tables.empty());
1507 default_encrypt_tables.remove(*space);
1508 space->is_in_default_encrypt= false;
1509 }
1510 }
1511 }
1512 else while (it != end &&
1513 (!UT_LIST_GET_LEN(it->chain) || it->is_stopping()))
1514 {
1515 /* Find the next suitable default encrypt table if
1516 beginning of default_encrypt_tables list has been scheduled
1517 to be deleted */
1518 it++;
1519 }
1520
1521 if (it == end)
1522 return temp_space;
1523
1524 do
1525 {
1526 space= &*it;
1527 if (space->acquire_if_not_stopped())
1528 return space;
1529 if (++it == end)
1530 return nullptr;
1531 }
1532 while (!UT_LIST_GET_LEN(it->chain) || it->is_stopping());
1533
1534 return nullptr;
1535 }
1536
1537 /** Determine the next tablespace for encryption key rotation.
1538 @param space current tablespace (nullptr to start from the beginning)
1539 @param recheck whether the removal condition needs to be rechecked after
1540 encryption parameters were changed
1541 @param encrypt expected state of innodb_encrypt_tables
1542 @return the next tablespace
1543 @retval fil_system.temp_space if there is no work to do
1544 @retval nullptr upon reaching the end of the iteration */
next(fil_space_t * space,bool recheck,bool encrypt)1545 inline fil_space_t *fil_space_t::next(fil_space_t *space, bool recheck,
1546 bool encrypt)
1547 {
1548 mutex_enter(&fil_system.mutex);
1549
1550 if (fil_crypt_must_default_encrypt())
1551 space= fil_system.default_encrypt_next(space, recheck, encrypt);
1552 else
1553 {
1554 if (!space)
1555 space= UT_LIST_GET_FIRST(fil_system.space_list);
1556 else
1557 {
1558 /* Move on to the next fil_space_t */
1559 space->release();
1560 space= UT_LIST_GET_NEXT(space_list, space);
1561 }
1562
1563 for (; space; space= UT_LIST_GET_NEXT(space_list, space))
1564 {
1565 if (space->purpose != FIL_TYPE_TABLESPACE)
1566 continue;
1567 const uint32_t n= space->acquire_low();
1568 if (UNIV_LIKELY(!(n & (STOPPING | CLOSING))))
1569 break;
1570 if (!(n & STOPPING) && space->prepare(true))
1571 break;
1572 }
1573 }
1574
1575 mutex_exit(&fil_system.mutex);
1576 return space;
1577 }
1578
1579 /** Search for a space needing rotation
1580 @param[in,out] key_state Key state
1581 @param[in,out] state Rotation state
1582 @param[in,out] recheck recheck of the tablespace is needed or
1583 still encryption thread does write page 0 */
fil_crypt_find_space_to_rotate(key_state_t * key_state,rotate_thread_t * state,bool * recheck)1584 static bool fil_crypt_find_space_to_rotate(
1585 key_state_t* key_state,
1586 rotate_thread_t* state,
1587 bool* recheck)
1588 {
1589 /* we need iops to start rotating */
1590 while (!state->should_shutdown() && !fil_crypt_alloc_iops(state)) {
1591 if (state->space && state->space->is_stopping()) {
1592 state->space->release();
1593 state->space = NULL;
1594 }
1595
1596 os_event_reset(fil_crypt_threads_event);
1597 os_event_wait_time(fil_crypt_threads_event, 100000);
1598 }
1599
1600 if (state->should_shutdown()) {
1601 if (state->space) {
1602 state->space->release();
1603 state->space = NULL;
1604 }
1605 return false;
1606 }
1607
1608 if (state->first) {
1609 state->first = false;
1610 if (state->space) {
1611 state->space->release();
1612 }
1613 state->space = NULL;
1614 }
1615
1616 bool wake;
1617 for (;;) {
1618 state->space = fil_space_t::next(state->space, *recheck,
1619 key_state->key_version != 0);
1620 wake = state->should_shutdown();
1621
1622 if (state->space == fil_system.temp_space) {
1623 goto done;
1624 } else if (wake) {
1625 break;
1626 } else {
1627 wake = true;
1628 }
1629
1630 if (!state->space) {
1631 break;
1632 }
1633
1634 /* If there is no crypt data and we have not yet read
1635 page 0 for this tablespace, we need to read it before
1636 we can continue. */
1637 if (!state->space->crypt_data) {
1638 fil_crypt_read_crypt_data(state->space);
1639 }
1640
1641 if (fil_crypt_space_needs_rotation(state, key_state, recheck)) {
1642 ut_ad(key_state->key_id);
1643 /* init state->min_key_version_found before
1644 * starting on a space */
1645 state->min_key_version_found = key_state->key_version;
1646 return true;
1647 }
1648 }
1649
1650 if (state->space) {
1651 state->space->release();
1652 done:
1653 state->space = NULL;
1654 }
1655
1656 /* no work to do; release our allocation of I/O capacity */
1657 fil_crypt_return_iops(state, wake);
1658
1659 return false;
1660
1661 }
1662
1663 /***********************************************************************
1664 Start rotating a space
1665 @param[in] key_state Key state
1666 @param[in,out] state Rotation state */
1667 static
1668 void
fil_crypt_start_rotate_space(const key_state_t * key_state,rotate_thread_t * state)1669 fil_crypt_start_rotate_space(
1670 const key_state_t* key_state,
1671 rotate_thread_t* state)
1672 {
1673 fil_space_crypt_t *crypt_data = state->space->crypt_data;
1674
1675 ut_ad(crypt_data);
1676 mutex_enter(&crypt_data->mutex);
1677 ut_ad(key_state->key_id == crypt_data->key_id);
1678
1679 if (crypt_data->rotate_state.active_threads == 0) {
1680 /* only first thread needs to init */
1681 crypt_data->rotate_state.next_offset = 1; // skip page 0
1682 /* no need to rotate beyond current max
1683 * if space extends, it will be encrypted with newer version */
1684 /* FIXME: max_offset could be removed and instead
1685 space->size consulted.*/
1686 crypt_data->rotate_state.max_offset = state->space->size;
1687 crypt_data->rotate_state.end_lsn = 0;
1688 crypt_data->rotate_state.min_key_version_found =
1689 key_state->key_version;
1690
1691 crypt_data->rotate_state.start_time = time(0);
1692
1693 if (crypt_data->type == CRYPT_SCHEME_UNENCRYPTED &&
1694 crypt_data->is_encrypted() &&
1695 key_state->key_version != 0) {
1696 /* this is rotation unencrypted => encrypted */
1697 crypt_data->type = CRYPT_SCHEME_1;
1698 }
1699 }
1700
1701 /* count active threads in space */
1702 crypt_data->rotate_state.active_threads++;
1703
1704 /* Initialize thread local state */
1705 state->end_lsn = crypt_data->rotate_state.end_lsn;
1706 state->min_key_version_found =
1707 crypt_data->rotate_state.min_key_version_found;
1708
1709 mutex_exit(&crypt_data->mutex);
1710 }
1711
1712 /***********************************************************************
1713 Search for batch of pages needing rotation
1714 @param[in] key_state Key state
1715 @param[in,out] state Rotation state
1716 @return true if page needing key rotation found, false if not found */
1717 static
1718 bool
fil_crypt_find_page_to_rotate(const key_state_t * key_state,rotate_thread_t * state)1719 fil_crypt_find_page_to_rotate(
1720 const key_state_t* key_state,
1721 rotate_thread_t* state)
1722 {
1723 ulint batch = srv_alloc_time * state->allocated_iops;
1724 fil_space_t* space = state->space;
1725
1726 ut_ad(!space || space->referenced());
1727
1728 /* If space is marked to be dropped stop rotation. */
1729 if (!space || space->is_stopping()) {
1730 return false;
1731 }
1732
1733 fil_space_crypt_t *crypt_data = space->crypt_data;
1734
1735 mutex_enter(&crypt_data->mutex);
1736 ut_ad(key_state->key_id == crypt_data->key_id);
1737
1738 bool found = crypt_data->rotate_state.max_offset >=
1739 crypt_data->rotate_state.next_offset;
1740
1741 if (found) {
1742 state->offset = crypt_data->rotate_state.next_offset;
1743 ulint remaining = crypt_data->rotate_state.max_offset -
1744 crypt_data->rotate_state.next_offset;
1745
1746 if (batch <= remaining) {
1747 state->batch = batch;
1748 } else {
1749 state->batch = remaining;
1750 }
1751 }
1752
1753 crypt_data->rotate_state.next_offset += uint32_t(batch);
1754 mutex_exit(&crypt_data->mutex);
1755 return found;
1756 }
1757
1758 #define fil_crypt_get_page_throttle(state,offset,mtr,sleeptime_ms) \
1759 fil_crypt_get_page_throttle_func(state, offset, mtr, \
1760 sleeptime_ms, __FILE__, __LINE__)
1761
1762 /***********************************************************************
1763 Get a page and compute sleep time
1764 @param[in,out] state Rotation state
1765 @param[in] offset Page offset
1766 @param[in,out] mtr Minitransaction
1767 @param[out] sleeptime_ms Sleep time
1768 @param[in] file File where called
1769 @param[in] line Line where called
1770 @return page or NULL*/
1771 static
1772 buf_block_t*
fil_crypt_get_page_throttle_func(rotate_thread_t * state,uint32_t offset,mtr_t * mtr,ulint * sleeptime_ms,const char * file,unsigned line)1773 fil_crypt_get_page_throttle_func(
1774 rotate_thread_t* state,
1775 uint32_t offset,
1776 mtr_t* mtr,
1777 ulint* sleeptime_ms,
1778 const char* file,
1779 unsigned line)
1780 {
1781 fil_space_t* space = state->space;
1782 const ulint zip_size = space->zip_size();
1783 const page_id_t page_id(space->id, offset);
1784 ut_ad(space->referenced());
1785
1786 /* Before reading from tablespace we need to make sure that
1787 the tablespace is not about to be dropped. */
1788 if (space->is_stopping()) {
1789 return NULL;
1790 }
1791
1792 dberr_t err = DB_SUCCESS;
1793 buf_block_t* block = buf_page_get_gen(page_id, zip_size, RW_X_LATCH,
1794 NULL,
1795 BUF_PEEK_IF_IN_POOL, file, line,
1796 mtr, &err);
1797 if (block != NULL) {
1798 /* page was in buffer pool */
1799 state->crypt_stat.pages_read_from_cache++;
1800 return block;
1801 }
1802
1803 if (space->is_stopping()) {
1804 return NULL;
1805 }
1806
1807 if (fseg_page_is_free(space, state->offset)) {
1808 /* page is already freed */
1809 return NULL;
1810 }
1811
1812 state->crypt_stat.pages_read_from_disk++;
1813
1814 const ulonglong start = my_interval_timer();
1815 block = buf_page_get_gen(page_id, zip_size,
1816 RW_X_LATCH,
1817 NULL, BUF_GET_POSSIBLY_FREED,
1818 file, line, mtr, &err);
1819 const ulonglong end = my_interval_timer();
1820
1821 state->cnt_waited++;
1822
1823 if (end > start) {
1824 state->sum_waited_us += (end - start) / 1000;
1825 }
1826
1827 /* average page load */
1828 ulint add_sleeptime_ms = 0;
1829 ulint avg_wait_time_us =ulint(state->sum_waited_us / state->cnt_waited);
1830 ulint alloc_wait_us = 1000000 / state->allocated_iops;
1831
1832 if (avg_wait_time_us < alloc_wait_us) {
1833 /* we reading faster than we allocated */
1834 add_sleeptime_ms = (alloc_wait_us - avg_wait_time_us) / 1000;
1835 } else {
1836 /* if page load time is longer than we want, skip sleeping */
1837 }
1838
1839 *sleeptime_ms += add_sleeptime_ms;
1840
1841 return block;
1842 }
1843
1844 /***********************************************************************
1845 Rotate one page
1846 @param[in,out] key_state Key state
1847 @param[in,out] state Rotation state */
1848 static
1849 void
fil_crypt_rotate_page(const key_state_t * key_state,rotate_thread_t * state)1850 fil_crypt_rotate_page(
1851 const key_state_t* key_state,
1852 rotate_thread_t* state)
1853 {
1854 fil_space_t*space = state->space;
1855 ulint space_id = space->id;
1856 uint32_t offset = state->offset;
1857 ulint sleeptime_ms = 0;
1858 fil_space_crypt_t *crypt_data = space->crypt_data;
1859
1860 ut_ad(space->referenced());
1861 ut_ad(offset > 0);
1862
1863 /* In fil_crypt_thread where key rotation is done we have
1864 acquired space and checked that this space is not yet
1865 marked to be dropped. Similarly, in fil_crypt_find_page_to_rotate().
1866 Check here also to give DROP TABLE or similar a change. */
1867 if (space->is_stopping()) {
1868 return;
1869 }
1870
1871 if (space_id == TRX_SYS_SPACE && offset == TRX_SYS_PAGE_NO) {
1872 /* don't encrypt this as it contains address to dblwr buffer */
1873 return;
1874 }
1875
1876 mtr_t mtr;
1877 mtr.start();
1878 if (buf_block_t* block = fil_crypt_get_page_throttle(state,
1879 offset, &mtr,
1880 &sleeptime_ms)) {
1881 bool modified = false;
1882 byte* frame = buf_block_get_frame(block);
1883 const lsn_t block_lsn = mach_read_from_8(FIL_PAGE_LSN + frame);
1884 uint kv = buf_page_get_key_version(frame, space->flags);
1885
1886 if (block->page.status == buf_page_t::FREED) {
1887 /* Do not modify freed pages to avoid an assertion
1888 failure on recovery.*/
1889 } else if (block->page.oldest_modification() > 1) {
1890 /* Do not unnecessarily touch pages that are
1891 already dirty. */
1892 } else if (space->is_stopping()) {
1893 /* The tablespace is closing (in DROP TABLE or
1894 TRUNCATE TABLE or similar): avoid further access */
1895 } else if (!kv && !*reinterpret_cast<uint16_t*>
1896 (&frame[FIL_PAGE_TYPE])) {
1897 /* It looks like this page is not
1898 allocated. Because key rotation is accessing
1899 pages in a pattern that is unlike the normal
1900 B-tree and undo log access pattern, we cannot
1901 invoke fseg_page_is_free() here, because that
1902 could result in a deadlock. If we invoked
1903 fseg_page_is_free() and released the
1904 tablespace latch before acquiring block->lock,
1905 then the fseg_page_is_free() information
1906 could be stale already. */
1907
1908 /* If the data file was originally created
1909 before MariaDB 10.0 or MySQL 5.6, some
1910 allocated data pages could carry 0 in
1911 FIL_PAGE_TYPE. The FIL_PAGE_TYPE on those
1912 pages will be updated in
1913 buf_flush_init_for_writing() when the page
1914 is modified the next time.
1915
1916 Also, when the doublewrite buffer pages are
1917 allocated on bootstrap in a non-debug build,
1918 some dummy pages will be allocated, with 0 in
1919 the FIL_PAGE_TYPE. Those pages should be
1920 skipped from key rotation forever. */
1921 } else if (fil_crypt_needs_rotation(
1922 crypt_data,
1923 kv,
1924 key_state->key_version,
1925 key_state->rotate_key_age)) {
1926
1927 mtr.set_named_space(space);
1928 modified = true;
1929
1930 /* force rotation by dummy updating page */
1931 mtr.write<1,mtr_t::FORCED>(*block,
1932 &frame[FIL_PAGE_SPACE_ID],
1933 frame[FIL_PAGE_SPACE_ID]);
1934
1935 /* statistics */
1936 state->crypt_stat.pages_modified++;
1937 } else {
1938 if (crypt_data->is_encrypted()) {
1939 if (kv < state->min_key_version_found) {
1940 state->min_key_version_found = kv;
1941 }
1942 }
1943 }
1944
1945 mtr.commit();
1946 lsn_t end_lsn = mtr.commit_lsn();
1947
1948
1949 if (modified) {
1950 /* if we modified page, we take lsn from mtr */
1951 ut_a(end_lsn > state->end_lsn);
1952 ut_a(end_lsn > block_lsn);
1953 state->end_lsn = end_lsn;
1954 } else {
1955 /* if we did not modify page, check for max lsn */
1956 if (block_lsn > state->end_lsn) {
1957 state->end_lsn = block_lsn;
1958 }
1959 }
1960 } else {
1961 /* If block read failed mtr memo and log should be empty. */
1962 ut_ad(!mtr.has_modifications());
1963 ut_ad(!mtr.is_dirty());
1964 ut_ad(mtr.get_memo()->size() == 0);
1965 ut_ad(mtr.get_log()->size() == 0);
1966 mtr.commit();
1967 }
1968
1969 if (sleeptime_ms) {
1970 os_event_reset(fil_crypt_throttle_sleep_event);
1971 os_event_wait_time(fil_crypt_throttle_sleep_event,
1972 1000 * sleeptime_ms);
1973 }
1974 }
1975
1976 /***********************************************************************
1977 Rotate a batch of pages
1978 @param[in,out] key_state Key state
1979 @param[in,out] state Rotation state */
1980 static
1981 void
fil_crypt_rotate_pages(const key_state_t * key_state,rotate_thread_t * state)1982 fil_crypt_rotate_pages(
1983 const key_state_t* key_state,
1984 rotate_thread_t* state)
1985 {
1986 ulint space_id = state->space->id;
1987 uint32_t end = std::min(state->offset + uint32_t(state->batch),
1988 state->space->free_limit);
1989
1990 ut_ad(state->space->referenced());
1991
1992 for (; state->offset < end; state->offset++) {
1993
1994 /* we can't rotate pages in dblwr buffer as
1995 * it's not possible to read those due to lots of asserts
1996 * in buffer pool.
1997 *
1998 * However since these are only (short-lived) copies of
1999 * real pages, they will be updated anyway when the
2000 * real page is updated
2001 */
2002 if (buf_dblwr.is_inside(page_id_t(space_id, state->offset))) {
2003 continue;
2004 }
2005
2006 /* If space is marked as stopping, stop rotating
2007 pages. */
2008 if (state->space->is_stopping()) {
2009 break;
2010 }
2011
2012 fil_crypt_rotate_page(key_state, state);
2013 }
2014 }
2015
2016 /***********************************************************************
2017 Flush rotated pages and then update page 0
2018
2019 @param[in,out] state rotation state */
2020 static
2021 void
fil_crypt_flush_space(rotate_thread_t * state)2022 fil_crypt_flush_space(
2023 rotate_thread_t* state)
2024 {
2025 fil_space_t* space = state->space;
2026 fil_space_crypt_t *crypt_data = space->crypt_data;
2027
2028 ut_ad(space->referenced());
2029
2030 /* flush tablespace pages so that there are no pages left with old key */
2031 lsn_t end_lsn = crypt_data->rotate_state.end_lsn;
2032
2033 if (end_lsn > 0 && !space->is_stopping()) {
2034 ulint sum_pages = 0;
2035 const ulonglong start = my_interval_timer();
2036 while (buf_flush_list_space(space, &sum_pages));
2037 if (sum_pages) {
2038 const ulonglong end = my_interval_timer();
2039
2040 state->cnt_waited += sum_pages;
2041 state->sum_waited_us += (end - start) / 1000;
2042
2043 /* statistics */
2044 state->crypt_stat.pages_flushed += sum_pages;
2045 }
2046 }
2047
2048 if (crypt_data->min_key_version == 0) {
2049 crypt_data->type = CRYPT_SCHEME_UNENCRYPTED;
2050 }
2051
2052 if (space->is_stopping()) {
2053 return;
2054 }
2055
2056 /* update page 0 */
2057 mtr_t mtr;
2058 mtr.start();
2059
2060 if (buf_block_t* block = buf_page_get_gen(
2061 page_id_t(space->id, 0), space->zip_size(),
2062 RW_X_LATCH, NULL, BUF_GET_POSSIBLY_FREED,
2063 __FILE__, __LINE__, &mtr)) {
2064 if (block->page.status != buf_page_t::FREED) {
2065 mtr.set_named_space(space);
2066 crypt_data->write_page0(block, &mtr);
2067 }
2068 }
2069
2070 mtr.commit();
2071 }
2072
2073 /***********************************************************************
2074 Complete rotating a space
2075 @param[in,out] state Rotation state */
fil_crypt_complete_rotate_space(rotate_thread_t * state)2076 static void fil_crypt_complete_rotate_space(rotate_thread_t* state)
2077 {
2078 fil_space_crypt_t *crypt_data = state->space->crypt_data;
2079
2080 ut_ad(crypt_data);
2081 ut_ad(state->space->referenced());
2082
2083 /* Space might already be dropped */
2084 if (!state->space->is_stopping()) {
2085 mutex_enter(&crypt_data->mutex);
2086
2087 /**
2088 * Update crypt data state with state from thread
2089 */
2090 if (state->min_key_version_found <
2091 crypt_data->rotate_state.min_key_version_found) {
2092 crypt_data->rotate_state.min_key_version_found =
2093 state->min_key_version_found;
2094 }
2095
2096 if (state->end_lsn > crypt_data->rotate_state.end_lsn) {
2097 crypt_data->rotate_state.end_lsn = state->end_lsn;
2098 }
2099
2100 ut_a(crypt_data->rotate_state.active_threads > 0);
2101 crypt_data->rotate_state.active_threads--;
2102 bool last = crypt_data->rotate_state.active_threads == 0;
2103
2104 /**
2105 * check if space is fully done
2106 * this as when threads shutdown, it could be that we "complete"
2107 * iterating before we have scanned the full space.
2108 */
2109 bool done = crypt_data->rotate_state.next_offset >=
2110 crypt_data->rotate_state.max_offset;
2111
2112 /**
2113 * we should flush space if we're last thread AND
2114 * the iteration is done
2115 */
2116 bool should_flush = last && done;
2117
2118 if (should_flush) {
2119 /* we're the last active thread */
2120 crypt_data->rotate_state.flushing = true;
2121 crypt_data->min_key_version =
2122 crypt_data->rotate_state.min_key_version_found;
2123 mutex_exit(&crypt_data->mutex);
2124 fil_crypt_flush_space(state);
2125
2126 mutex_enter(&crypt_data->mutex);
2127 crypt_data->rotate_state.flushing = false;
2128 mutex_exit(&crypt_data->mutex);
2129 } else {
2130 mutex_exit(&crypt_data->mutex);
2131 }
2132 } else {
2133 mutex_enter(&crypt_data->mutex);
2134 ut_a(crypt_data->rotate_state.active_threads > 0);
2135 crypt_data->rotate_state.active_threads--;
2136 mutex_exit(&crypt_data->mutex);
2137 }
2138 }
2139
2140 /*********************************************************************//**
2141 A thread which monitors global key state and rotates tablespaces accordingly
2142 @return a dummy parameter */
2143 extern "C" UNIV_INTERN
2144 os_thread_ret_t
DECLARE_THREAD(fil_crypt_thread)2145 DECLARE_THREAD(fil_crypt_thread)(void*)
2146 {
2147 mutex_enter(&fil_crypt_threads_mutex);
2148 uint thread_no = srv_n_fil_crypt_threads_started;
2149 srv_n_fil_crypt_threads_started++;
2150 os_event_set(fil_crypt_event); /* signal that we started */
2151 mutex_exit(&fil_crypt_threads_mutex);
2152
2153 /* state of this thread */
2154 rotate_thread_t thr(thread_no);
2155
2156 /* if we find a space that is starting, skip over it and recheck it later */
2157 bool recheck = false;
2158
2159 while (!thr.should_shutdown()) {
2160
2161 key_state_t new_state;
2162
2163 while (!thr.should_shutdown()) {
2164
2165 /* wait for key state changes
2166 * i.e either new key version of change or
2167 * new rotate_key_age */
2168 os_event_reset(fil_crypt_threads_event);
2169
2170 if (os_event_wait_time(fil_crypt_threads_event, 1000000) == 0) {
2171 break;
2172 }
2173
2174 if (recheck) {
2175 /* check recheck here, after sleep, so
2176 * that we don't busy loop while when one thread is starting
2177 * a space*/
2178 break;
2179 }
2180 }
2181
2182 recheck = false;
2183 thr.first = true; // restart from first tablespace
2184
2185 /* iterate all spaces searching for those needing rotation */
2186 while (!thr.should_shutdown() &&
2187 fil_crypt_find_space_to_rotate(&new_state, &thr, &recheck)) {
2188
2189 /* we found a space to rotate */
2190 fil_crypt_start_rotate_space(&new_state, &thr);
2191
2192 /* iterate all pages (cooperativly with other threads) */
2193 while (!thr.should_shutdown() &&
2194 fil_crypt_find_page_to_rotate(&new_state, &thr)) {
2195
2196 if (!thr.space->is_stopping()) {
2197 /* rotate a (set) of pages */
2198 fil_crypt_rotate_pages(&new_state, &thr);
2199 }
2200
2201 /* If space is marked as stopping, release
2202 space and stop rotation. */
2203 if (thr.space->is_stopping()) {
2204 fil_crypt_complete_rotate_space(&thr);
2205 thr.space->release();
2206 thr.space = NULL;
2207 break;
2208 }
2209
2210 /* realloc iops */
2211 fil_crypt_realloc_iops(&thr);
2212 }
2213
2214 /* complete rotation */
2215 if (thr.space) {
2216 fil_crypt_complete_rotate_space(&thr);
2217 }
2218
2219 /* force key state refresh */
2220 new_state.key_id = 0;
2221
2222 /* return iops */
2223 fil_crypt_return_iops(&thr);
2224 }
2225 }
2226
2227 /* return iops if shutting down */
2228 fil_crypt_return_iops(&thr);
2229
2230 /* release current space if shutting down */
2231 if (thr.space) {
2232 thr.space->release();
2233 thr.space = NULL;
2234 }
2235
2236 mutex_enter(&fil_crypt_threads_mutex);
2237 srv_n_fil_crypt_threads_started--;
2238 os_event_set(fil_crypt_event); /* signal that we stopped */
2239 mutex_exit(&fil_crypt_threads_mutex);
2240
2241 /* We count the number of threads in os_thread_exit(). A created
2242 thread should always use that to exit and not use return() to exit. */
2243
2244 os_thread_exit();
2245
2246 OS_THREAD_DUMMY_RETURN;
2247 }
2248
2249 /*********************************************************************
2250 Adjust thread count for key rotation
2251 @param[in] enw_cnt Number of threads to be used */
2252 UNIV_INTERN
2253 void
fil_crypt_set_thread_cnt(const uint new_cnt)2254 fil_crypt_set_thread_cnt(
2255 const uint new_cnt)
2256 {
2257 if (!fil_crypt_threads_inited) {
2258 if (srv_shutdown_state != SRV_SHUTDOWN_NONE)
2259 return;
2260 fil_crypt_threads_init();
2261 }
2262
2263 mutex_enter(&fil_crypt_threads_mutex);
2264
2265 if (new_cnt > srv_n_fil_crypt_threads) {
2266 uint add = new_cnt - srv_n_fil_crypt_threads;
2267 srv_n_fil_crypt_threads = new_cnt;
2268 for (uint i = 0; i < add; i++) {
2269 ib::info() << "Creating #"
2270 << i+1 << " encryption thread id "
2271 << os_thread_create(fil_crypt_thread)
2272 << " total threads " << new_cnt << ".";
2273 }
2274 } else if (new_cnt < srv_n_fil_crypt_threads) {
2275 srv_n_fil_crypt_threads = new_cnt;
2276 os_event_set(fil_crypt_threads_event);
2277 }
2278
2279 mutex_exit(&fil_crypt_threads_mutex);
2280
2281 while(srv_n_fil_crypt_threads_started != srv_n_fil_crypt_threads) {
2282 os_event_reset(fil_crypt_event);
2283 os_event_wait_time(fil_crypt_event, 100000);
2284 }
2285
2286 /* Send a message to encryption threads that there could be
2287 something to do. */
2288 if (srv_n_fil_crypt_threads) {
2289 os_event_set(fil_crypt_threads_event);
2290 }
2291 }
2292
2293 /** Initialize the tablespace default_encrypt_tables
2294 if innodb_encryption_rotate_key_age=0. */
fil_crypt_default_encrypt_tables_fill()2295 static void fil_crypt_default_encrypt_tables_fill()
2296 {
2297 ut_ad(mutex_own(&fil_system.mutex));
2298
2299 for (fil_space_t* space = UT_LIST_GET_FIRST(fil_system.space_list);
2300 space != NULL;
2301 space = UT_LIST_GET_NEXT(space_list, space)) {
2302 if (space->purpose != FIL_TYPE_TABLESPACE
2303 || space->is_in_default_encrypt
2304 || UT_LIST_GET_LEN(space->chain) == 0
2305 || !space->acquire_if_not_stopped()) {
2306 continue;
2307 }
2308
2309 /* Ensure that crypt_data has been initialized. */
2310 ut_ad(space->size);
2311
2312 /* Skip ENCRYPTION!=DEFAULT tablespaces. */
2313 if (space->crypt_data
2314 && !space->crypt_data->is_default_encryption()) {
2315 goto next;
2316 }
2317
2318 if (srv_encrypt_tables) {
2319 /* Skip encrypted tablespaces if
2320 innodb_encrypt_tables!=OFF */
2321 if (space->crypt_data
2322 && space->crypt_data->min_key_version) {
2323 goto next;
2324 }
2325 } else {
2326 /* Skip unencrypted tablespaces if
2327 innodb_encrypt_tables=OFF */
2328 if (!space->crypt_data
2329 || !space->crypt_data->min_key_version) {
2330 goto next;
2331 }
2332 }
2333
2334 fil_system.default_encrypt_tables.push_back(*space);
2335 space->is_in_default_encrypt = true;
2336 next:
2337 space->release();
2338 }
2339 }
2340
2341 /*********************************************************************
2342 Adjust max key age
2343 @param[in] val New max key age */
2344 UNIV_INTERN
2345 void
fil_crypt_set_rotate_key_age(uint val)2346 fil_crypt_set_rotate_key_age(
2347 uint val)
2348 {
2349 mutex_enter(&fil_system.mutex);
2350 srv_fil_crypt_rotate_key_age = val;
2351 if (val == 0) {
2352 fil_crypt_default_encrypt_tables_fill();
2353 }
2354 mutex_exit(&fil_system.mutex);
2355 os_event_set(fil_crypt_threads_event);
2356 }
2357
2358 /*********************************************************************
2359 Adjust rotation iops
2360 @param[in] val New max roation iops */
2361 UNIV_INTERN
2362 void
fil_crypt_set_rotation_iops(uint val)2363 fil_crypt_set_rotation_iops(
2364 uint val)
2365 {
2366 srv_n_fil_crypt_iops = val;
2367 os_event_set(fil_crypt_threads_event);
2368 }
2369
2370 /*********************************************************************
2371 Adjust encrypt tables
2372 @param[in] val New setting for innodb-encrypt-tables */
fil_crypt_set_encrypt_tables(ulong val)2373 void fil_crypt_set_encrypt_tables(ulong val)
2374 {
2375 if (!fil_crypt_threads_inited) {
2376 return;
2377 }
2378
2379 mutex_enter(&fil_system.mutex);
2380
2381 srv_encrypt_tables = val;
2382
2383 if (fil_crypt_must_default_encrypt()) {
2384 fil_crypt_default_encrypt_tables_fill();
2385 }
2386
2387 mutex_exit(&fil_system.mutex);
2388
2389 os_event_set(fil_crypt_threads_event);
2390 }
2391
2392 /*********************************************************************
2393 Init threads for key rotation */
2394 UNIV_INTERN
2395 void
fil_crypt_threads_init()2396 fil_crypt_threads_init()
2397 {
2398 if (!fil_crypt_threads_inited) {
2399 fil_crypt_event = os_event_create(0);
2400 fil_crypt_threads_event = os_event_create(0);
2401 mutex_create(LATCH_ID_FIL_CRYPT_THREADS_MUTEX,
2402 &fil_crypt_threads_mutex);
2403
2404 uint cnt = srv_n_fil_crypt_threads;
2405 srv_n_fil_crypt_threads = 0;
2406 fil_crypt_threads_inited = true;
2407 fil_crypt_set_thread_cnt(cnt);
2408 }
2409 }
2410
2411 /*********************************************************************
2412 Clean up key rotation threads resources */
2413 UNIV_INTERN
2414 void
fil_crypt_threads_cleanup()2415 fil_crypt_threads_cleanup()
2416 {
2417 if (!fil_crypt_threads_inited) {
2418 return;
2419 }
2420 ut_a(!srv_n_fil_crypt_threads_started);
2421 os_event_destroy(fil_crypt_event);
2422 os_event_destroy(fil_crypt_threads_event);
2423 mutex_free(&fil_crypt_threads_mutex);
2424 fil_crypt_threads_inited = false;
2425 }
2426
2427 /*********************************************************************
2428 Wait for crypt threads to stop accessing space
2429 @param[in] space Tablespace */
2430 UNIV_INTERN
2431 void
fil_space_crypt_close_tablespace(const fil_space_t * space)2432 fil_space_crypt_close_tablespace(
2433 const fil_space_t* space)
2434 {
2435 fil_space_crypt_t* crypt_data = space->crypt_data;
2436
2437 if (!crypt_data || srv_n_fil_crypt_threads == 0
2438 || !fil_crypt_threads_inited) {
2439 return;
2440 }
2441
2442 mutex_enter(&fil_crypt_threads_mutex);
2443
2444 time_t start = time(0);
2445 time_t last = start;
2446
2447 mutex_enter(&crypt_data->mutex);
2448 mutex_exit(&fil_crypt_threads_mutex);
2449
2450 ulint cnt = crypt_data->rotate_state.active_threads;
2451 bool flushing = crypt_data->rotate_state.flushing;
2452
2453 while (cnt > 0 || flushing) {
2454 mutex_exit(&crypt_data->mutex);
2455 /* release dict mutex so that scrub threads can release their
2456 * table references */
2457 dict_mutex_exit_for_mysql();
2458
2459 /* wakeup throttle (all) sleepers */
2460 os_event_set(fil_crypt_throttle_sleep_event);
2461 os_event_set(fil_crypt_threads_event);
2462
2463 os_thread_sleep(20000);
2464 dict_mutex_enter_for_mysql();
2465 mutex_enter(&crypt_data->mutex);
2466 cnt = crypt_data->rotate_state.active_threads;
2467 flushing = crypt_data->rotate_state.flushing;
2468
2469 time_t now = time(0);
2470
2471 if (now >= last + 30) {
2472 ib::warn() << "Waited "
2473 << now - start
2474 << " seconds to drop space: "
2475 << space->name << " ("
2476 << space->id << ") active threads "
2477 << cnt << "flushing="
2478 << flushing << ".";
2479 last = now;
2480 }
2481 }
2482
2483 mutex_exit(&crypt_data->mutex);
2484 }
2485
2486 /*********************************************************************
2487 Get crypt status for a space (used by information_schema)
2488 @param[in] space Tablespace
2489 @param[out] status Crypt status */
2490 UNIV_INTERN
2491 void
fil_space_crypt_get_status(const fil_space_t * space,struct fil_space_crypt_status_t * status)2492 fil_space_crypt_get_status(
2493 const fil_space_t* space,
2494 struct fil_space_crypt_status_t* status)
2495 {
2496 memset(status, 0, sizeof(*status));
2497
2498 ut_ad(space->referenced());
2499
2500 /* If there is no crypt data and we have not yet read
2501 page 0 for this tablespace, we need to read it before
2502 we can continue. */
2503 if (!space->crypt_data) {
2504 fil_crypt_read_crypt_data(const_cast<fil_space_t*>(space));
2505 }
2506
2507 status->space = ULINT_UNDEFINED;
2508
2509 if (fil_space_crypt_t* crypt_data = space->crypt_data) {
2510 status->space = space->id;
2511 mutex_enter(&crypt_data->mutex);
2512 status->scheme = crypt_data->type;
2513 status->keyserver_requests = crypt_data->keyserver_requests;
2514 status->min_key_version = crypt_data->min_key_version;
2515 status->key_id = crypt_data->key_id;
2516
2517 if (crypt_data->rotate_state.active_threads > 0 ||
2518 crypt_data->rotate_state.flushing) {
2519 status->rotating = true;
2520 status->flushing =
2521 crypt_data->rotate_state.flushing;
2522 status->rotate_next_page_number =
2523 crypt_data->rotate_state.next_offset;
2524 status->rotate_max_page_number =
2525 crypt_data->rotate_state.max_offset;
2526 }
2527
2528 mutex_exit(&crypt_data->mutex);
2529
2530 if (srv_encrypt_tables || crypt_data->min_key_version) {
2531 status->current_key_version =
2532 fil_crypt_get_latest_key_version(crypt_data);
2533 }
2534 }
2535 }
2536
2537 /*********************************************************************
2538 Return crypt statistics
2539 @param[out] stat Crypt statistics */
2540 UNIV_INTERN
2541 void
fil_crypt_total_stat(fil_crypt_stat_t * stat)2542 fil_crypt_total_stat(
2543 fil_crypt_stat_t *stat)
2544 {
2545 mutex_enter(&crypt_stat_mutex);
2546 *stat = crypt_stat;
2547 mutex_exit(&crypt_stat_mutex);
2548 }
2549
2550 #endif /* UNIV_INNOCHECKSUM */
2551
2552 /**
2553 Verify that post encryption checksum match calculated checksum.
2554 This function should be called only if tablespace contains crypt_data
2555 metadata (this is strong indication that tablespace is encrypted).
2556 Function also verifies that traditional checksum does not match
2557 calculated checksum as if it does page could be valid unencrypted,
2558 encrypted, or corrupted.
2559
2560 @param[in,out] page page frame (checksum is temporarily modified)
2561 @param[in] zip_size ROW_FORMAT=COMPRESSED page size, or 0
2562 @return true if page is encrypted AND OK, false otherwise */
fil_space_verify_crypt_checksum(const byte * page,ulint zip_size)2563 bool fil_space_verify_crypt_checksum(const byte* page, ulint zip_size)
2564 {
2565 if (ENCRYPTION_KEY_NOT_ENCRYPTED == mach_read_from_4(
2566 page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)) {
2567 return false;
2568 }
2569
2570 /* Compressed and encrypted pages do not have checksum. Assume not
2571 corrupted. Page verification happens after decompression in
2572 buf_page_read_complete() using buf_page_is_corrupted(). */
2573 if (fil_page_get_type(page) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
2574 return true;
2575 }
2576
2577 /* Read stored post encryption checksum. */
2578 const ib_uint32_t checksum = mach_read_from_4(
2579 page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + 4);
2580
2581 /* If stored checksum matches one of the calculated checksums
2582 page is not corrupted. */
2583
2584 switch (srv_checksum_algorithm_t(srv_checksum_algorithm)) {
2585 case SRV_CHECKSUM_ALGORITHM_STRICT_FULL_CRC32:
2586 case SRV_CHECKSUM_ALGORITHM_STRICT_CRC32:
2587 if (zip_size) {
2588 return checksum == page_zip_calc_checksum(
2589 page, zip_size, SRV_CHECKSUM_ALGORITHM_CRC32);
2590 }
2591
2592 return checksum == buf_calc_page_crc32(page);
2593 case SRV_CHECKSUM_ALGORITHM_STRICT_NONE:
2594 /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2595 due to MDEV-12114, fil_crypt_calculate_checksum()
2596 is only using CRC32 for the encrypted pages.
2597 Due to this, we must treat "strict_none" as "none". */
2598 case SRV_CHECKSUM_ALGORITHM_NONE:
2599 return true;
2600 case SRV_CHECKSUM_ALGORITHM_STRICT_INNODB:
2601 /* Starting with MariaDB 10.1.25, 10.2.7, 10.3.1,
2602 due to MDEV-12114, fil_crypt_calculate_checksum()
2603 is only using CRC32 for the encrypted pages.
2604 Due to this, we must treat "strict_innodb" as "innodb". */
2605 case SRV_CHECKSUM_ALGORITHM_INNODB:
2606 case SRV_CHECKSUM_ALGORITHM_CRC32:
2607 case SRV_CHECKSUM_ALGORITHM_FULL_CRC32:
2608 if (checksum == BUF_NO_CHECKSUM_MAGIC) {
2609 return true;
2610 }
2611 if (zip_size) {
2612 return checksum == page_zip_calc_checksum(
2613 page, zip_size,
2614 SRV_CHECKSUM_ALGORITHM_CRC32)
2615 || checksum == page_zip_calc_checksum(
2616 page, zip_size,
2617 SRV_CHECKSUM_ALGORITHM_INNODB);
2618 }
2619
2620 return checksum == buf_calc_page_crc32(page)
2621 || checksum == buf_calc_page_new_checksum(page);
2622 }
2623
2624 ut_ad("unhandled innodb_checksum_algorithm" == 0);
2625 return false;
2626 }
2627