1 /*****************************************************************************
2
3 Copyright (c) 2015, 2020, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25 *****************************************************************************/
26
27 #include <sys/types.h>
28
29 #include "btr0pcur.h"
30 #include "fil0fil.h"
31 #include "lob0first.h"
32 #include "lob0inf.h"
33 #include "lob0lob.h"
34 #include "lob0zip.h"
35 #include "row0upd.h"
36 #include "zlob0first.h"
37
38 #include "my_dbug.h"
39
40 namespace lob {
41
/** A BLOB field reference has all the bits set to zero, except the "being
modified" bit. Byte offset 12 is the first byte of the 8-byte length word of
the 20-byte field reference, which is where the flag bits live; 0x20 is the
"being modified" flag (presumably BTR_EXTERN_BEING_MODIFIED_FLAG — confirm
against the field-reference layout in lob0lob.h). */
const byte field_ref_almost_zero[FIELD_REF_SIZE] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x20, 0, 0, 0, 0, 0, 0, 0,
};
47
48 #ifdef UNIV_DEBUG
assert_read_uncommitted() const49 bool ReadContext::assert_read_uncommitted() const {
50 ut_ad(m_trx == nullptr || m_trx->is_read_uncommitted());
51 return (true);
52 }
53 #endif /* UNIV_DEBUG */
54
55 /** Gets the offset of the pointer to the externally stored part of a field.
56 @param[in] offsets array returned by rec_get_offsets()
57 @param[in] n index of the external field
58 @return offset of the pointer to the externally stored part */
btr_rec_get_field_ref_offs(const ulint * offsets,ulint n)59 ulint btr_rec_get_field_ref_offs(const ulint *offsets, ulint n) {
60 ulint field_ref_offs;
61 ulint local_len;
62
63 ut_a(rec_offs_nth_extern(offsets, n));
64 field_ref_offs = rec_get_nth_field_offs(offsets, n, &local_len);
65 ut_a(rec_field_not_null_not_add_col_def(local_len));
66 ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
67
68 return (field_ref_offs + local_len - BTR_EXTERN_FIELD_REF_SIZE);
69 }
70
71 /** Marks non-updated off-page fields as disowned by this record.
72 The ownership must be transferred to the updated record which is
73 inserted elsewhere in the index tree. In purge only the owner of
74 externally stored field is allowed to free the field.
75 @param[in] update update vector. */
disown_inherited_fields(const upd_t * update)76 void BtrContext::disown_inherited_fields(const upd_t *update) {
77 ut_ad(rec_offs_validate());
78 ut_ad(!rec_offs_comp(m_offsets) || !rec_get_node_ptr_flag(m_rec));
79 ut_ad(rec_offs_any_extern(m_offsets));
80 ut_ad(m_mtr);
81
82 for (ulint i = 0; i < rec_offs_n_fields(m_offsets); i++) {
83 if (rec_offs_nth_extern(m_offsets, i) &&
84 !upd_get_field_by_field_no(update, i, false)) {
85 set_ownership_of_extern_field(i, FALSE);
86 }
87 }
88 }
89
/** When bulk load is being done, check if there is enough space in redo
log file. */
void BtrContext::check_redolog_bulk() {
  ut_ad(is_bulk());

  /* The flush observer belongs to the mtr and would be lost across the
  commit/restart cycle below; preserve and re-attach it. */
  FlushObserver *observer = m_mtr->get_flush_observer();

  /* Buffer-fix the record block so it cannot be evicted while the mtr
  latches are released. */
  rec_block_fix();

  commit_btr_mtr();

  DEBUG_SYNC_C("blob_write_middle");

  /* With no latches held, wait for redo log space if necessary. */
  log_free_check();

  start_btr_mtr();
  m_mtr->set_flush_observer(observer);

  rec_block_unfix();
  ut_ad(validate());
}
111
/** Check if there is enough space in log file. Commit and re-start the
mini transaction. */
void BtrContext::check_redolog_normal() {
  ut_ad(!is_bulk());

  /* Preserve the flush observer across the mtr commit/restart. */
  FlushObserver *observer = m_mtr->get_flush_observer();

  /* Remember the cursor position; committing the mtr releases the page
  latches, so the record may move before we latch again. */
  store_position();

  commit_btr_mtr();

  DEBUG_SYNC_C("blob_write_middle");

  /* With no latches held, wait for redo log space if necessary. */
  log_free_check();

  DEBUG_SYNC_C("blob_write_middle_after_check");

  start_btr_mtr();

  m_mtr->set_flush_observer(observer);

  /* Re-latch and reposition the cursor on the record. */
  restore_position();

  ut_ad(validate());
}
136
/** Commit and restart the mini-transaction (non-bulk path), preserving the
flush observer and, when a persistent cursor is attached, its position. */
void BtrContext::restart_mtr_normal() {
  ut_ad(!is_bulk());

  /* Preserve the flush observer across the mtr commit/restart. */
  FlushObserver *observer = m_mtr->get_flush_observer();

  /* Without a cursor there is no position to save/restore. */
  if (m_pcur != nullptr) {
    store_position();
  }

  commit_btr_mtr();
  start_btr_mtr();
  m_mtr->set_flush_observer(observer);

  if (m_pcur != nullptr) {
    restore_position();
  }

  /* validate() presumably needs the cursor; skip it when absent. */
  ut_ad(m_pcur == nullptr || validate());
}
155
/** Commit and restart the mini-transaction (bulk-load path). The record
block is buffer-fixed across the restart so it cannot be evicted while no
latches are held; the flush observer is preserved. */
void BtrContext::restart_mtr_bulk() {
  ut_ad(is_bulk());
  FlushObserver *observer = m_mtr->get_flush_observer();
  rec_block_fix();
  commit_btr_mtr();
  start_btr_mtr();
  m_mtr->set_flush_observer(observer);
  rec_block_unfix();
  ut_ad(validate());
}
166
167 /** Print this blob directory into the given output stream.
168 @param[in] out the output stream.
169 @return the output stream. */
print(std::ostream & out) const170 std::ostream &blob_dir_t::print(std::ostream &out) const {
171 out << "[blob_dir_t: ";
172 for (const blob_page_info_t &info : m_pages) {
173 out << info;
174 }
175 out << "]";
176 return (out);
177 }
178
179 /** Print this blob_page_into_t object into the given output stream.
180 @param[in] out the output stream.
181 @return the output stream. */
print(std::ostream & out) const182 std::ostream &blob_page_info_t::print(std::ostream &out) const {
183 out << "[blob_page_info_t: m_page_no=" << m_page_no << ", m_bytes=" << m_bytes
184 << ", m_zbytes=" << m_zbytes << "]";
185 return (out);
186 }
187
/** Do setup of the zlib stream.
@return code returned by zlib. */
int zReader::setup_zstream() {
  /* Number of BLOB-prefix bytes stored inline in the record; the
  inflated output is written after that prefix in the caller's buffer. */
  const ulint local_prefix = m_rctx.m_local_len - BTR_EXTERN_FIELD_REF_SIZE;

  m_stream.next_out = m_rctx.m_buf + local_prefix;
  m_stream.avail_out = static_cast<uInt>(m_rctx.m_len - local_prefix);
  m_stream.next_in = Z_NULL;
  m_stream.avail_in = 0;

  /* Zlib inflate needs 32 kilobytes for the default
  window size, plus a few kilobytes for small objects. */
  m_heap = mem_heap_create(40000);

  /* Route zlib's internal allocations into the heap created above;
  freeing m_heap later releases them all at once. */
  page_zip_set_alloc(&m_stream, m_heap);

  int err = inflateInit(&m_stream);
  return (err);
}
206
/** Fetch the BLOB.
@return DB_SUCCESS on success, DB_FAIL on error. */
dberr_t zReader::fetch() {
  DBUG_TRACE;

  dberr_t err = DB_SUCCESS;

  ut_ad(m_rctx.is_valid_blob());
  ut_ad(assert_empty_local_prefix());

  /* Debug-only: record the FIL_PAGE_TYPE the first LOB page must carry;
  fetch_page() asserts against this value. */
  ut_d(m_page_type_ex =
           m_rctx.is_sdi() ? FIL_PAGE_SDI_ZBLOB : FIL_PAGE_TYPE_ZBLOB);

  setup_zstream();

  m_remaining = m_rctx.m_blobref.length();

  /* Walk the singly linked chain of compressed LOB pages, feeding each
  page's payload into the zlib stream. */
  while (m_rctx.m_page_no != FIL_NULL) {
    /* Keep the current page number for the error message below;
    fetch_page() advances m_rctx.m_page_no to the next page. */
    page_no_t curr_page_no = m_rctx.m_page_no;

    err = fetch_page();
    if (err != DB_SUCCESS) {
      break;
    }

    m_stream.next_in = m_bpage->zip.data + m_rctx.m_offset;
    m_stream.avail_in =
        static_cast<uInt>(m_rctx.m_page_size.physical() - m_rctx.m_offset);

    int zlib_err = inflate(&m_stream, Z_NO_FLUSH);
    switch (zlib_err) {
      case Z_OK:
        if (m_stream.avail_out == 0) {
          /* Output buffer full: the requested length has been read. */
          goto end_of_blob;
        }
        break;
      case Z_STREAM_END:
        if (m_rctx.m_page_no == FIL_NULL) {
          /* Stream ended exactly at the last page: normal completion. */
          goto end_of_blob;
        }
        /* Stream ended although another page is linked: treat as the
        error case below. */
        /* fall through */
      default:
        err = DB_FAIL;
        ib::error(ER_IB_MSG_630)
            << "inflate() of compressed BLOB page "
            << page_id_t(m_rctx.m_space_id, curr_page_no) << " returned "
            << zlib_err << " (" << m_stream.msg << ")";
        /* fall through */
        ut_error;
      case Z_BUF_ERROR:
        /* No progress possible; stop reading. */
        goto end_of_blob;
    }

    /* Done with this page; release it before fetching the next one. */
    buf_page_release_zip(m_bpage);

    /* On subsequent pages the payload starts right after the 4-byte
    next-page pointer; fetch_page() adds 4 when it sees this value. */
    m_rctx.m_offset = FIL_PAGE_NEXT;

    /* Non-first pages of a non-SDI zblob carry a different page type. */
    ut_d(if (!m_rctx.m_is_sdi) m_page_type_ex = FIL_PAGE_TYPE_ZBLOB2);
  }

end_of_blob:
  /* Common cleanup: release the last page, tear down zlib state and the
  heap that backed its allocations. */
  buf_page_release_zip(m_bpage);
  inflateEnd(&m_stream);
  mem_heap_free(m_heap);
  UNIV_MEM_ASSERT_RW(m_rctx.m_buf, m_stream.total_out);
  return err;
}
274
275 #ifdef UNIV_DEBUG
276 /** Assert that the local prefix is empty. For compressed row format,
277 there is no local prefix stored. This function doesn't return if the
278 local prefix is non-empty.
279 @return true if local prefix is empty*/
assert_empty_local_prefix()280 bool zReader::assert_empty_local_prefix() {
281 ut_ad(m_rctx.m_local_len == BTR_EXTERN_FIELD_REF_SIZE);
282 return (true);
283 }
284 #endif /* UNIV_DEBUG */
285
/** Read one page of the compressed LOB chain into m_bpage, advance
m_rctx.m_page_no to the next page in the chain and position
m_rctx.m_offset at the start of the compressed payload.
@return DB_SUCCESS always (a missing page trips an assertion). */
dberr_t zReader::fetch_page() {
  dberr_t err(DB_SUCCESS);

  m_bpage = buf_page_get_zip(page_id_t(m_rctx.m_space_id, m_rctx.m_page_no),
                             m_rctx.m_page_size);

  ut_a(m_bpage != nullptr);
  ut_ad(fil_page_get_type(m_bpage->zip.data) == m_page_type_ex);

  /* Follow the singly linked page chain. */
  m_rctx.m_page_no = mach_read_from_4(m_bpage->zip.data + FIL_PAGE_NEXT);

  if (m_rctx.m_offset == FIL_PAGE_NEXT) {
    /* When the BLOB begins at page header,
    the compressed data payload does not
    immediately follow the next page pointer. */
    m_rctx.m_offset = FIL_PAGE_DATA;
  } else {
    /* Skip over the 4-byte next-page pointer; the payload follows it. */
    m_rctx.m_offset += 4;
  }

  return (err);
}
307
/** This is used to take action when we enter and exit a scope. When we enter
the scope the constructor will set the "being modified" bit in the lob reference
objects that are either being inserted or updated. When we exit the scope the
destructor will clear the "being modified" bit in the lob reference objects. */
struct Being_modified {
  /** Constructor. Set the "being modified" bit in LOB references.
  @param[in]	ctx	the B-tree context for LOB operation.
  @param[in]	big_rec_vec	the LOB vector
  @param[in]	pcur	persistent cursor
  @param[in]	offsets	the record offsets
  @param[in]	op	the operation code
  @param[in]	mtr	the mini-transaction context. */
  Being_modified(BtrContext &ctx, const big_rec_t *big_rec_vec,
                 btr_pcur_t *pcur, ulint *offsets, opcode op, mtr_t *mtr)
      : m_btr_ctx(ctx),
        m_big_rec_vec(big_rec_vec),
        m_pcur(pcur),
        m_offsets(offsets),
        m_op(op),
        m_mtr(mtr) {
    /* All pointers to externally stored columns in the record
    must either be zero or they must be pointers to inherited
    columns, owned by this record or an earlier record version. */
    rec_t *rec = btr_pcur_get_rec(m_pcur);
    dict_index_t *index = m_pcur->index();
#ifdef UNIV_DEBUG
    rec_offs_make_valid(rec, index, m_offsets);
#endif /* UNIV_DEBUG */
    for (uint i = 0; i < m_big_rec_vec->n_fields; i++) {
      ulint field_no = m_big_rec_vec->fields[i].field_no;
      byte *field_ref = btr_rec_get_field_ref(rec, m_offsets, field_no);
      ref_t blobref(field_ref);

      ut_ad(!blobref.is_being_modified());

      /* Before we release latches in a subsequent ctx.check_redolog() call,
      mark the blobs as being modified. This is needed to ensure that READ
      UNCOMMITTED transactions don't read an inconsistent BLOB. */
      if (index->is_compressed()) {
        /* For a compressed page the reference is first updated
        in-memory (no mtr logging) ... */
        blobref.set_being_modified(true, nullptr);

        if (m_op == OPCODE_INSERT_UPDATE) {
          /* Inserting by updating a del-marked record. */
          blobref.set_page_no(FIL_NULL, nullptr);
        }

        /* ... and then the whole reference is written (and logged)
        through the page_zip layer, except in bulk mode where redo
        logging is not done. */
        if (!m_btr_ctx.is_bulk()) {
          buf_block_t *rec_block = btr_pcur_get_block(m_pcur);
          page_zip_des_t *page_zip = buf_block_get_page_zip(rec_block);
          page_zip_write_blob_ptr(page_zip, rec, index, m_offsets, field_no,
                                  m_mtr);
        }
      } else {
        /* Uncompressed page: the reference update is mtr-logged
        directly. */
        blobref.set_being_modified(true, m_mtr);
      }

#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG

      /* Make a in-memory copy of the LOB ref. */
      ref_mem_t ref_mem;
      blobref.parse(ref_mem);

      ut_a(blobref.is_owner());
      /* Either this must be an update in place,
      or the BLOB must be inherited, or the BLOB pointer
      must be zero (will be written in this function). */
      ut_a(m_op == OPCODE_UPDATE || m_op == OPCODE_INSERT_UPDATE ||
           blobref.is_inherited() || blobref.is_null_relaxed());
      ut_ad(blobref.is_being_modified());

#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
    }
  }

