1 /*****************************************************************************
2 
3 Copyright (c) 2015, 2020, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
#include <sys/types.h>

#include "btr0pcur.h"
#include "fil0fil.h"
#include "lob0first.h"
#include "lob0inf.h"
#include "lob0lob.h"
#include "lob0zip.h"
#include "row0upd.h"
#include "zlob0first.h"

#include "my_dbug.h"

namespace lob {
41 
/** A BLOB field reference has all the bits set to zero, except the "being
modified" bit. */
const byte field_ref_almost_zero[FIELD_REF_SIZE] = {
    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x20, 0, 0, 0, 0, 0, 0, 0,
};
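/* Note: byte 12 of the field reference is the most significant byte of the
8-byte length field, which also carries the flag bits.  The 0x20 above is
therefore the "being modified" flag bit (explanatory note, assuming the
standard BTR_EXTERN_* layout of the 20-byte field reference). */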
47 
48 #ifdef UNIV_DEBUG
bool ReadContext::assert_read_uncommitted() const {
50   ut_ad(m_trx == nullptr || m_trx->is_read_uncommitted());
51   return (true);
52 }
53 #endif /* UNIV_DEBUG */
54 
55 /** Gets the offset of the pointer to the externally stored part of a field.
56 @param[in]	offsets		array returned by rec_get_offsets()
57 @param[in]	n		index of the external field
58 @return offset of the pointer to the externally stored part */
ulint btr_rec_get_field_ref_offs(const ulint *offsets, ulint n) {
60   ulint field_ref_offs;
61   ulint local_len;
62 
63   ut_a(rec_offs_nth_extern(offsets, n));
64   field_ref_offs = rec_get_nth_field_offs(offsets, n, &local_len);
65   ut_a(rec_field_not_null_not_add_col_def(local_len));
66   ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
67 
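  /* The field reference occupies the last BTR_EXTERN_FIELD_REF_SIZE (20)
  bytes of the locally stored part of the column. */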
68   return (field_ref_offs + local_len - BTR_EXTERN_FIELD_REF_SIZE);
69 }
70 
/** Marks non-updated off-page fields as disowned by this record.
The ownership must be transferred to the updated record which is
inserted elsewhere in the index tree. During purge, only the owner of
an externally stored field is allowed to free the field.
@param[in]	update		update vector. */
void BtrContext::disown_inherited_fields(const upd_t *update) {
77   ut_ad(rec_offs_validate());
78   ut_ad(!rec_offs_comp(m_offsets) || !rec_get_node_ptr_flag(m_rec));
79   ut_ad(rec_offs_any_extern(m_offsets));
80   ut_ad(m_mtr);
81 
82   for (ulint i = 0; i < rec_offs_n_fields(m_offsets); i++) {
83     if (rec_offs_nth_extern(m_offsets, i) &&
84         !upd_get_field_by_field_no(update, i, false)) {
85       set_ownership_of_extern_field(i, FALSE);
86     }
87   }
88 }
89 
/** When a bulk load is being done, check if there is enough space in the
redo log file. */
void BtrContext::check_redolog_bulk() {
93   ut_ad(is_bulk());
94 
95   FlushObserver *observer = m_mtr->get_flush_observer();
96 
97   rec_block_fix();
98 
99   commit_btr_mtr();
100 
101   DEBUG_SYNC_C("blob_write_middle");
102 
103   log_free_check();
104 
105   start_btr_mtr();
106   m_mtr->set_flush_observer(observer);
107 
108   rec_block_unfix();
109   ut_ad(validate());
110 }
111 
/** Check if there is enough space in the redo log file. Commit and re-start
the mini-transaction. */
void BtrContext::check_redolog_normal() {
115   ut_ad(!is_bulk());
116 
117   FlushObserver *observer = m_mtr->get_flush_observer();
118   store_position();
119 
120   commit_btr_mtr();
121 
122   DEBUG_SYNC_C("blob_write_middle");
123 
124   log_free_check();
125 
126   DEBUG_SYNC_C("blob_write_middle_after_check");
127 
128   start_btr_mtr();
129 
130   m_mtr->set_flush_observer(observer);
131 
132   restore_position();
133 
134   ut_ad(validate());
135 }
136 
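/** Commit the mini-transaction and restart it, keeping the flush observer.
The persistent cursor position is stored before the commit and restored
after the restart, if a cursor is available.  To be used when not doing a
bulk insert operation. */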
void BtrContext::restart_mtr_normal() {
138   ut_ad(!is_bulk());
139   FlushObserver *observer = m_mtr->get_flush_observer();
140 
141   if (m_pcur != nullptr) {
142     store_position();
143   }
144 
145   commit_btr_mtr();
146   start_btr_mtr();
147   m_mtr->set_flush_observer(observer);
148 
149   if (m_pcur != nullptr) {
150     restore_position();
151   }
152 
153   ut_ad(m_pcur == nullptr || validate());
154 }
155 
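/** Commit the mini-transaction and restart it, keeping the flush observer.
The record block is kept buffer-fixed across the restart.  To be used during
a bulk insert operation. */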
void BtrContext::restart_mtr_bulk() {
157   ut_ad(is_bulk());
158   FlushObserver *observer = m_mtr->get_flush_observer();
159   rec_block_fix();
160   commit_btr_mtr();
161   start_btr_mtr();
162   m_mtr->set_flush_observer(observer);
163   rec_block_unfix();
164   ut_ad(validate());
165 }
166 
167 /** Print this blob directory into the given output stream.
168 @param[in]	out	the output stream.
169 @return the output stream. */
std::ostream &blob_dir_t::print(std::ostream &out) const {
171   out << "[blob_dir_t: ";
172   for (const blob_page_info_t &info : m_pages) {
173     out << info;
174   }
175   out << "]";
176   return (out);
177 }
178 
/** Print this blob_page_info_t object into the given output stream.
@param[in]	out	the output stream.
@return the output stream. */
std::ostream &blob_page_info_t::print(std::ostream &out) const {
183   out << "[blob_page_info_t: m_page_no=" << m_page_no << ", m_bytes=" << m_bytes
184       << ", m_zbytes=" << m_zbytes << "]";
185   return (out);
186 }
187 
188 /** Do setup of the zlib stream.
189 @return code returned by zlib. */
int zReader::setup_zstream() {
191   const ulint local_prefix = m_rctx.m_local_len - BTR_EXTERN_FIELD_REF_SIZE;
192 
193   m_stream.next_out = m_rctx.m_buf + local_prefix;
194   m_stream.avail_out = static_cast<uInt>(m_rctx.m_len - local_prefix);
195   m_stream.next_in = Z_NULL;
196   m_stream.avail_in = 0;
197 
198   /* Zlib inflate needs 32 kilobytes for the default
199   window size, plus a few kilobytes for small objects. */
200   m_heap = mem_heap_create(40000);
201   page_zip_set_alloc(&m_stream, m_heap);
202 
203   int err = inflateInit(&m_stream);
204   return (err);
205 }
206 
207 /** Fetch the BLOB.
208 @return DB_SUCCESS on success, DB_FAIL on error. */
dberr_t zReader::fetch() {
210   DBUG_TRACE;
211 
212   dberr_t err = DB_SUCCESS;
213 
214   ut_ad(m_rctx.is_valid_blob());
215   ut_ad(assert_empty_local_prefix());
216 
217   ut_d(m_page_type_ex =
218            m_rctx.is_sdi() ? FIL_PAGE_SDI_ZBLOB : FIL_PAGE_TYPE_ZBLOB);
219 
220   setup_zstream();
221 
222   m_remaining = m_rctx.m_blobref.length();
223 
224   while (m_rctx.m_page_no != FIL_NULL) {
225     page_no_t curr_page_no = m_rctx.m_page_no;
226 
227     err = fetch_page();
228     if (err != DB_SUCCESS) {
229       break;
230     }
231 
232     m_stream.next_in = m_bpage->zip.data + m_rctx.m_offset;
233     m_stream.avail_in =
234         static_cast<uInt>(m_rctx.m_page_size.physical() - m_rctx.m_offset);
235 
236     int zlib_err = inflate(&m_stream, Z_NO_FLUSH);
237     switch (zlib_err) {
238       case Z_OK:
239         if (m_stream.avail_out == 0) {
240           goto end_of_blob;
241         }
242         break;
243       case Z_STREAM_END:
244         if (m_rctx.m_page_no == FIL_NULL) {
245           goto end_of_blob;
246         }
247       /* fall through */
248       default:
249         err = DB_FAIL;
250         ib::error(ER_IB_MSG_630)
251             << "inflate() of compressed BLOB page "
252             << page_id_t(m_rctx.m_space_id, curr_page_no) << " returned "
253             << zlib_err << " (" << m_stream.msg << ")";
254         /* fall through */
255         ut_error;
256       case Z_BUF_ERROR:
257         goto end_of_blob;
258     }
259 
260     buf_page_release_zip(m_bpage);
261 
262     m_rctx.m_offset = FIL_PAGE_NEXT;
263 
264     ut_d(if (!m_rctx.m_is_sdi) m_page_type_ex = FIL_PAGE_TYPE_ZBLOB2);
265   }
266 
267 end_of_blob:
268   buf_page_release_zip(m_bpage);
269   inflateEnd(&m_stream);
270   mem_heap_free(m_heap);
271   UNIV_MEM_ASSERT_RW(m_rctx.m_buf, m_stream.total_out);
272   return err;
273 }
274 
275 #ifdef UNIV_DEBUG
276 /** Assert that the local prefix is empty.  For compressed row format,
277 there is no local prefix stored.  This function doesn't return if the
278 local prefix is non-empty.
279 @return true if local prefix is empty*/
bool zReader::assert_empty_local_prefix() {
281   ut_ad(m_rctx.m_local_len == BTR_EXTERN_FIELD_REF_SIZE);
282   return (true);
283 }
284 #endif /* UNIV_DEBUG */
285 
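/** Fetch the next compressed BLOB page and make it the current page of the
read context.  The next-page number is read from the fetched page.
@return DB_SUCCESS (the current implementation does not report failures). */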
dberr_t zReader::fetch_page() {
287   dberr_t err(DB_SUCCESS);
288 
289   m_bpage = buf_page_get_zip(page_id_t(m_rctx.m_space_id, m_rctx.m_page_no),
290                              m_rctx.m_page_size);
291 
292   ut_a(m_bpage != nullptr);
293   ut_ad(fil_page_get_type(m_bpage->zip.data) == m_page_type_ex);
294   m_rctx.m_page_no = mach_read_from_4(m_bpage->zip.data + FIL_PAGE_NEXT);
295 
296   if (m_rctx.m_offset == FIL_PAGE_NEXT) {
297     /* When the BLOB begins at page header,
298     the compressed data payload does not
299     immediately follow the next page pointer. */
300     m_rctx.m_offset = FIL_PAGE_DATA;
301   } else {
302     m_rctx.m_offset += 4;
303   }
304 
305   return (err);
306 }
307 
/** This is used to take action when we enter and exit a scope.  When we enter
the scope, the constructor sets the "being modified" bit in the LOB reference
objects that are either being inserted or updated.  When we exit the scope, the
destructor clears the "being modified" bit in the LOB reference objects. */
312 struct Being_modified {
313   /** Constructor.  Set the "being modified" bit in LOB references.
314   @param[in] ctx  the B-tree context for LOB operation.
315   @param[in] big_rec_vec  the LOB vector
316   @param[in] pcur  persistent cursor
317   @param[in] offsets the record offsets
318   @param[in] op  the operation code
319   @param[in] mtr the mini-transaction context. */
  Being_modified(BtrContext &ctx, const big_rec_t *big_rec_vec,
321                  btr_pcur_t *pcur, ulint *offsets, opcode op, mtr_t *mtr)
322       : m_btr_ctx(ctx),
323         m_big_rec_vec(big_rec_vec),
324         m_pcur(pcur),
325         m_offsets(offsets),
326         m_op(op),
327         m_mtr(mtr) {
328     /* All pointers to externally stored columns in the record
329     must either be zero or they must be pointers to inherited
330     columns, owned by this record or an earlier record version. */
331     rec_t *rec = btr_pcur_get_rec(m_pcur);
332     dict_index_t *index = m_pcur->index();
333 #ifdef UNIV_DEBUG
334     rec_offs_make_valid(rec, index, m_offsets);
335 #endif /* UNIV_DEBUG */
336     for (uint i = 0; i < m_big_rec_vec->n_fields; i++) {
337       ulint field_no = m_big_rec_vec->fields[i].field_no;
338       byte *field_ref = btr_rec_get_field_ref(rec, m_offsets, field_no);
339       ref_t blobref(field_ref);
340 
341       ut_ad(!blobref.is_being_modified());
342 
343       /* Before we release latches in a subsequent ctx.check_redolog() call,
344       mark the blobs as being modified.  This is needed to ensure that READ
345       UNCOMMITTED transactions don't read an inconsistent BLOB. */
346       if (index->is_compressed()) {
347         blobref.set_being_modified(true, nullptr);
348 
349         if (m_op == OPCODE_INSERT_UPDATE) {
350           /* Inserting by updating a del-marked record. */
351           blobref.set_page_no(FIL_NULL, nullptr);
352         }
353 
354         if (!m_btr_ctx.is_bulk()) {
355           buf_block_t *rec_block = btr_pcur_get_block(m_pcur);
356           page_zip_des_t *page_zip = buf_block_get_page_zip(rec_block);
357           page_zip_write_blob_ptr(page_zip, rec, index, m_offsets, field_no,
358                                   m_mtr);
359         }
360       } else {
361         blobref.set_being_modified(true, m_mtr);
362       }
363 
364 #if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
365 
      /* Make an in-memory copy of the LOB ref. */
367       ref_mem_t ref_mem;
368       blobref.parse(ref_mem);
369 
370       ut_a(blobref.is_owner());
371       /* Either this must be an update in place,
372       or the BLOB must be inherited, or the BLOB pointer
373       must be zero (will be written in this function). */
374       ut_a(m_op == OPCODE_UPDATE || m_op == OPCODE_INSERT_UPDATE ||
375            blobref.is_inherited() || blobref.is_null_relaxed());
376       ut_ad(blobref.is_being_modified());
377 
378 #endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
379     }
380   }
381 
382   /** Destructor.  Clear the "being modified" bit in LOB references. */
  ~Being_modified() {
384     rec_t *rec = btr_pcur_get_rec(m_pcur);
385     dict_index_t *index = m_pcur->index();
386 #ifdef UNIV_DEBUG
387     rec_offs_make_valid(rec, index, m_offsets);
388 #endif /* UNIV_DEBUG */
389     for (uint i = 0; i < m_big_rec_vec->n_fields; i++) {
390       ulint field_no = m_big_rec_vec->fields[i].field_no;
391       byte *field_ref = btr_rec_get_field_ref(rec, m_offsets, field_no);
392       ref_t blobref(field_ref);
393 
394       if (index->is_compressed()) {
395         blobref.set_being_modified(false, nullptr);
396         if (!m_btr_ctx.is_bulk()) {
397           buf_block_t *rec_block = btr_pcur_get_block(m_pcur);
398           page_zip_des_t *page_zip = buf_block_get_page_zip(rec_block);
399           page_zip_write_blob_ptr(page_zip, rec, index, m_offsets, field_no,
400                                   m_mtr);
401         }
402       } else {
403         blobref.set_being_modified(false, m_mtr);
404       }
405     }
406   }
407 
408   BtrContext &m_btr_ctx;
409   const big_rec_t *m_big_rec_vec;
410   btr_pcur_t *m_pcur;
411   ulint *m_offsets;
412   opcode m_op;
413   mtr_t *m_mtr;
414 };
415 
416 /** Stores the fields in big_rec_vec to the tablespace and puts pointers to
417 them in rec.  The extern flags in rec will have to be set beforehand. The
418 fields are stored on pages allocated from leaf node file segment of the index
419 tree.
420 
421 TODO: If the allocation extends the tablespace, it will not be redo logged, in
422 any mini-transaction.  Tablespace extension should be redo-logged, so that
423 recovery will not fail when the big_rec was written to the extended portion of
424 the file, in case the file was somehow truncated in the crash.
@param[in]	trx		the trx doing LOB store. If unavailable it
                                could be nullptr.
@param[in,out]	pcur		a persistent cursor. If btr_mtr is restarted,
                                then this can be repositioned.
@param[in]	upd		update vector
@param[in,out]	offsets		rec_get_offsets() on pcur. the "external
                                storage" flags in offsets will correctly
                                correspond to rec when this function returns
@param[in]	big_rec_vec	vector containing fields to be stored
                                externally
@param[in,out]	btr_mtr		mtr containing the latches to the clustered
                                index. can be committed and restarted.
@param[in]	op		operation code
@return DB_SUCCESS or DB_OUT_OF_FILE_SPACE */
dberr_t btr_store_big_rec_extern_fields(trx_t *trx, btr_pcur_t *pcur,
                                        const upd_t *upd, ulint *offsets,
                                        const big_rec_t *big_rec_vec,
                                        mtr_t *btr_mtr, opcode op) {
444   mtr_t mtr;
445   mtr_t mtr_bulk;
446   page_zip_des_t *page_zip;
447   dberr_t error = DB_SUCCESS;
448   dict_index_t *index = pcur->index();
449   dict_table_t *table = index->table;
450   buf_block_t *rec_block = btr_pcur_get_block(pcur);
451   rec_t *rec = btr_pcur_get_rec(pcur);
452 
453   ut_ad(rec_offs_validate(rec, index, offsets));
454   ut_ad(rec_offs_any_extern(offsets));
455   ut_ad(btr_mtr);
456   ut_ad(mtr_memo_contains_flagged(btr_mtr, dict_index_get_lock(index),
457                                   MTR_MEMO_X_LOCK | MTR_MEMO_SX_LOCK) ||
458         index->table->is_intrinsic() || !index->is_committed());
459   ut_ad(
460       mtr_is_block_fix(btr_mtr, rec_block, MTR_MEMO_PAGE_X_FIX, index->table));
461   ut_ad(buf_block_get_frame(rec_block) == page_align(rec));
462   ut_a(index->is_clustered());
463 
464   ut_a(dict_table_page_size(table).equals_to(rec_block->page.size));
465 
466   /* Create a blob operation context. */
467   BtrContext btr_ctx(btr_mtr, pcur, index, rec, offsets, rec_block, op);
468   InsertContext ctx(btr_ctx, big_rec_vec);
469 
470   Being_modified bm(btr_ctx, big_rec_vec, pcur, offsets, op, btr_mtr);
471 
472   /* The pcur could be re-positioned.  Commit and restart btr_mtr. */
473   ctx.check_redolog();
474   rec_block = btr_pcur_get_block(pcur);
475   rec = btr_pcur_get_rec(pcur);
476 
477   page_zip = buf_block_get_page_zip(rec_block);
478   ut_a(fil_page_index_page_check(page_align(rec)) || op == OPCODE_INSERT_BULK);
479 
480   if (page_zip != nullptr) {
481     DBUG_EXECUTE_IF("lob_insert_single_zstream",
482                     { goto insert_single_zstream; });
483 
484     if (dict_index_is_sdi(index)) {
485       goto insert_single_zstream;
486     }
487 
488   } else {
489     /* Uncompressed LOB */
490 
491     DBUG_EXECUTE_IF("lob_insert_noindex", { goto insert_noindex; });
492 
493     if (dict_index_is_sdi(index)) {
494       goto insert_noindex;
495     }
496   }
497 
498   for (uint i = 0; i < big_rec_vec->n_fields; i++) {
499     ulint field_no = big_rec_vec->fields[i].field_no;
500 
501     /* Cursor could have changed position. */
502     rec = btr_pcur_get_rec(pcur);
503     rec_offs_make_valid(rec, index, offsets);
504     ut_ad(rec_offs_validate(rec, index, offsets));
505 
506     byte *field_ref = btr_rec_get_field_ref(rec, offsets, field_no);
507 
508     ref_t blobref(field_ref);
509     ut_ad(blobref.validate(btr_mtr));
510 
511     bool can_do_partial_update = false;
512 
513     if (op == lob::OPCODE_UPDATE && upd != nullptr &&
514         big_rec_vec->fields[i].ext_in_old) {
515       can_do_partial_update = blobref.is_lob_partially_updatable(index);
516     }
517 
518     if (page_zip != nullptr) {
519       bool do_insert = true;
520 
521       if (op == lob::OPCODE_UPDATE && upd != nullptr &&
522           blobref.is_big(rec_block->page.size) && can_do_partial_update) {
523         if (upd->is_partially_updated(field_no)) {
524           /* Do partial update. */
525           error = lob::z_update(ctx, trx, index, upd, field_no, blobref);
526           switch (error) {
527             case DB_SUCCESS:
528               do_insert = false;
529               break;
530             case DB_FAIL:
531               break;
532             default:
533               ut_error;
534           }
535         } else {
536           /* This is to inform the purge thread that
537           the older version LOB in this update operation
538           can be freed. */
539           blobref.mark_not_partially_updatable(trx, btr_mtr, index,
540                                                dict_table_page_size(table));
541         }
542       }
543 
544       if (do_insert) {
545         const ulint lob_len = big_rec_vec->fields[i].len;
546         if (ref_t::use_single_z_stream(lob_len)) {
547           zInserter zblob_writer(&ctx);
548           error = zblob_writer.prepare();
549           if (error == DB_SUCCESS) {
550             zblob_writer.write_one_blob(i);
551             error = zblob_writer.finish();
552           }
553         } else {
554           error = lob::z_insert(&ctx, trx, blobref, &big_rec_vec->fields[i], i);
555         }
556 
557         if (op == lob::OPCODE_UPDATE && upd != nullptr) {
558           /* Get the corresponding upd_field_t
559           object.*/
560           upd_field_t *uf = upd->get_field_by_field_no(field_no, index);
561 
562           if (uf != nullptr) {
563             /* Update the LOB reference
564             stored in upd_field_t */
565             dfield_t *new_val = &uf->new_val;
566 
567             if (dfield_is_ext(new_val)) {
568               byte *field_ref = new_val->blobref();
569               blobref.copy(field_ref);
570               ref_t::set_being_modified(field_ref, false, nullptr);
571             }
572           }
573         }
574       }
575 
576     } else {
577       /* Uncompressed LOB */
578       bool do_insert = true;
579 
580       if (op == lob::OPCODE_UPDATE && upd != nullptr &&
581           blobref.is_big(rec_block->page.size) && can_do_partial_update) {
582         if (upd->is_partially_updated(field_no)) {
583           /* Do partial update. */
584           error = lob::update(ctx, trx, index, upd, field_no, blobref);
585           switch (error) {
586             case DB_SUCCESS:
587               do_insert = false;
588               break;
589             case DB_FAIL:
590               break;
591             case DB_OUT_OF_FILE_SPACE:
592               break;
593             default:
594               ut_error;
595           }
596 
597         } else {
598           /* This is to inform the purge thread that
599           the older version LOB in this update operation
600           can be freed. */
601           blobref.mark_not_partially_updatable(trx, btr_mtr, index,
602                                                dict_table_page_size(table));
603         }
604       }
605 
606       if (do_insert) {
607         error = lob::insert(&ctx, trx, blobref, &big_rec_vec->fields[i], i);
608 
609         if (op == lob::OPCODE_UPDATE && upd != nullptr) {
610           /* Get the corresponding upd_field_t
611           object.*/
612           upd_field_t *uf = upd->get_field_by_field_no(field_no, index);
613 
614           if (uf != nullptr) {
615             /* Update the LOB reference
616             stored in upd_field_t */
617             dfield_t *new_val = &uf->new_val;
618             if (dfield_is_ext(new_val)) {
619               byte *field_ref = new_val->blobref();
620               blobref.copy(field_ref);
621               ref_t::set_being_modified(field_ref, false, nullptr);
622             }
623           }
624         }
625       }
626     }
627 
628     if (error != DB_SUCCESS) {
629       break;
630     }
631 
632 #ifdef UNIV_DEBUG
633     /* Ensure that the LOB references are valid now. */
634     rec = btr_pcur_get_rec(pcur);
635     rec_offs_make_valid(rec, index, offsets);
636     field_ref =
637         btr_rec_get_field_ref(rec, offsets, big_rec_vec->fields[i].field_no);
638     ref_t lobref(field_ref);
639 
640     ut_ad(!lobref.is_null());
641 #endif /* UNIV_DEBUG */
642   }
643   return (error);
644 
645   {
646   insert_single_zstream:
647     /* Insert the LOB as a single zlib stream spanning multiple
648     LOB pages.  This is the old way of storing LOBs. */
649     zInserter zblob_writer(&ctx);
650     error = zblob_writer.prepare();
651     if (error == DB_SUCCESS) {
652       zblob_writer.write();
653       error = zblob_writer.finish();
654     }
655     return (error);
656   }
657   {
658   insert_noindex:
659     /* Insert the uncompressed LOB without LOB index. */
660     Inserter blob_writer(&ctx);
661     error = blob_writer.write();
662     return (error);
663   }
664 }
665 
/** Copies an externally stored field of a record to mem heap.
@param[in]	trx		the current transaction, or nullptr
@param[in]	index		the clustered index containing the LOB
@param[in]	rec		record in a clustered index; must be
                                protected by a lock or a page latch
@param[in]	offsets		array returned by rec_get_offsets()
@param[in]	page_size	BLOB page size
@param[in]	no		field number
@param[out]	len		length of the field
@param[out]	lob_version	the LOB version number */
673 #ifdef UNIV_DEBUG
674 /**
675 @param[in]	is_sdi		true for SDI Indexes */
676 #endif /* UNIV_DEBUG */
677 /**
678 @param[in,out]	heap		mem heap
679 @return the field copied to heap, or NULL if the field is incomplete */
byte *btr_rec_copy_externally_stored_field_func(
681     trx_t *trx, const dict_index_t *index, const rec_t *rec,
682     const ulint *offsets, const page_size_t &page_size, ulint no, ulint *len,
683     size_t *lob_version,
684 #ifdef UNIV_DEBUG
685     bool is_sdi,
686 #endif /* UNIV_DEBUG */
687     mem_heap_t *heap) {
688 
689   ulint local_len;
690   const byte *data;
691 
692   ut_a(rec_offs_nth_extern(offsets, no));
693 
694   /* An externally stored field can contain some initial
695   data from the field, and in the last 20 bytes it has the
696   space id, page number, and offset where the rest of the
697   field data is stored, and the data length in addition to
698   the data stored locally. We may need to store some data
699   locally to get the local record length above the 128 byte
700   limit so that field offsets are stored in two bytes, and
701   the extern bit is available in those two bytes. */
702 
703   data = rec_get_nth_field(rec, offsets, no, &local_len);
704   const byte *field_ref = data + local_len - BTR_EXTERN_FIELD_REF_SIZE;
705 
706   lob::ref_t ref(const_cast<byte *>(field_ref));
707 
708   ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
709 
710 #ifdef UNIV_DEBUG
711   /* Verify if the LOB reference is sane. */
712   space_id_t space_id = ref.space_id();
713   ut_ad(space_id == 0 || space_id == index->space);
714 #endif /* UNIV_DEBUG */
715 
716   if (ref.is_null()) {
717     /* The externally stored field was not written yet.
718     This record should only be seen by
719     trx_rollback_or_clean_all_recovered() or any
720     TRX_ISO_READ_UNCOMMITTED transactions. */
721 
722     return (nullptr);
723   }
724 
725   return (btr_copy_externally_stored_field(trx, index, len, lob_version, data,
726                                            page_size, local_len, is_sdi, heap));
727 }
728 
729 /** Returns the page number where the next BLOB part is stored.
730 @param[in]	blob_header	the BLOB header.
731 @return page number or FIL_NULL if no more pages */
static inline page_no_t btr_blob_get_next_page_no(const byte *blob_header) {
733   return (mach_read_from_4(blob_header + LOB_HDR_NEXT_PAGE_NO));
734 }
735 
736 /** Check the FIL_PAGE_TYPE on an uncompressed BLOB page.
737 @param[in]	space_id	space identifier.
738 @param[in]	page_no		page number.
739 @param[in]	page		the page
740 @param[in]	read		TRUE=read, FALSE=purge */
static void btr_check_blob_fil_page_type(space_id_t space_id, page_no_t page_no,
742                                          const page_t *page, ibool read) {
743   ulint type = fil_page_get_type(page);
744 
745   ut_a(space_id == page_get_space_id(page));
746   ut_a(page_no == page_get_page_no(page));
747 
748   switch (type) {
749     uint32_t flags;
750     case FIL_PAGE_TYPE_BLOB:
751     case FIL_PAGE_SDI_BLOB:
752       break;
753 
754     default:
755       flags = fil_space_get_flags(space_id);
756 #ifndef UNIV_DEBUG /* Improve debug test coverage */
757       if (!DICT_TF_HAS_ATOMIC_BLOBS(flags)) {
758         /* Old versions of InnoDB did not initialize
759         FIL_PAGE_TYPE on BLOB pages.  Do not print
760         anything about the type mismatch when reading
761         a BLOB page that may be from old versions. */
762         return;
763       }
764 #endif /* !UNIV_DEBUG */
765 
766       ib::fatal(ER_IB_MSG_631)
767           << "FIL_PAGE_TYPE=" << type << " on BLOB "
768           << (read ? "read" : "purge") << " space " << space_id << " page "
769           << page_no << " flags " << flags;
770   }
771 }
772 
773 /** Returns the length of a BLOB part stored on the header page.
774 @param[in]	blob_header	the BLOB header.
775 @return part length */
static inline ulint btr_blob_get_part_len(const byte *blob_header) {
777   return (mach_read_from_4(blob_header + LOB_HDR_PART_LEN));
778 }
779 
780 /** Fetch one BLOB page. */
void Reader::fetch_page() {
782   mtr_t mtr;
783 
784   /* Bytes of LOB data available in the current LOB page. */
785   ulint part_len;
786 
787   /* Bytes of LOB data obtained from the current LOB page. */
788   ulint copy_len;
789 
790   ut_ad(m_rctx.m_page_no != FIL_NULL);
791   ut_ad(m_rctx.m_page_no > 0);
792 
793   mtr_start(&mtr);
794 
795   m_cur_block = buf_page_get(page_id_t(m_rctx.m_space_id, m_rctx.m_page_no),
796                              m_rctx.m_page_size, RW_S_LATCH, &mtr);
797   buf_block_dbg_add_level(m_cur_block, SYNC_EXTERN_STORAGE);
798   page_t *page = buf_block_get_frame(m_cur_block);
799 
800   btr_check_blob_fil_page_type(m_rctx.m_space_id, m_rctx.m_page_no, page, TRUE);
801 
802   byte *blob_header = page + m_rctx.m_offset;
803   part_len = btr_blob_get_part_len(blob_header);
804   copy_len = ut_min(part_len, m_rctx.m_len - m_copied_len);
805 
806   memcpy(m_rctx.m_buf + m_copied_len, blob_header + LOB_HDR_SIZE, copy_len);
807 
808   m_copied_len += copy_len;
809   m_rctx.m_page_no = btr_blob_get_next_page_no(blob_header);
810   mtr_commit(&mtr);
811   m_rctx.m_offset = FIL_PAGE_DATA;
812 }
813 
814 /** Fetch the complete or prefix of the uncompressed LOB data.
815 @return bytes of LOB data fetched. */
ulint Reader::fetch() {
817   if (m_rctx.m_blobref.is_null()) {
818     ut_ad(m_copied_len == 0);
819     return (m_copied_len);
820   }
821 
822   while (m_copied_len < m_rctx.m_len) {
823     if (m_rctx.m_page_no == FIL_NULL) {
824       /* End of LOB has been reached. */
825       break;
826     }
827 
828     fetch_page();
829   }
830 
831   /* Assure that we have fetched the requested amount or the LOB
832   has ended. */
833   ut_ad(m_copied_len == m_rctx.m_len || m_rctx.m_page_no == FIL_NULL);
834 
835   return (m_copied_len);
836 }
837 
838 /** Copies the prefix of an externally stored field of a record.
839 The clustered index record must be protected by a lock or a page latch.
@param[in]	trx		the current transaction, or nullptr
@param[in]	index		the clustered index in which the LOB is read
841 @param[out]	buf		the field, or a prefix of it
842 @param[in]	len		length of buf, in bytes
843 @param[in]	page_size	BLOB page size
844 @param[in]	data		'internally' stored part of the field
845                                 containing also the reference to the external
846                                 part; must be protected by a lock or a page
847                                 latch. */
848 #ifdef UNIV_DEBUG
849 /**
850 @param[in]	is_sdi		true for SDI indexes */
851 #endif /* UNIV_DEBUG */
852 /**
853 @param[in]	local_len	length of data, in bytes
854 @return the length of the copied field, or 0 if the column was being
855 or has been deleted */
ulint btr_copy_externally_stored_field_prefix_func(trx_t *trx,
857                                                    const dict_index_t *index,
858                                                    byte *buf, ulint len,
859                                                    const page_size_t &page_size,
860                                                    const byte *data,
861 #ifdef UNIV_DEBUG
862                                                    bool is_sdi,
863 #endif /* UNIV_DEBUG */
864                                                    ulint local_len) {
865   ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
866 
867   if (page_size.is_compressed()) {
868     ut_a(local_len == BTR_EXTERN_FIELD_REF_SIZE);
869 
870     ReadContext rctx(page_size, data, local_len, buf, len
871 #ifdef UNIV_DEBUG
872                      ,
873                      is_sdi
874 #endif /* UNIV_DEBUG */
875     );
876 
877     rctx.m_index = const_cast<dict_index_t *>(index);
878     rctx.m_trx = trx;
879 
880     /* Obtain length of LOB available in clustered index.*/
881     ulint avail_lob = rctx.m_blobref.length();
882 
883     if (avail_lob == 0) {
884       /* No LOB data available. */
885       return (0);
886     }
887 
888     /* Read the LOB data. */
889     ulint fetch_len = lob::z_read(&rctx, rctx.m_blobref, 0, len, buf);
890 
891     /* Either fetch the requested length or fetch the complete
892     LOB. If complete LOB is fetched, then it means that requested
893     length is bigger than the available length. */
894     ut_a(fetch_len == 0 || fetch_len == len ||
895          (fetch_len == avail_lob && avail_lob < len));
896 
897     return (fetch_len);
898   }
899 
900   local_len -= BTR_EXTERN_FIELD_REF_SIZE;
901 
902   if (UNIV_UNLIKELY(local_len >= len)) {
903     memcpy(buf, data, len);
904     return (len);
905   }
906 
907   memcpy(buf, data, local_len);
908   data += local_len;
909 
910   ut_a(memcmp(data, field_ref_zero, BTR_EXTERN_FIELD_REF_SIZE));
911 
912   if (!mach_read_from_4(data + BTR_EXTERN_LEN + 4)) {
913     /* The externally stored part of the column has been
914     (partially) deleted.  Signal the half-deleted BLOB
915     to the caller. */
916 
917     return (0);
918   }
919 
920   ReadContext rctx(page_size, data, local_len + BTR_EXTERN_FIELD_REF_SIZE,
921                    buf + local_len, len
922 #ifdef UNIV_DEBUG
923                    ,
924                    false
925 #endif /* UNIV_DEBUG */
926   );
927 
928   rctx.m_index = (dict_index_t *)index;
929   rctx.m_trx = trx;
930 
931   ulint fetch_len = lob::read(&rctx, rctx.m_blobref, 0, len, buf + local_len);
932   return (local_len + fetch_len);
933 }
934 
935 /** Copies an externally stored field of a record to mem heap.
936 The clustered index record must be protected by a lock or a page latch.
937 @param[in]	trx		the current trx object or nullptr
938 @param[in]	index		the clust index in which lob is read.
939 @param[out]	len		length of the whole field
940 @param[out]	lob_version	LOB version number.
941 @param[in]	data		'internally' stored part of the field
942                                 containing also the reference to the external
943                                 part; must be protected by a lock or a page
944                                 latch.
945 @param[in]	page_size	BLOB page size
946 @param[in]	local_len	length of data */
947 #ifdef UNIV_DEBUG
948 /**
949 @param[in]	is_sdi		true for SDI Indexes */
950 #endif /* UNIV_DEBUG */
951 /**
952 @param[in,out]	heap		mem heap
953 @return the whole field copied to heap */
byte *btr_copy_externally_stored_field_func(
955     trx_t *trx, const dict_index_t *index, ulint *len, size_t *lob_version,
956     const byte *data, const page_size_t &page_size, ulint local_len,
957 #ifdef UNIV_DEBUG
958     bool is_sdi,
959 #endif /* UNIV_DEBUG */
960     mem_heap_t *heap) {
961   uint32_t extern_len;
962   byte *buf;
963 
964   ut_a(index->is_clustered());
965 
966   ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
967 
968   local_len -= BTR_EXTERN_FIELD_REF_SIZE;
969 
970   /* Currently a BLOB cannot be bigger than 4 GB; we
971   leave the 4 upper bytes in the length field unused */
972 
973   const byte *field_ref = data + local_len;
974 
975   extern_len = mach_read_from_4(data + local_len + BTR_EXTERN_LEN + 4);
976 
977   buf = (byte *)mem_heap_alloc(heap, local_len + extern_len);
978 
979   ReadContext rctx(page_size, data, local_len + BTR_EXTERN_FIELD_REF_SIZE,
980                    buf + local_len, extern_len
981 #ifdef UNIV_DEBUG
982                    ,
983                    is_sdi
984 #endif /* UNIV_DEBUG */
985   );
986 
987   rctx.m_index = (dict_index_t *)index;
988 
989   if (ref_t::is_being_modified(field_ref)) {
990 #ifdef UNIV_DEBUG
991     /* Check the sanity of the LOB reference. */
992     if (ref_t::is_null_relaxed(field_ref) ||
993         ref_t::space_id(field_ref) == index->space) {
994       /* Valid scenario.  Do nothing. */
995     } else {
996       bool lob_ref_is_corrupt = false;
997       ut_ad(lob_ref_is_corrupt);
998     }
999 #endif /* UNIV_DEBUG */
1000 
1001     /* This is applicable only for READ UNCOMMITTED transactions because they
1002     don't take transaction locks. */
1003     ut_ad(trx == nullptr || trx->is_read_uncommitted());
1004 
1005     *len = 0;
1006     return (buf);
1007   }
1008 
1009   if (extern_len == 0) {
1010     /* The lob has already been purged. */
1011     ut_ad(ref_t::page_no(field_ref) == FIL_NULL);
1012     *len = 0;
1013     return (buf);
1014   }
1015 
1016   if (page_size.is_compressed()) {
1017     ut_ad(local_len == 0);
1018     *len = 0;
1019 
1020     if (extern_len > 0) {
1021       *len = lob::z_read(&rctx, rctx.m_blobref, 0, extern_len, buf + local_len);
1022     }
1023 
1024     return (buf);
1025   } else {
1026     if (local_len > 0) {
1027       memcpy(buf, data, local_len);
1028     }
1029 
1030     ulint fetch_len =
1031         lob::read(&rctx, rctx.m_blobref, 0, extern_len, buf + local_len);
1032 
1033     *len = local_len + fetch_len;
1034 
1035     if (lob_version != nullptr) {
1036       *lob_version = rctx.m_lob_version;
1037     }
1038 
1039     return (buf);
1040   }
1041 }
1042 
1043 /** Frees the externally stored fields for a record, if the field
1044 is mentioned in the update vector.
1045 @param[in]	trx_id		the transaction identifier.
1046 @param[in]	undo_no		undo number within a transaction whose
1047                                 LOB is being freed.
1048 @param[in]	update		update vector
1049 @param[in]	rollback	performing rollback? */
void BtrContext::free_updated_extern_fields(trx_id_t trx_id, undo_no_t undo_no,
1051                                             const upd_t *update,
1052                                             bool rollback) {
1053   ulint n_fields;
1054   ulint i;
1055   ut_ad(rollback);
1056 
1057   ut_ad(rec_offs_validate());
1058   ut_ad(mtr_is_page_fix(m_mtr, m_rec, MTR_MEMO_PAGE_X_FIX, m_index->table));
1059   /* Assert that the cursor position and the record are matching. */
1060   ut_ad(!need_recalc());
1061 
1062   /* Free possible externally stored fields in the record */
1063 
1064   n_fields = upd_get_n_fields(update);
1065 
1066   for (i = 0; i < n_fields; i++) {
1067     const upd_field_t *ufield = upd_get_nth_field(update, i);
1068 
1069     if (rec_offs_nth_extern(m_offsets, ufield->field_no)) {
1070       ulint len;
1071       byte *data = rec_get_nth_field(m_rec, m_offsets, ufield->field_no, &len);
1072       ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
1073 
1074       byte *field_ref = data + len - BTR_EXTERN_FIELD_REF_SIZE;
1075 
1076       DeleteContext ctx(*this, field_ref, ufield->field_no, rollback);
1077       lob::purge(&ctx, m_index, trx_id, undo_no, 0, ufield);
1078       if (need_recalc()) {
1079         recalc();
1080       }
1081     }
1082   }
1083 }
1084 
1085 /** Deallocate a buffer block that was reserved for a BLOB part.
1086 @param[in]	index	index
1087 @param[in]	block	buffer block
@param[in]	all	flag whether to remove the compressed page
                        if there is one
@param[in]	mtr	mini-transaction to commit */
void blob_free(dict_index_t *index, buf_block_t *block, bool all, mtr_t *mtr) {
1092   buf_pool_t *buf_pool = buf_pool_from_block(block);
1093   page_id_t page_id(block->page.id.space(), block->page.id.page_no());
1094   bool freed = false;
1095 
1096   ut_ad(mtr_is_block_fix(mtr, block, MTR_MEMO_PAGE_X_FIX, index->table));
1097 
1098   mtr_commit(mtr);
1099 
1100   mutex_enter(&buf_pool->LRU_list_mutex);
1101   buf_page_mutex_enter(block);
1102 
1103   /* Only free the block if it is still allocated to
1104   the same file page. */
1105 
1106   if (buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE &&
1107       page_id == block->page.id) {
1108     freed = buf_LRU_free_page(&block->page, all);
1109 
1110     if (!freed && all && block->page.zip.data &&
1111         buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE &&
1112         page_id == block->page.id) {
      /* Attempt to deallocate the uncompressed page
      if the whole block cannot be deallocated. */
1115 
1116       freed = buf_LRU_free_page(&block->page, false);
1117     }
1118   }
1119 
1120   if (!freed) {
1121     mutex_exit(&buf_pool->LRU_list_mutex);
1122     buf_page_mutex_exit(block);
1123   }
1124 }
1125 
1126 /** Gets the externally stored size of a record, in units of a database page.
1127 @param[in]	rec	record
1128 @param[in]	offsets	array returned by rec_get_offsets()
1129 @return externally stored part, in units of a database page */
ulint btr_rec_get_externally_stored_len(const rec_t *rec,
1131                                         const ulint *offsets) {
1132   ulint n_fields;
1133   ulint total_extern_len = 0;
1134   ulint i;
1135 
1136   ut_ad(!rec_offs_comp(offsets) || !rec_get_node_ptr_flag(rec));
1137 
1138   if (!rec_offs_any_extern(offsets)) {
1139     return (0);
1140   }
1141 
1142   n_fields = rec_offs_n_fields(offsets);
1143 
1144   for (i = 0; i < n_fields; i++) {
1145     if (rec_offs_nth_extern(offsets, i)) {
1146       ulint extern_len = mach_read_from_4(
1147           btr_rec_get_field_ref(rec, offsets, i) + BTR_EXTERN_LEN + 4);
1148 
1149       total_extern_len += ut_calc_align(extern_len, UNIV_PAGE_SIZE);
1150     }
1151   }
1152 
1153   return (total_extern_len / UNIV_PAGE_SIZE);
1154 }
1155 
1156 /** Frees the externally stored fields for a record.
1157 @param[in]	trx_id		transaction identifier whose LOB is
1158                                 being freed.
1159 @param[in]	undo_no		undo number within a transaction whose
1160                                 LOB is being freed.
1161 @param[in]	rollback	performing rollback?
1162 @param[in]	rec_type	undo record type.*/
void BtrContext::free_externally_stored_fields(trx_id_t trx_id,
1164                                                undo_no_t undo_no, bool rollback,
1165                                                ulint rec_type) {
1166   ut_ad(rec_offs_validate());
1167   ut_ad(mtr_is_page_fix(m_mtr, m_rec, MTR_MEMO_PAGE_X_FIX, m_index->table));
1168   /* Assert that the cursor position and the record are matching. */
1169   ut_ad(!need_recalc());
1170 
1171   /* Free possible externally stored fields in the record */
1172   ut_ad(dict_table_is_comp(m_index->table) == !!rec_offs_comp(m_offsets));
1173   ulint n_fields = rec_offs_n_fields(m_offsets);
1174 
1175   for (ulint i = 0; i < n_fields; i++) {
1176     if (rec_offs_nth_extern(m_offsets, i)) {
1177       byte *field_ref = btr_rec_get_field_ref(m_rec, m_offsets, i);
1178 
1179       DeleteContext ctx(*this, field_ref, i, rollback);
1180 
1181       upd_field_t *uf = nullptr;
1182       lob::purge(&ctx, m_index, trx_id, undo_no, rec_type, uf);
1183       if (need_recalc()) {
1184         recalc();
1185       }
1186     }
1187   }
1188 }
1189 
1190 /** Load the first page of LOB and read its page type.
1191 @param[in]	index			the index object.
1192 @param[in]	page_size		the page size of LOB.
1193 @param[out]	is_partially_updatable	is the LOB partially updatable.
1194 @return the page type of first page of LOB.*/
ulint ref_t::get_lob_page_info(const dict_index_t *index,
1196                                const page_size_t &page_size,
1197                                bool &is_partially_updatable) const {
1198   mtr_t mtr;
1199   buf_block_t *block;
1200   ref_mem_t ref_mem;
1201 
1202   parse(ref_mem);
1203 
1204   mtr_start(&mtr);
1205 
1206   block = buf_page_get(page_id_t(ref_mem.m_space_id, ref_mem.m_page_no),
1207                        page_size, RW_S_LATCH, &mtr);
1208 
1209   page_type_t page_type = block->get_page_type();
1210 
1211   switch (page_type) {
1212     case FIL_PAGE_TYPE_LOB_FIRST: {
1213       first_page_t first_page(block, &mtr, (dict_index_t *)index);
1214       is_partially_updatable = first_page.can_be_partially_updated();
1215       break;
1216     }
1217     case FIL_PAGE_TYPE_ZLOB_FIRST: {
1218       z_first_page_t z_first_page(block, &mtr, (dict_index_t *)index);
1219       is_partially_updatable = z_first_page.can_be_partially_updated();
1220       break;
1221     }
1222     default:
1223       is_partially_updatable = false;
1224   }
1225 
1226   mtr_commit(&mtr);
1227 
1228   return (page_type);
1229 }
1230 
1231 /** Load the first page of the LOB and mark it as not partially
1232 updatable anymore.
1233 @param[in]	trx		the current transaction
@param[in]	mtr		the mini-transaction context.
1235 @param[in]	index		the index dictionary object.
1236 @param[in]	page_size	the page size information. */
void ref_t::mark_not_partially_updatable(trx_t *trx, mtr_t *mtr,
1238                                          dict_index_t *index,
1239                                          const page_size_t &page_size) {
1240   buf_block_t *block;
1241   ref_mem_t ref_mem;
1242 
1243   parse(ref_mem);
1244 
1245   /* If LOB has already been purged, ignore it. */
1246   if (ref_mem.is_purged()) {
1247     return;
1248   }
1249 
1250   block = buf_page_get(page_id_t(ref_mem.m_space_id, ref_mem.m_page_no),
1251                        page_size, RW_X_LATCH, mtr);
1252 
1253   page_type_t page_type = block->get_page_type();
1254 
1255   switch (page_type) {
1256     case FIL_PAGE_TYPE_LOB_FIRST: {
1257       first_page_t first_page(block, mtr, (dict_index_t *)index);
1258       first_page.mark_cannot_be_partially_updated(trx);
1259       break;
1260     }
1261     case FIL_PAGE_TYPE_ZLOB_FIRST: {
1262       z_first_page_t z_first_page(block, mtr, (dict_index_t *)index);
1263       z_first_page.mark_cannot_be_partially_updated(trx);
1264       break;
1265     }
1266     default:
1267       /* do nothing */
1268       break;
1269   }
1270 }
1271 
1272 /** Check if the LOB can be partially updated. This is done by loading
1273 the first page of LOB and looking at the flags.
1274 @param[in]	index	the index to which LOB belongs.
1275 @return true if LOB is partially updatable, false otherwise.*/
bool ref_t::is_lob_partially_updatable(const dict_index_t *index) const {
1277   if (is_null_relaxed()) {
1278     return (false);
1279   }
1280 
1281   const page_size_t page_size = dict_table_page_size(index->table);
1282 
1283   if (page_size.is_compressed() && use_single_z_stream()) {
1284     return (false);
1285   }
1286 
1287   bool can_do_partial_update = false;
1288   ulint page_type = get_lob_page_info(index, page_size, can_do_partial_update);
1289 
1290   bool page_type_ok = (page_type == FIL_PAGE_TYPE_LOB_FIRST ||
1291                        page_type == FIL_PAGE_TYPE_ZLOB_FIRST);
1292 
1293   return (page_type_ok && can_do_partial_update);
1294 }
1295 
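/** Print this LOB reference into the given output stream.
@param[in]	out	the output stream.
@return the output stream. */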
std::ostream &ref_t::print(std::ostream &out) const {
1297   out << "[ref_t: m_ref=" << (void *)m_ref << ", space_id=" << space_id()
1298       << ", page_no=" << page_no() << ", offset=" << offset()
1299       << ", length=" << length()
1300       << ", is_being_modified=" << is_being_modified() << "]";
1301   return (out);
1302 }
1303 
1304 #ifdef UNIV_DEBUG
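/** Check if the space_id stored in the LOB reference is sane.  It must
either be 0 or be equal to the space_id of the given index.
@param[in]	index	the clustered index to which the LOB belongs.
@return true if the space_id in the reference is valid, false otherwise. */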
bool ref_t::check_space_id(dict_index_t *index) const {
1306   space_id_t idx_space_id = index->space;
1307   space_id_t ref_space_id = space_id();
1308 
1309   bool lob_ref_valid = (ref_space_id == 0 || idx_space_id == ref_space_id);
1310   return (lob_ref_valid);
1311 }
1312 #endif /* UNIV_DEBUG */
1313 
/** Acquire an x-latch on the index page containing the clustered
index record, in the given mini-transaction context.
@param[in]	mtr	the mini-transaction context. */
void DeleteContext::x_latch_rec_page(mtr_t *mtr) {
1318   bool found;
1319   page_t *rec_page = m_blobref.page_align();
1320   page_no_t rec_page_no = page_get_page_no(rec_page);
1321   space_id_t rec_space_id = page_get_space_id(rec_page);
1322 
1323   const page_size_t &rec_page_size =
1324       fil_space_get_page_size(rec_space_id, &found);
1325   ut_ad(found);
1326 
1327 #ifdef UNIV_DEBUG
1328   buf_block_t *block =
1329 #endif /* UNIV_DEBUG */
1330       buf_page_get(page_id_t(rec_space_id, rec_page_no), rec_page_size,
1331                    RW_X_LATCH, mtr);
1332 
1333   ut_ad(block != nullptr);
1334 }
1335 
1336 #ifdef UNIV_DEBUG
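/** Check that the LOB references of all externally stored fields in the
given clustered index record contain a sane space_id.
@param[in]	index	the clustered index to which the record belongs.
@param[in]	rec	the clustered index record (can be nullptr).
@param[in]	offsets	array returned by rec_get_offsets().
@return true if all LOB references are valid, false otherwise. */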
bool rec_check_lobref_space_id(dict_index_t *index, const rec_t *rec,
1338                                const ulint *offsets) {
1339   /* Make it more robust.  If rec pointer is null, don't do anything. */
1340   if (rec == nullptr) {
1341     return (true);
1342   }
1343 
1344   ut_ad(index->is_clustered());
1345   ut_ad(rec_offs_validate(rec, nullptr, offsets));
1346 
1347   const ulint n = rec_offs_n_fields(offsets);
1348 
1349   for (ulint i = 0; i < n; i++) {
1350     ulint len;
1351 
1352     if (rec_offs_nth_default(offsets, i)) {
1353       continue;
1354     }
1355 
1356     byte *data = rec_get_nth_field(rec, offsets, i, &len);
1357 
1358     if (len == UNIV_SQL_NULL) {
1359       continue;
1360     }
1361 
1362     if (rec_offs_nth_extern(offsets, i)) {
1363       ulint local_len = len - BTR_EXTERN_FIELD_REF_SIZE;
1364       ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
1365 
1366       byte *field_ref = data + local_len;
1367       ref_t ref(field_ref);
1368       if (!ref.check_space_id(index)) {
1369         return (false);
1370       }
1371     }
1372   }
1373   return (true);
1374 }
1375 #endif /* UNIV_DEBUG */
1376 
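/** For each field in the update vector that was externally stored in the
old version of the record, but is neither partially updated nor externally
stored in the new version, mark the old LOB as not partially updatable
anymore.  This informs the purge thread that the older version of the LOB
can be freed.
@param[in]	trx	the current transaction.
@param[in]	index	the clustered index to which the record belongs.
@param[in]	update	the update vector.
@param[in]	mtr	the mini-transaction context.
@return always DB_SUCCESS. */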
dberr_t mark_not_partially_updatable(trx_t *trx, dict_index_t *index,
1378                                      const upd_t *update, mtr_t *mtr) {
1379   if (!index->is_clustered()) {
1380     /* Only clustered index can have LOBs. */
1381     return (DB_SUCCESS);
1382   }
1383 
1384   const ulint n_fields = upd_get_n_fields(update);
1385 
1386   for (ulint i = 0; i < n_fields; i++) {
1387     const upd_field_t *ufield = upd_get_nth_field(update, i);
1388 
1389     if (update->is_partially_updated(ufield->field_no)) {
1390       continue;
1391     }
1392 
1393     if (ufield->is_virtual()) {
1394       continue;
1395     }
1396 
1397     const dfield_t *new_field = &ufield->new_val;
1398 
1399     if (ufield->ext_in_old && !dfield_is_ext(new_field)) {
1400       const dfield_t *old_field = &ufield->old_val;
1401       byte *field_ref = old_field->blobref();
1402       ref_t ref(field_ref);
1403 
1404       if (!ref.is_null_relaxed()) {
1405         ut_ad(ref.space_id() == index->space_id());
1406         ref.mark_not_partially_updatable(trx, mtr, index,
1407                                          index->get_page_size());
1408       }
1409     }
1410   }
1411 
1412   return (DB_SUCCESS);
1413 }
1414 
1415 }  // namespace lob
1416