  /** Destructor. Clear the "being modified" bit in LOB references. */
  ~Being_modified() {
    rec_t *rec = btr_pcur_get_rec(m_pcur);
    dict_index_t *index = m_pcur->index();
#ifdef UNIV_DEBUG
    rec_offs_make_valid(rec, index, m_offsets);
#endif /* UNIV_DEBUG */
    for (uint i = 0; i < m_big_rec_vec->n_fields; i++) {
      ulint field_no = m_big_rec_vec->fields[i].field_no;
      byte *field_ref = btr_rec_get_field_ref(rec, m_offsets, field_no);
      ref_t blobref(field_ref);

      /* Mirror image of the constructor: clear the bit, and on
      compressed non-bulk pages re-log the reference through the
      page_zip layer. */
      if (index->is_compressed()) {
        blobref.set_being_modified(false, nullptr);
        if (!m_btr_ctx.is_bulk()) {
          buf_block_t *rec_block = btr_pcur_get_block(m_pcur);
          page_zip_des_t *page_zip = buf_block_get_page_zip(rec_block);
          page_zip_write_blob_ptr(page_zip, rec, index, m_offsets, field_no,
                                  m_mtr);
        }
      } else {
        blobref.set_being_modified(false, m_mtr);
      }
    }
  }

  /** B-tree context of the enclosing LOB operation. */
  BtrContext &m_btr_ctx;
  /** Fields to be stored externally. */
  const big_rec_t *m_big_rec_vec;
  /** Persistent cursor positioned on the clustered index record. */
  btr_pcur_t *m_pcur;
  /** Record offsets for the cursor's record. */
  ulint *m_offsets;
  /** Operation code (insert/update/...). */
  opcode m_op;
  /** Mini-transaction covering the operation. */
  mtr_t *m_mtr;
};
415
/** Stores the fields in big_rec_vec to the tablespace and puts pointers to
them in rec. The extern flags in rec will have to be set beforehand. The
fields are stored on pages allocated from leaf node file segment of the index
tree.

TODO: If the allocation extends the tablespace, it will not be redo logged, in
any mini-transaction. Tablespace extension should be redo-logged, so that
recovery will not fail when the big_rec was written to the extended portion of
the file, in case the file was somehow truncated in the crash.
@param[in]	trx	the trx doing LOB store. If unavailable it
			could be nullptr.
@param[in,out]	pcur	a persistent cursor. if btr_mtr is restarted,
			then this can be repositioned.
@param[in]	upd	update vector
@param[in,out]	offsets	rec_get_offsets() on pcur. the "external
			storage" flags in offsets will correctly
			correspond to rec when this function returns
@param[in]	big_rec_vec	vector containing fields to be stored
			externally
@param[in,out]	btr_mtr	mtr containing the latches to the clustered
			index. can be committed and restarted.
@param[in]	op	operation code
@return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
dberr_t btr_store_big_rec_extern_fields(trx_t *trx, btr_pcur_t *pcur,
                                        const upd_t *upd, ulint *offsets,
                                        const big_rec_t *big_rec_vec,
                                        mtr_t *btr_mtr, opcode op) {
  mtr_t mtr;
  mtr_t mtr_bulk;
  page_zip_des_t *page_zip;
  dberr_t error = DB_SUCCESS;
  dict_index_t *index = pcur->index();
  dict_table_t *table = index->table;
  buf_block_t *rec_block = btr_pcur_get_block(pcur);
  rec_t *rec = btr_pcur_get_rec(pcur);

  ut_ad(rec_offs_validate(rec, index, offsets));
  ut_ad(rec_offs_any_extern(offsets));
  ut_ad(btr_mtr);
  ut_ad(mtr_memo_contains_flagged(btr_mtr, dict_index_get_lock(index),
                                  MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
        index->table->is_intrinsic() || !index->is_committed());
  ut_ad(
      mtr_is_block_fix(btr_mtr, rec_block, MTR_MEMO_PAGE_X_FIX, index->table));
  ut_ad(buf_block_get_frame(rec_block) == page_align(rec));
  ut_a(index->is_clustered());

  ut_a(dict_table_page_size(table).equals_to(rec_block->page.size));

  /* Create a blob operation context. */
  BtrContext btr_ctx(btr_mtr, pcur, index, rec, offsets, rec_block, op);
  InsertContext ctx(btr_ctx, big_rec_vec);

  /* RAII guard: sets the "being modified" bit on all LOB references
  now, clears it when this function returns by any path. */
  Being_modified bm(btr_ctx, big_rec_vec, pcur, offsets, op, btr_mtr);

  /* The pcur could be re-positioned. Commit and restart btr_mtr. */
  ctx.check_redolog();
  /* After the mtr restart the record may have moved; re-read the
  block and record from the cursor. */
  rec_block = btr_pcur_get_block(pcur);
  rec = btr_pcur_get_rec(pcur);

  page_zip = buf_block_get_page_zip(rec_block);
  ut_a(fil_page_index_page_check(page_align(rec)) || op == OPCODE_INSERT_BULK);

  /* SDI blobs (and a debug injection point) use the legacy storage
  formats handled by the two labeled sections at the end. */
  if (page_zip != nullptr) {
    DBUG_EXECUTE_IF("lob_insert_single_zstream",
                    { goto insert_single_zstream; });

    if (dict_index_is_sdi(index)) {
      goto insert_single_zstream;
    }

  } else {
    /* Uncompressed LOB */

    DBUG_EXECUTE_IF("lob_insert_noindex", { goto insert_noindex; });

    if (dict_index_is_sdi(index)) {
      goto insert_noindex;
    }
  }

  /* Main path: store each external field, either by partially updating
  an existing LOB or by inserting a new one. */
  for (uint i = 0; i < big_rec_vec->n_fields; i++) {
    ulint field_no = big_rec_vec->fields[i].field_no;

    /* Cursor could have changed position. */
    rec = btr_pcur_get_rec(pcur);
    rec_offs_make_valid(rec, index, offsets);
    ut_ad(rec_offs_validate(rec, index, offsets));

    byte *field_ref = btr_rec_get_field_ref(rec, offsets, field_no);

    ref_t blobref(field_ref);
    ut_ad(blobref.validate(btr_mtr));

    bool can_do_partial_update = false;

    /* Partial update is only considered when updating a field that was
    already stored externally in the old record version. */
    if (op == lob::OPCODE_UPDATE && upd != nullptr &&
        big_rec_vec->fields[i].ext_in_old) {
      can_do_partial_update = blobref.is_lob_partially_updatable(index);
    }

    if (page_zip != nullptr) {
      bool do_insert = true;

      if (op == lob::OPCODE_UPDATE && upd != nullptr &&
          blobref.is_big(rec_block->page.size) && can_do_partial_update) {
        if (upd->is_partially_updated(field_no)) {
          /* Do partial update. */
          error = lob::z_update(ctx, trx, index, upd, field_no, blobref);
          switch (error) {
            case DB_SUCCESS:
              /* Partial update succeeded; skip the full insert. */
              do_insert = false;
              break;
            case DB_FAIL:
              /* Partial update not possible; fall back to insert. */
              break;
            default:
              ut_error;
          }
        } else {
          /* This is to inform the purge thread that
          the older version LOB in this update operation
          can be freed. */
          blobref.mark_not_partially_updatable(trx, btr_mtr, index,
                                               dict_table_page_size(table));
        }
      }

      if (do_insert) {
        const ulint lob_len = big_rec_vec->fields[i].len;
        /* Small LOBs are stored as a single zlib stream; larger ones
        use the z_insert format with a LOB index. */
        if (ref_t::use_single_z_stream(lob_len)) {
          zInserter zblob_writer(&ctx);
          error = zblob_writer.prepare();
          if (error == DB_SUCCESS) {
            zblob_writer.write_one_blob(i);
            error = zblob_writer.finish();
          }
        } else {
          error = lob::z_insert(&ctx, trx, blobref, &big_rec_vec->fields[i], i);
        }

        if (op == lob::OPCODE_UPDATE && upd != nullptr) {
          /* Get the corresponding upd_field_t
          object.*/
          upd_field_t *uf = upd->get_field_by_field_no(field_no, index);

          if (uf != nullptr) {
            /* Update the LOB reference
            stored in upd_field_t */
            dfield_t *new_val = &uf->new_val;

            if (dfield_is_ext(new_val)) {
              byte *field_ref = new_val->blobref();
              blobref.copy(field_ref);
              /* The copy in the update vector must not carry the
              "being modified" bit. */
              ref_t::set_being_modified(field_ref, false, nullptr);
            }
          }
        }
      }

    } else {
      /* Uncompressed LOB */
      bool do_insert = true;

      if (op == lob::OPCODE_UPDATE && upd != nullptr &&
          blobref.is_big(rec_block->page.size) && can_do_partial_update) {
        if (upd->is_partially_updated(field_no)) {
          /* Do partial update. */
          error = lob::update(ctx, trx, index, upd, field_no, blobref);
          switch (error) {
            case DB_SUCCESS:
              /* Partial update succeeded; skip the full insert. */
              do_insert = false;
              break;
            case DB_FAIL:
              /* Partial update not possible; fall back to insert. */
              break;
            case DB_OUT_OF_FILE_SPACE:
              /* Reported to the caller via the error break below. */
              break;
            default:
              ut_error;
          }

        } else {
          /* This is to inform the purge thread that
          the older version LOB in this update operation
          can be freed. */
          blobref.mark_not_partially_updatable(trx, btr_mtr, index,
                                               dict_table_page_size(table));
        }
      }

      if (do_insert) {
        error = lob::insert(&ctx, trx, blobref, &big_rec_vec->fields[i], i);

        if (op == lob::OPCODE_UPDATE && upd != nullptr) {
          /* Get the corresponding upd_field_t
          object.*/
          upd_field_t *uf = upd->get_field_by_field_no(field_no, index);

          if (uf != nullptr) {
            /* Update the LOB reference
            stored in upd_field_t */
            dfield_t *new_val = &uf->new_val;
            if (dfield_is_ext(new_val)) {
              byte *field_ref = new_val->blobref();
              blobref.copy(field_ref);
              /* The copy in the update vector must not carry the
              "being modified" bit. */
              ref_t::set_being_modified(field_ref, false, nullptr);
            }
          }
        }
      }
    }

    if (error != DB_SUCCESS) {
      break;
    }

#ifdef UNIV_DEBUG
    /* Ensure that the LOB references are valid now. */
    rec = btr_pcur_get_rec(pcur);
    rec_offs_make_valid(rec, index, offsets);
    field_ref =
        btr_rec_get_field_ref(rec, offsets, big_rec_vec->fields[i].field_no);
    ref_t lobref(field_ref);

    ut_ad(!lobref.is_null());
#endif /* UNIV_DEBUG */
  }
  return (error);

  /* Legacy format: the whole LOB as one zlib stream spanning multiple
  pages (SDI and debug-injection only; reached via goto above). */
  {
  insert_single_zstream:
    /* Insert the LOB as a single zlib stream spanning multiple
    LOB pages. This is the old way of storing LOBs. */
    zInserter zblob_writer(&ctx);
    error = zblob_writer.prepare();
    if (error == DB_SUCCESS) {
      zblob_writer.write();
      error = zblob_writer.finish();
    }
    return (error);
  }
  {
  insert_noindex:
    /* Insert the uncompressed LOB without LOB index. */
    Inserter blob_writer(&ctx);
    error = blob_writer.write();
    return (error);
  }
}
665
/** Copies an externally stored field of a record to mem heap.
@param[in]	trx	the current transaction, or nullptr
@param[in]	index	the clustered index containing the LOB
@param[in]	rec	record in a clustered index; must be
			protected by a lock or a page latch
@param[in]	offsets	array returned by rec_get_offsets()
@param[in]	page_size	BLOB page size
@param[in]	no	field number
@param[out]	len	length of the field
@param[out]	lob_version	LOB version number */
#ifdef UNIV_DEBUG
/**
@param[in]	is_sdi	true for SDI Indexes */
#endif /* UNIV_DEBUG */
/**
@param[in,out]	heap	mem heap
@return the field copied to heap, or NULL if the field is incomplete */
byte *btr_rec_copy_externally_stored_field_func(
    trx_t *trx, const dict_index_t *index, const rec_t *rec,
    const ulint *offsets, const page_size_t &page_size, ulint no, ulint *len,
    size_t *lob_version,
#ifdef UNIV_DEBUG
    bool is_sdi,
#endif /* UNIV_DEBUG */
    mem_heap_t *heap) {

  ulint local_len;
  const byte *data;

  ut_a(rec_offs_nth_extern(offsets, no));

  /* An externally stored field can contain some initial
  data from the field, and in the last 20 bytes it has the
  space id, page number, and offset where the rest of the
  field data is stored, and the data length in addition to
  the data stored locally. We may need to store some data
  locally to get the local record length above the 128 byte
  limit so that field offsets are stored in two bytes, and
  the extern bit is available in those two bytes. */

  data = rec_get_nth_field(rec, offsets, no, &local_len);

  /* The 20-byte LOB reference sits at the end of the local data. */
  const byte *field_ref = data + local_len - BTR_EXTERN_FIELD_REF_SIZE;

  lob::ref_t ref(const_cast<byte *>(field_ref));

  ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);

#ifdef UNIV_DEBUG
  /* Verify if the LOB reference is sane. */
  space_id_t space_id = ref.space_id();
  ut_ad(space_id == 0 || space_id == index->space);
#endif /* UNIV_DEBUG */

  if (ref.is_null()) {
    /* The externally stored field was not written yet.
    This record should only be seen by
    trx_rollback_or_clean_all_recovered() or any
    TRX_ISO_READ_UNCOMMITTED transactions. */

    return (nullptr);
  }

  return (btr_copy_externally_stored_field(trx, index, len, lob_version, data,
                                           page_size, local_len, is_sdi, heap));
}
728
729 /** Returns the page number where the next BLOB part is stored.
730 @param[in] blob_header the BLOB header.
731 @return page number or FIL_NULL if no more pages */
btr_blob_get_next_page_no(const byte * blob_header)732 static inline page_no_t btr_blob_get_next_page_no(const byte *blob_header) {
733 return (mach_read_from_4(blob_header + LOB_HDR_NEXT_PAGE_NO));
734 }
735
/** Check the FIL_PAGE_TYPE on an uncompressed BLOB page.
@param[in]	space_id	space identifier.
@param[in]	page_no		page number.
@param[in]	page		the page
@param[in]	read		TRUE=read, FALSE=purge */
static void btr_check_blob_fil_page_type(space_id_t space_id, page_no_t page_no,
                                         const page_t *page, ibool read) {
  ulint type = fil_page_get_type(page);

  ut_a(space_id == page_get_space_id(page));
  ut_a(page_no == page_get_page_no(page));

  switch (type) {
    uint32_t flags;
    case FIL_PAGE_TYPE_BLOB:
    case FIL_PAGE_SDI_BLOB:
      /* Expected page types for an uncompressed BLOB. */
      break;

    default:
      flags = fil_space_get_flags(space_id);
      /* The lenient early return below is compiled out of debug builds
      on purpose, so debug testing always reaches the fatal check. */
#ifndef UNIV_DEBUG /* Improve debug test coverage */
      if (!DICT_TF_HAS_ATOMIC_BLOBS(flags)) {
        /* Old versions of InnoDB did not initialize
        FIL_PAGE_TYPE on BLOB pages. Do not print
        anything about the type mismatch when reading
        a BLOB page that may be from old versions. */
        return;
      }
#endif /* !UNIV_DEBUG */

      /* Unexpected page type on a modern tablespace: abort. */
      ib::fatal(ER_IB_MSG_631)
          << "FIL_PAGE_TYPE=" << type << " on BLOB "
          << (read ? "read" : "purge") << " space " << space_id << " page "
          << page_no << " flags " << flags;
  }
}
772
773 /** Returns the length of a BLOB part stored on the header page.
774 @param[in] blob_header the BLOB header.
775 @return part length */
btr_blob_get_part_len(const byte * blob_header)776 static inline ulint btr_blob_get_part_len(const byte *blob_header) {
777 return (mach_read_from_4(blob_header + LOB_HDR_PART_LEN));
778 }
779
/** Fetch one BLOB page: copy its payload into the output buffer and
advance m_rctx.m_page_no to the next page of the chain. The page is
S-latched only for the duration of this call via a local mtr. */
void Reader::fetch_page() {
  mtr_t mtr;

  /* Bytes of LOB data available in the current LOB page. */
  ulint part_len;

  /* Bytes of LOB data obtained from the current LOB page. */
  ulint copy_len;

  ut_ad(m_rctx.m_page_no != FIL_NULL);
  ut_ad(m_rctx.m_page_no > 0);

  mtr_start(&mtr);

  m_cur_block = buf_page_get(page_id_t(m_rctx.m_space_id, m_rctx.m_page_no),
                             m_rctx.m_page_size, RW_S_LATCH, &mtr);
  buf_block_dbg_add_level(m_cur_block, SYNC_EXTERN_STORAGE);
  page_t *page = buf_block_get_frame(m_cur_block);

  btr_check_blob_fil_page_type(m_rctx.m_space_id, m_rctx.m_page_no, page, TRUE);

  byte *blob_header = page + m_rctx.m_offset;
  part_len = btr_blob_get_part_len(blob_header);

  /* Do not copy past the requested total length. */
  copy_len = ut_min(part_len, m_rctx.m_len - m_copied_len);

  memcpy(m_rctx.m_buf + m_copied_len, blob_header + LOB_HDR_SIZE, copy_len);

  m_copied_len += copy_len;
  m_rctx.m_page_no = btr_blob_get_next_page_no(blob_header);

  /* Release the page latch before returning. */
  mtr_commit(&mtr);

  /* On all subsequent pages the BLOB header is at FIL_PAGE_DATA. */
  m_rctx.m_offset = FIL_PAGE_DATA;
}
813
814 /** Fetch the complete or prefix of the uncompressed LOB data.
815 @return bytes of LOB data fetched. */
fetch()816 ulint Reader::fetch() {
817 if (m_rctx.m_blobref.is_null()) {
818 ut_ad(m_copied_len == 0);
819 return (m_copied_len);
820 }
821
822 while (m_copied_len < m_rctx.m_len) {
823 if (m_rctx.m_page_no == FIL_NULL) {
824 /* End of LOB has been reached. */
825 break;
826 }
827
828 fetch_page();
829 }
830
831 /* Assure that we have fetched the requested amount or the LOB
832 has ended. */
833 ut_ad(m_copied_len == m_rctx.m_len || m_rctx.m_page_no == FIL_NULL);
834
835 return (m_copied_len);
836 }
837
/** Copies the prefix of an externally stored field of a record.
The clustered index record must be protected by a lock or a page latch.
@param[in]	trx	the current transaction, or nullptr
@param[in]	index	the clust index in which lob is read.
@param[out]	buf	the field, or a prefix of it
@param[in]	len	length of buf, in bytes
@param[in]	page_size	BLOB page size
@param[in]	data	'internally' stored part of the field
			containing also the reference to the external
			part; must be protected by a lock or a page
			latch. */
#ifdef UNIV_DEBUG
/**
@param[in]	is_sdi	true for SDI indexes */
#endif /* UNIV_DEBUG */
/**
@param[in]	local_len	length of data, in bytes
@return the length of the copied field, or 0 if the column was being
or has been deleted */
ulint btr_copy_externally_stored_field_prefix_func(trx_t *trx,
                                                   const dict_index_t *index,
                                                   byte *buf, ulint len,
                                                   const page_size_t &page_size,
                                                   const byte *data,
#ifdef UNIV_DEBUG
                                                   bool is_sdi,
#endif /* UNIV_DEBUG */
                                                   ulint local_len) {
  ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);

  if (page_size.is_compressed()) {
    /* Compressed row format stores no local data prefix, only the
    20-byte reference. */
    ut_a(local_len == BTR_EXTERN_FIELD_REF_SIZE);

    ReadContext rctx(page_size, data, local_len, buf, len
#ifdef UNIV_DEBUG
                     ,
                     is_sdi
#endif /* UNIV_DEBUG */
    );

    rctx.m_index = const_cast<dict_index_t *>(index);
    rctx.m_trx = trx;

    /* Obtain length of LOB available in clustered index.*/
    ulint avail_lob = rctx.m_blobref.length();

    if (avail_lob == 0) {
      /* No LOB data available. */
      return (0);
    }

    /* Read the LOB data. */
    ulint fetch_len = lob::z_read(&rctx, rctx.m_blobref, 0, len, buf);

    /* Either fetch the requested length or fetch the complete
    LOB. If complete LOB is fetched, then it means that requested
    length is bigger than the available length. */
    ut_a(fetch_len == 0 || fetch_len == len ||
         (fetch_len == avail_lob && avail_lob < len));

    return (fetch_len);
  }

  /* Uncompressed path: local_len now counts only the inline data
  prefix, excluding the 20-byte reference. */
  local_len -= BTR_EXTERN_FIELD_REF_SIZE;

  if (UNIV_UNLIKELY(local_len >= len)) {
    /* The requested prefix is fully available inline. */
    memcpy(buf, data, len);
    return (len);
  }

  /* Copy the inline prefix; the rest comes from the LOB pages. */
  memcpy(buf, data, local_len);
  data += local_len;

  /* data now points at the 20-byte field reference, which must not be
  all-zero here. */
  ut_a(memcmp(data, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));

  if (!mach_read_from_4(data + BTR_EXTERN_LEN + 4)) {
    /* The externally stored part of the column has been
    (partially) deleted. Signal the half-deleted BLOB
    to the caller. */

    return (0);
  }

  ReadContext rctx(page_size, data, local_len + BTR_EXTERN_FIELD_REF_SIZE,
                   buf + local_len, len
#ifdef UNIV_DEBUG
                   ,
                   false
#endif /* UNIV_DEBUG */
  );

  rctx.m_index = (dict_index_t *)index;
  rctx.m_trx = trx;

  ulint fetch_len = lob::read(&rctx, rctx.m_blobref, 0, len, buf + local_len);
  return (local_len + fetch_len);
}
934
935 /** Copies an externally stored field of a record to mem heap.
936 The clustered index record must be protected by a lock or a page latch.
937 @param[in] trx the current trx object or nullptr
938 @param[in] index the clust index in which lob is read.
939 @param[out] len length of the whole field
940 @param[out] lob_version LOB version number.
941 @param[in] data 'internally' stored part of the field
942 containing also the reference to the external
943 part; must be protected by a lock or a page
944 latch.
945 @param[in] page_size BLOB page size
946 @param[in] local_len length of data */
947 #ifdef UNIV_DEBUG
948 /**
949 @param[in] is_sdi true for SDI Indexes */
950 #endif /* UNIV_DEBUG */
951 /**
952 @param[in,out] heap mem heap
953 @return the whole field copied to heap */
byte *btr_copy_externally_stored_field_func(
    trx_t *trx, const dict_index_t *index, ulint *len, size_t *lob_version,
    const byte *data, const page_size_t &page_size, ulint local_len,
#ifdef UNIV_DEBUG
    bool is_sdi,
#endif /* UNIV_DEBUG */
    mem_heap_t *heap) {
  uint32_t extern_len;
  byte *buf;

  /* Only clustered index records can own externally stored fields. */
  ut_a(index->is_clustered());

  /* The locally stored part of the field must at least contain the
  20-byte external reference at its end. */
  ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);

  local_len -= BTR_EXTERN_FIELD_REF_SIZE;

  /* Currently a BLOB cannot be bigger than 4 GB; we
  leave the 4 upper bytes in the length field unused */

  /* The LOB reference immediately follows the locally stored prefix. */
  const byte *field_ref = data + local_len;

  /* Read the low 4 bytes of the 8-byte external length field. */
  extern_len = mach_read_from_4(data + local_len + BTR_EXTERN_LEN + 4);

  /* Allocate room for the local prefix plus the whole external part. */
  buf = (byte *)mem_heap_alloc(heap, local_len + extern_len);

  ReadContext rctx(page_size, data, local_len + BTR_EXTERN_FIELD_REF_SIZE,
                   buf + local_len, extern_len
#ifdef UNIV_DEBUG
                   ,
                   is_sdi
#endif /* UNIV_DEBUG */
  );

  rctx.m_index = (dict_index_t *)index;

  if (ref_t::is_being_modified(field_ref)) {
#ifdef UNIV_DEBUG
    /* Check the sanity of the LOB reference. */
    if (ref_t::is_null_relaxed(field_ref) ||
        ref_t::space_id(field_ref) == index->space) {
      /* Valid scenario. Do nothing. */
    } else {
      /* Force an assertion failure with a self-describing name. */
      bool lob_ref_is_corrupt = false;
      ut_ad(lob_ref_is_corrupt);
    }
#endif /* UNIV_DEBUG */

    /* This is applicable only for READ UNCOMMITTED transactions because they
    don't take transaction locks. */
    ut_ad(trx == nullptr || trx->is_read_uncommitted());

    /* The LOB is concurrently being written; report an empty value. */
    *len = 0;
    return (buf);
  }

  if (extern_len == 0) {
    /* The lob has already been purged. */
    ut_ad(ref_t::page_no(field_ref) == FIL_NULL);
    *len = 0;
    return (buf);
  }

  if (page_size.is_compressed()) {
    /* Compressed LOBs store no local prefix in the record. */
    ut_ad(local_len == 0);
    *len = 0;

    if (extern_len > 0) {
      *len = lob::z_read(&rctx, rctx.m_blobref, 0, extern_len, buf + local_len);
    }

    return (buf);
  } else {
    /* Copy the locally stored prefix first, then fetch the rest
    from the LOB pages. */
    if (local_len > 0) {
      memcpy(buf, data, local_len);
    }

    ulint fetch_len =
        lob::read(&rctx, rctx.m_blobref, 0, extern_len, buf + local_len);

    *len = local_len + fetch_len;

    /* Report the LOB version observed during the read, if requested. */
    if (lob_version != nullptr) {
      *lob_version = rctx.m_lob_version;
    }

    return (buf);
  }
}
1042
1043 /** Frees the externally stored fields for a record, if the field
1044 is mentioned in the update vector.
1045 @param[in] trx_id the transaction identifier.
1046 @param[in] undo_no undo number within a transaction whose
1047 LOB is being freed.
1048 @param[in] update update vector
1049 @param[in] rollback performing rollback? */
free_updated_extern_fields(trx_id_t trx_id,undo_no_t undo_no,const upd_t * update,bool rollback)1050 void BtrContext::free_updated_extern_fields(trx_id_t trx_id, undo_no_t undo_no,
1051 const upd_t *update,
1052 bool rollback) {
1053 ulint n_fields;
1054 ulint i;
1055 ut_ad(rollback);
1056
1057 ut_ad(rec_offs_validate());
1058 ut_ad(mtr_is_page_fix(m_mtr, m_rec, MTR_MEMO_PAGE_X_FIX, m_index->table));
1059 /* Assert that the cursor position and the record are matching. */
1060 ut_ad(!need_recalc());
1061
1062 /* Free possible externally stored fields in the record */
1063
1064 n_fields = upd_get_n_fields(update);
1065
1066 for (i = 0; i < n_fields; i++) {
1067 const upd_field_t *ufield = upd_get_nth_field(update, i);
1068
1069 if (rec_offs_nth_extern(m_offsets, ufield->field_no)) {
1070 ulint len;
1071 byte *data = rec_get_nth_field(m_rec, m_offsets, ufield->field_no, &len);
1072 ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
1073
1074 byte *field_ref = data + len - BTR_EXTERN_FIELD_REF_SIZE;
1075
1076 DeleteContext ctx(*this, field_ref, ufield->field_no, rollback);
1077 lob::purge(&ctx, m_index, trx_id, undo_no, 0, ufield);
1078 if (need_recalc()) {
1079 recalc();
1080 }
1081 }
1082 }
1083 }
1084
1085 /** Deallocate a buffer block that was reserved for a BLOB part.
1086 @param[in] index index
1087 @param[in] block buffer block
1088 @param[in] all flag whether remove the compressed page
1089 if there is one
1090 @param[in] mtr mini-transaction to commit */
void blob_free(dict_index_t *index, buf_block_t *block, bool all, mtr_t *mtr) {
  buf_pool_t *buf_pool = buf_pool_from_block(block);
  /* Remember the page id before releasing the latch, so that we can
  later verify the block still holds the same file page. */
  page_id_t page_id(block->page.id.space(), block->page.id.page_no());
  bool freed = false;

  ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));

  /* Commit first: the page latch must be released before taking the
  buffer-pool mutexes below. */
  mtr_commit(mtr);

  mutex_enter(&buf_pool->LRU_list_mutex);
  buf_page_mutex_enter(block);

  /* Only free the block if it is still allocated to
  the same file page. */

  if (buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE &&
      page_id == block->page.id) {
    freed = buf_LRU_free_page(&block->page, all);

    if (!freed && all && block->page.zip.data &&
        buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE &&
        page_id == block->page.id) {
      /* Attempt to deallocate the uncompressed page
      if the whole block cannot be deallocated. */

      freed = buf_LRU_free_page(&block->page, false);
    }
  }

  /* On success, buf_LRU_free_page released both mutexes itself;
  release them here only when nothing was freed. */
  if (!freed) {
    mutex_exit(&buf_pool->LRU_list_mutex);
    buf_page_mutex_exit(block);
  }
}
1125
1126 /** Gets the externally stored size of a record, in units of a database page.
1127 @param[in] rec record
1128 @param[in] offsets array returned by rec_get_offsets()
1129 @return externally stored part, in units of a database page */
ulint btr_rec_get_externally_stored_len(const rec_t *rec,
                                        const ulint *offsets) {
  ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));

  /* Fast path: the record owns no externally stored fields. */
  if (!rec_offs_any_extern(offsets)) {
    return (0);
  }

  const ulint field_count = rec_offs_n_fields(offsets);
  ulint total_extern_len = 0;

  for (ulint fld = 0; fld < field_count; ++fld) {
    if (!rec_offs_nth_extern(offsets, fld)) {
      continue;
    }

    /* Read the low 4 bytes of the external length from the field
    reference, and round it up to whole pages before summing. */
    const ulint extern_len = mach_read_from_4(
        btr_rec_get_field_ref(rec, offsets, fld) + BTR_EXTERN_LEN + 4);

    total_extern_len += ut_calc_align(extern_len, UNIV_PAGE_SIZE);
  }

  return (total_extern_len / UNIV_PAGE_SIZE);
}
1155
1156 /** Frees the externally stored fields for a record.
1157 @param[in] trx_id transaction identifier whose LOB is
1158 being freed.
1159 @param[in] undo_no undo number within a transaction whose
1160 LOB is being freed.
1161 @param[in] rollback performing rollback?
1162 @param[in] rec_type undo record type.*/
free_externally_stored_fields(trx_id_t trx_id,undo_no_t undo_no,bool rollback,ulint rec_type)1163 void BtrContext::free_externally_stored_fields(trx_id_t trx_id,
1164 undo_no_t undo_no, bool rollback,
1165 ulint rec_type) {
1166 ut_ad(rec_offs_validate());
1167 ut_ad(mtr_is_page_fix(m_mtr, m_rec, MTR_MEMO_PAGE_X_FIX, m_index->table));
1168 /* Assert that the cursor position and the record are matching. */
1169 ut_ad(!need_recalc());
1170
1171 /* Free possible externally stored fields in the record */
1172 ut_ad(dict_table_is_comp(m_index->table) == !!rec_offs_comp(m_offsets));
1173 ulint n_fields = rec_offs_n_fields(m_offsets);
1174
1175 for (ulint i = 0; i < n_fields; i++) {
1176 if (rec_offs_nth_extern(m_offsets, i)) {
1177 byte *field_ref = btr_rec_get_field_ref(m_rec, m_offsets, i);
1178
1179 DeleteContext ctx(*this, field_ref, i, rollback);
1180
1181 upd_field_t *uf = nullptr;
1182 lob::purge(&ctx, m_index, trx_id, undo_no, rec_type, uf);
1183 if (need_recalc()) {
1184 recalc();
1185 }
1186 }
1187 }
1188 }
1189
1190 /** Load the first page of LOB and read its page type.
1191 @param[in] index the index object.
1192 @param[in] page_size the page size of LOB.
1193 @param[out] is_partially_updatable is the LOB partially updatable.
1194 @return the page type of first page of LOB.*/
get_lob_page_info(const dict_index_t * index,const page_size_t & page_size,bool & is_partially_updatable) const1195 ulint ref_t::get_lob_page_info(const dict_index_t *index,
1196 const page_size_t &page_size,
1197 bool &is_partially_updatable) const {
1198 mtr_t mtr;
1199 buf_block_t *block;
1200 ref_mem_t ref_mem;
1201
1202 parse(ref_mem);
1203
1204 mtr_start(&mtr);
1205
1206 block = buf_page_get(page_id_t(ref_mem.m_space_id, ref_mem.m_page_no),
1207 page_size, RW_S_LATCH, &mtr);
1208
1209 page_type_t page_type = block->get_page_type();
1210
1211 switch (page_type) {
1212 case FIL_PAGE_TYPE_LOB_FIRST: {
1213 first_page_t first_page(block, &mtr, (dict_index_t *)index);
1214 is_partially_updatable = first_page.can_be_partially_updated();
1215 break;
1216 }
1217 case FIL_PAGE_TYPE_ZLOB_FIRST: {
1218 z_first_page_t z_first_page(block, &mtr, (dict_index_t *)index);
1219 is_partially_updatable = z_first_page.can_be_partially_updated();
1220 break;
1221 }
1222 default:
1223 is_partially_updatable = false;
1224 }
1225
1226 mtr_commit(&mtr);
1227
1228 return (page_type);
1229 }
1230
1231 /** Load the first page of the LOB and mark it as not partially
1232 updatable anymore.
1233 @param[in] trx the current transaction
1234 @param[in] mtr the mini transaction context.
1235 @param[in] index the index dictionary object.
1236 @param[in] page_size the page size information. */
mark_not_partially_updatable(trx_t * trx,mtr_t * mtr,dict_index_t * index,const page_size_t & page_size)1237 void ref_t::mark_not_partially_updatable(trx_t *trx, mtr_t *mtr,
1238 dict_index_t *index,
1239 const page_size_t &page_size) {
1240 buf_block_t *block;
1241 ref_mem_t ref_mem;
1242
1243 parse(ref_mem);
1244
1245 /* If LOB has already been purged, ignore it. */
1246 if (ref_mem.is_purged()) {
1247 return;
1248 }
1249
1250 block = buf_page_get(page_id_t(ref_mem.m_space_id, ref_mem.m_page_no),
1251 page_size, RW_X_LATCH, mtr);
1252
1253 page_type_t page_type = block->get_page_type();
1254
1255 switch (page_type) {
1256 case FIL_PAGE_TYPE_LOB_FIRST: {
1257 first_page_t first_page(block, mtr, (dict_index_t *)index);
1258 first_page.mark_cannot_be_partially_updated(trx);
1259 break;
1260 }
1261 case FIL_PAGE_TYPE_ZLOB_FIRST: {
1262 z_first_page_t z_first_page(block, mtr, (dict_index_t *)index);
1263 z_first_page.mark_cannot_be_partially_updated(trx);
1264 break;
1265 }
1266 default:
1267 /* do nothing */
1268 break;
1269 }
1270 }
1271
1272 /** Check if the LOB can be partially updated. This is done by loading
1273 the first page of LOB and looking at the flags.
1274 @param[in] index the index to which LOB belongs.
1275 @return true if LOB is partially updatable, false otherwise.*/
is_lob_partially_updatable(const dict_index_t * index) const1276 bool ref_t::is_lob_partially_updatable(const dict_index_t *index) const {
1277 if (is_null_relaxed()) {
1278 return (false);
1279 }
1280
1281 const page_size_t page_size = dict_table_page_size(index->table);
1282
1283 if (page_size.is_compressed() && use_single_z_stream()) {
1284 return (false);
1285 }
1286
1287 bool can_do_partial_update = false;
1288 ulint page_type = get_lob_page_info(index, page_size, can_do_partial_update);
1289
1290 bool page_type_ok = (page_type == FIL_PAGE_TYPE_LOB_FIRST ||
1291 page_type == FIL_PAGE_TYPE_ZLOB_FIRST);
1292
1293 return (page_type_ok && can_do_partial_update);
1294 }
1295
print(std::ostream & out) const1296 std::ostream &ref_t::print(std::ostream &out) const {
1297 out << "[ref_t: m_ref=" << (void *)m_ref << ", space_id=" << space_id()
1298 << ", page_no=" << page_no() << ", offset=" << offset()
1299 << ", length=" << length()
1300 << ", is_being_modified=" << is_being_modified() << "]";
1301 return (out);
1302 }
1303
1304 #ifdef UNIV_DEBUG
check_space_id(dict_index_t * index) const1305 bool ref_t::check_space_id(dict_index_t *index) const {
1306 space_id_t idx_space_id = index->space;
1307 space_id_t ref_space_id = space_id();
1308
1309 bool lob_ref_valid = (ref_space_id == 0 || idx_space_id == ref_space_id);
1310 return (lob_ref_valid);
1311 }
1312 #endif /* UNIV_DEBUG */
1313
1314 /** Acquire an x-latch on the index page containing the clustered
1315 index record, in the given mini transaction context.
1316 @param[in] mtr the mini-transaction context. */
void DeleteContext::x_latch_rec_page(mtr_t *mtr) {
  bool found;
  /* Locate the page frame that holds the clustered index record. */
  page_t *rec_page = m_blobref.page_align();
  page_no_t rec_page_no = page_get_page_no(rec_page);
  space_id_t rec_space_id = page_get_space_id(rec_page);

  const page_size_t &rec_page_size =
      fil_space_get_page_size(rec_space_id, &found);
  /* The tablespace of the record must exist. */
  ut_ad(found);

  /* In release builds the return value is unused; the declaration of
  `block` is compiled out together with the ut_ad() below. */
#ifdef UNIV_DEBUG
  buf_block_t *block =
#endif /* UNIV_DEBUG */
      buf_page_get(page_id_t(rec_space_id, rec_page_no), rec_page_size,
                   RW_X_LATCH, mtr);

  ut_ad(block != nullptr);
}
1335
1336 #ifdef UNIV_DEBUG
rec_check_lobref_space_id(dict_index_t * index,const rec_t * rec,const ulint * offsets)1337 bool rec_check_lobref_space_id(dict_index_t *index, const rec_t *rec,
1338 const ulint *offsets) {
1339 /* Make it more robust. If rec pointer is null, don't do anything. */
1340 if (rec == nullptr) {
1341 return (true);
1342 }
1343
1344 ut_ad(index->is_clustered());
1345 ut_ad(rec_offs_validate(rec, nullptr, offsets));
1346
1347 const ulint n = rec_offs_n_fields(offsets);
1348
1349 for (ulint i = 0; i < n; i++) {
1350 ulint len;
1351
1352 if (rec_offs_nth_default(offsets, i)) {
1353 continue;
1354 }
1355
1356 byte *data = rec_get_nth_field(rec, offsets, i, &len);
1357
1358 if (len == UNIV_SQL_NULL) {
1359 continue;
1360 }
1361
1362 if (rec_offs_nth_extern(offsets, i)) {
1363 ulint local_len = len - BTR_EXTERN_FIELD_REF_SIZE;
1364 ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
1365
1366 byte *field_ref = data + local_len;
1367 ref_t ref(field_ref);
1368 if (!ref.check_space_id(index)) {
1369 return (false);
1370 }
1371 }
1372 }
1373 return (true);
1374 }
1375 #endif /* UNIV_DEBUG */
1376
mark_not_partially_updatable(trx_t * trx,dict_index_t * index,const upd_t * update,mtr_t * mtr)1377 dberr_t mark_not_partially_updatable(trx_t *trx, dict_index_t *index,
1378 const upd_t *update, mtr_t *mtr) {
1379 if (!index->is_clustered()) {
1380 /* Only clustered index can have LOBs. */
1381 return (DB_SUCCESS);
1382 }
1383
1384 const ulint n_fields = upd_get_n_fields(update);
1385
1386 for (ulint i = 0; i < n_fields; i++) {
1387 const upd_field_t *ufield = upd_get_nth_field(update, i);
1388
1389 if (update->is_partially_updated(ufield->field_no)) {
1390 continue;
1391 }
1392
1393 if (ufield->is_virtual()) {
1394 continue;
1395 }
1396
1397 const dfield_t *new_field = &ufield->new_val;
1398
1399 if (ufield->ext_in_old && !dfield_is_ext(new_field)) {
1400 const dfield_t *old_field = &ufield->old_val;
1401 byte *field_ref = old_field->blobref();
1402 ref_t ref(field_ref);
1403
1404 if (!ref.is_null_relaxed()) {
1405 ut_ad(ref.space_id() == index->space_id());
1406 ref.mark_not_partially_updatable(trx, mtr, index,
1407 index->get_page_size());
1408 }
1409 }
1410 }
1411
1412 return (DB_SUCCESS);
1413 }
1414
1415 } // namespace lob
1416