1 /*****************************************************************************
2 
3 Copyright (c) 2016, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 
27 #include "lob0impl.h"
28 #include "lob0del.h"
29 #include "lob0index.h"
30 #include "lob0inf.h"
31 #include "lob0ins.h"
32 #include "lob0pages.h"
33 #include "lob0util.h"
34 #include "lob0zip.h"
35 #include "my_dbug.h"
36 #include "trx0sys.h"
37 #include "ut0ut.h"
38 #include "zlob0first.h"
39 #include "zlob0index.h"
40 #include "zlob0read.h"
41 
42 namespace lob {
43 
buf_block_set_next_page_no(buf_block_t * block,page_no_t next_page_no,mtr_t * mtr)44 static void buf_block_set_next_page_no(buf_block_t *block,
45                                        page_no_t next_page_no, mtr_t *mtr) {
46   mlog_write_ulint(block->frame + FIL_PAGE_NEXT, next_page_no, MLOG_4BYTES,
47                    mtr);
48 }
49 
50 #ifdef UNIV_DEBUG
51 /** Validate the page list.
52 @return true if valid, false otherwise. */
validate() const53 bool plist_base_node_t::validate() const {
54   ulint len = 0;
55   ulint exp = get_len();
56 
57   for (plist_node_t cur = get_first_node(); !cur.is_null();
58        cur = cur.get_next_node()) {
59     len++;
60     ut_ad(len <= exp);
61   }
62 
63   ut_ad(len == exp);
64   return (true);
65 }
66 #endif /* UNIV_DEBUG */
67 
/** Allocate one node page to hold LOB index entries.  The new page is
linked at the head of the list of node pages hanging off the first LOB
page, and every index entry slot on it is added to the free list kept
in the first page.
@param[in,out]	first_page	the first page of the LOB.
@param[in]	bulk		true if bulk operation, false otherwise.
@return the allocated buffer block, or nullptr on allocation failure. */
buf_block_t *node_page_t::alloc(first_page_t &first_page, bool bulk) {
  ut_ad(m_block == nullptr);
  page_no_t hint = FIL_NULL;

  /* For testing purposes, pretend that the LOB page allocation failed.*/
  DBUG_EXECUTE_IF("innodb_lob_alloc_node_page_failed", return (nullptr););

  m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);

  if (m_block == nullptr) {
    return (nullptr);
  }

  set_page_type();
  set_version_0();
  /* Push this node page at the head of the list: it points to the old
  head, and the first page now points to it. */
  set_next_page(first_page.get_next_page());
  first_page.set_next_page(get_page_no());

  /* Use fully for the LOB index contents */
  ulint lob_metadata_len = payload();
  ulint node_count = lob_metadata_len / index_entry_t::SIZE;

  flst_base_node_t *free_list = first_page.free_list();

  byte *cur = nodes_begin();

  /* Populate the free list with empty index entry nodes. */
  for (ulint i = 0; i < node_count; ++i) {
    flst_add_last(free_list, cur, m_mtr);
    cur += index_entry_t::SIZE;
  }

  ut_ad(flst_validate(free_list, m_mtr));
  return (m_block);
}
104 
print(std::ostream & out) const105 std::ostream &z_frag_entry_t::print(std::ostream &out) const {
106   out << "[z_frag_entry_t: prev=" << get_prev() << ", next=" << get_next()
107       << ", page_no=" << get_page_no() << ", n_frags=" << get_n_frags()
108       << ", used_len=" << get_used_len()
109       << ", total_free_len=" << get_total_free_len()
110       << ", big_free_len=" << get_big_free_len() << "]";
111   return (out);
112 }
113 
/** Purge this fragment entry: remove it from the list of used entries,
reset its contents, and put it back on the list of free entries.
@param[in,out]	used_lst	the list of used fragment entries.
@param[in,out]	free_lst	the list of free fragment entries. */
void z_frag_entry_t::purge(flst_base_node_t *used_lst,
                           flst_base_node_t *free_lst) {
  remove(used_lst);
  init();
  push_front(free_lst);
}
120 
/** Update the current fragment entry with information about
the given fragment page: its page number, fragment count, used length
and free-space statistics.
@param[in]	frag_page	the fragment page whose information
                                will be stored in current fragment entry. */
void z_frag_entry_t::update(const z_frag_page_t &frag_page) {
  /* The setters below log their writes through m_mtr. */
  ut_ad(m_mtr != nullptr);

  set_page_no(frag_page.get_page_no());
  set_n_frags(frag_page.get_n_frags());
  set_used_len(frag_page.get_total_stored_data());
  set_total_free_len(frag_page.get_total_free_len());
  set_big_free_len(frag_page.get_big_free_len());
}
134 
free_frag_page(mtr_t * mtr,dict_index_t * index)135 void z_frag_entry_t::free_frag_page(mtr_t *mtr, dict_index_t *index) {
136   page_no_t page_no = get_page_no();
137   if (page_no != FIL_NULL) {
138     page_id_t page_id = page_id_t(index->space_id(), page_no);
139     page_size_t page_size = index->get_page_size();
140     buf_block_t *block = buf_page_get(page_id, page_size, RW_X_LATCH, mtr);
141     btr_page_free_low(index, block, ULINT_UNDEFINED, mtr);
142     set_page_no(FIL_NULL);
143   }
144 }
145 
/** Insert a single zlib stream.  The stream is stored in the first
page (if unused), in a chain of data pages, and/or in one fragment for
the tail that is small enough (less than a quarter of a fragment
page's payload).  All allocated pages are chained via FIL_PAGE_NEXT.
@param[in]	index	the index to which the LOB belongs.
@param[in]	first	the first page of the compressed LOB.
@param[in]	trxid	the id of the current transaction.
@param[in]	blob	in memory copy of the LOB.
@param[in]	len	the length of the LOB.
@param[in]	mtr	the mini transaction context.
@param[in]	bulk	true if bulk operation, false otherwise.
@param[out]	start_page_no	the first page into which zlib stream
                                was written.
@param[out]	frag_id	the fragment id that contains last part of the
                        zlib stream.
@return DB_SUCCESS on success, error code on error. */
dberr_t z_insert_strm(dict_index_t *index, z_first_page_t &first,
                      trx_id_t trxid, byte *blob, ulint len, mtr_t *mtr,
                      bool bulk, page_no_t &start_page_no, frag_id_t &frag_id) {
  ulint remain = len;
  start_page_no = FIL_NULL;
  frag_id = FRAG_ID_NULL;
  page_no_t prev_page_no;
  byte *lob_ptr = blob;
  const page_no_t first_page_no = first.get_page_no();

#ifdef UNIV_DEBUG
  ulint frag_max_payload = z_frag_page_t::max_payload(index);
#endif /* UNIV_DEBUG */

  /* If the first page is empty, then make use of it. */
  if (first.get_data_len() == 0) {
    /* First page is unused. Use it. */
    byte *ptr = first.begin_data_ptr();
    ulint size = first.payload();
    ulint to_copy = (remain > size) ? size : remain;
    mlog_write_string(ptr, lob_ptr, to_copy, mtr);
    remain -= to_copy;
    lob_ptr += to_copy;

    start_page_no = first.get_page_no();
    prev_page_no = start_page_no;

    first.set_data_len(to_copy);
    first.set_trx_id(trxid);
    first.set_next_page_null();

  } else if (!z_frag_page_t::can_data_fit(index, remain)) {
    /* Data cannot fit into a fragment page. Allocate a data
    page. */

    z_data_page_t data_page(mtr, index);
    /* Hint the allocator to place the data page right after the
    first page. */
    buf_block_t *tmp_block = data_page.alloc(first_page_no + 1, bulk);

    if (tmp_block == nullptr) {
      return (DB_OUT_OF_FILE_SPACE);
    }

    byte *ptr = data_page.begin_data_ptr();
    ulint size = data_page.payload();
    ulint to_copy = (remain > size) ? size : remain;

    /* Copy data into the page. */
    mlog_write_string(ptr, lob_ptr, to_copy, mtr);

    remain -= to_copy;
    lob_ptr += to_copy;

    start_page_no = data_page.get_page_no();
    prev_page_no = start_page_no;

    data_page.set_data_len(to_copy);
    data_page.set_trx_id(trxid);

  } else {
    /* Data can fit into a fragment page.  The whole stream goes into
    one fragment and we are done. */
    z_frag_page_t frag_page(mtr, index);
    z_frag_entry_t frag_entry(mtr);

    frag_id = first.alloc_fragment(bulk, remain, frag_page, frag_entry);

    if (frag_id == FRAG_ID_NULL) {
      return (DB_OUT_OF_FILE_SPACE);
    }

#ifdef UNIV_DEBUG
    const ulint big_free_len_1 = frag_page.get_big_free_len();
    const ulint big_free_len_2 = frag_entry.get_big_free_len();
    ut_ad(big_free_len_1 == big_free_len_2);
#endif /* UNIV_DEBUG */

    frag_node_t node = frag_page.get_frag_node(frag_id);
    byte *ptr = node.frag_begin();

    ut_ad(remain == node.payload());

    /* copy data to the page. */
    mlog_write_string(ptr, lob_ptr, remain, mtr);

    start_page_no = frag_page.get_page_no();

    /* Update the frag entry. */
    frag_entry.update(frag_page);

    return (DB_SUCCESS);
  }

  /* As long as data cannot fit into a fragment page, use a data page. */
  while (remain > 0 && !z_frag_page_t::can_data_fit(index, remain)) {
    z_data_page_t data_page(mtr, index);
    buf_block_t *new_block = data_page.alloc(first_page_no + 1, bulk);

    if (new_block == nullptr) {
      return (DB_OUT_OF_FILE_SPACE);
    }

    byte *ptr = data_page.begin_data_ptr();
    ulint size = data_page.payload();
    ulint to_copy = (remain > size) ? size : remain;

    mlog_write_string(ptr, lob_ptr, to_copy, mtr);

    remain -= to_copy;
    lob_ptr += to_copy;

    data_page.set_data_len(to_copy);
    data_page.set_trx_id(trxid);

    /* Get the previous page and update its next page. */
    buf_block_t *block =
        buf_page_get(page_id_t(dict_index_get_space(index), prev_page_no),
                     dict_table_page_size(index->table), RW_X_LATCH, mtr);

    buf_block_set_next_page_no(block, data_page.get_page_no(), mtr);

    prev_page_no = data_page.get_page_no();
  }

  /* Whatever remains now fits in a fragment page; store the tail in
  one fragment and link the fragment page after the last data page. */
  if (remain > 0) {
    ut_ad(remain <= frag_max_payload);
    ut_ad(frag_id == FRAG_ID_NULL);
    z_frag_page_t frag_page(mtr, index);
    z_frag_entry_t frag_entry(mtr);

    frag_id = first.alloc_fragment(bulk, remain, frag_page, frag_entry);

    if (frag_id == FRAG_ID_NULL) {
      return (DB_OUT_OF_FILE_SPACE);
    }

#ifdef UNIV_DEBUG
    const ulint big_free_len_1 = frag_page.get_big_free_len();
    const ulint big_free_len_2 = frag_entry.get_big_free_len();
    ut_ad(big_free_len_1 == big_free_len_2);
#endif /* UNIV_DEBUG */

    frag_node_t node = frag_page.get_frag_node(frag_id);
    byte *ptr = node.frag_begin();

#ifdef UNIV_DEBUG
    {
      const ulint pl = node.payload();
      ut_ad(remain <= pl);
    }
#endif /* UNIV_DEBUG */

    mlog_write_string(ptr, lob_ptr, remain, mtr);

    /* Update the frag entry. */
    frag_entry.update(frag_page);

    /* Get the previous page and update its next page. */
    buf_block_t *block =
        buf_page_get(page_id_t(dict_index_get_space(index), prev_page_no),
                     dict_table_page_size(index->table), RW_X_LATCH, mtr);

    buf_block_set_next_page_no(block, frag_page.get_page_no(), mtr);
  }

  return (DB_SUCCESS);
}
324 
/** Insert one chunk of input.  The maximum size of a chunk is Z_CHUNK_SIZE.
The chunk is compressed with zlib deflate, the compressed stream is
written via z_insert_strm(), and a new LOB index entry describing the
chunk is allocated and filled in.
@param[in]  index      clustered index in which LOB is inserted.
@param[in]  first      the first page of the LOB.
@param[in]  trx        transaction doing the insertion.
@param[in]  ref        LOB reference in the clust rec.
                       NOTE(review): not referenced in this body —
                       presumably kept for interface symmetry; confirm.
@param[in]  blob       the uncompressed LOB to be inserted.
@param[in]  len        length of the blob.
@param[out] out_entry  the newly inserted index entry. can be NULL.
@param[in]  mtr        the mini transaction
@param[in]  bulk       true if it is bulk operation, false otherwise.
@return DB_SUCCESS on success, error code on failure. */
dberr_t z_insert_chunk(dict_index_t *index, z_first_page_t &first, trx_t *trx,
                       ref_t ref, byte *blob, ulint len,
                       z_index_entry_t *out_entry, mtr_t *mtr, bool bulk) {
  ut_ad(len <= Z_CHUNK_SIZE);
  ut_ad(first.get_page_type() == FIL_PAGE_TYPE_ZLOB_FIRST);
  dberr_t err(DB_SUCCESS);

  /* A null trx is treated as id 0; undo_no - 1 refers to the undo
  record created for this operation. */
  const trx_id_t trxid = (trx == nullptr ? 0 : trx->id);
  const undo_no_t undo_no = (trx == nullptr ? 0 : trx->undo_no - 1);
  z_stream strm;

  strm.zalloc = nullptr;
  strm.zfree = nullptr;
  strm.opaque = nullptr;

  int ret = deflateInit(&strm, page_zip_level);

  ut_a(ret == Z_OK);

  strm.avail_in = static_cast<uInt>(len);
  strm.next_in = blob;

  /* It is possible that the compressed stream is actually bigger.  So
  making use of this call to find it out for sure. */
  const ulint max_buf = deflateBound(&strm, static_cast<uLong>(len));

  std::unique_ptr<byte[]> tmpbuf(new byte[max_buf]);
  strm.avail_out = static_cast<uInt>(max_buf);
  strm.next_out = tmpbuf.get();

  /* Z_FINISH with an output buffer of at least deflateBound() bytes
  must complete the whole stream in one call. */
  ret = deflate(&strm, Z_FINISH);
  ut_a(ret == Z_STREAM_END);
  ut_a(strm.avail_in == 0);
  ut_a(strm.total_out == (max_buf - strm.avail_out));

  page_no_t z_page_no;
  frag_id_t z_frag_id;
  err = z_insert_strm(index, first, trxid, tmpbuf.get(), strm.total_out, mtr,
                      bulk, z_page_no, z_frag_id);

  if (err != DB_SUCCESS) {
    /* Release the zlib stream state before bailing out. */
    deflateEnd(&strm);
    return (err);
  }

  z_index_entry_t entry = first.alloc_index_entry(bulk);

  if (entry.is_null()) {
    deflateEnd(&strm);
    return (DB_OUT_OF_FILE_SPACE);
  }

  /* Fill in the index entry: transaction info, location of the
  compressed stream, and both uncompressed and compressed lengths. */
  entry.set_trx_id(trxid);
  entry.set_trx_id_modifier(trxid);
  entry.set_trx_undo_no(undo_no);
  entry.set_trx_undo_no_modifier(undo_no);
  entry.set_z_page_no(z_page_no);
  entry.set_z_frag_id(z_frag_id);
  entry.set_data_len(len);
  entry.set_zdata_len(strm.total_out);

  deflateEnd(&strm);

  if (out_entry != nullptr) {
    out_entry->reset(entry);
  }

  ut_ad(z_validate_strm(index, entry, mtr));
  return (DB_SUCCESS);
}
406 
/** Insert a large object (LOB) into the system.  The LOB is split
into chunks of at most Z_CHUNK_SIZE bytes; each chunk is compressed
and inserted via z_insert_chunk().  Periodically the mini transaction
is restarted (check_redolog) to bound redo log usage.
@param[in]      ctx      the B-tree context for this LOB operation.
@param[in]      trx      transaction doing the insertion.
@param[in,out]  ref      the LOB reference.
@param[in]      field    the LOB field.
@param[in]      field_j  index of the field within the big rec vector,
                         used only for the small-blob path.
@return DB_SUCCESS on success, error code on failure.*/
dberr_t z_insert(InsertContext *ctx, trx_t *trx, ref_t &ref,
                 big_rec_field_t *field, ulint field_j) {
  byte *blob = field->ptr();
  ulint len = field->len;
  ulint remain = len;
  byte *ptr = blob;
  dberr_t err(DB_SUCCESS);
  dict_index_t *index = ctx->index();
  space_id_t space_id = dict_index_get_space(index);
  byte *field_ref;
  mtr_t *mtr = ctx->get_mtr();
  const trx_id_t trxid = (trx == nullptr ? 0 : trx->id);
  /* Restart the mtr after every commit_freq chunks. */
  const ulint commit_freq = 4;

  ut_ad(remain > 0);

  if (ref.length() > 0) {
    ref.set_length(len, nullptr);
    if (!ctx->is_bulk()) {
      ctx->zblob_write_blobref(field->field_no, ctx->m_mtr);
    }
  }

  const page_size_t page_size(dict_table_page_size(index->table));

  if (!ref_t::is_big(page_size, len)) {
    /* The LOB is not big enough to build LOB index. Insert the
    LOB without an LOB index. */
    zInserter zblob_writer(ctx);
    err = zblob_writer.prepare();
    if (err == DB_SUCCESS) {
      zblob_writer.write_one_small_blob(field_j);
      err = zblob_writer.finish(false);
    }
    return (err);
  }

  z_first_page_t first(mtr, index);
  buf_block_t *first_block = first.alloc(ctx->is_bulk());

  if (first_block == nullptr) {
    return (DB_OUT_OF_FILE_SPACE);
  }

  first.init_lob_version();
  first.set_last_trx_id(trxid);

  const page_no_t first_page_no = first.get_page_no();
  const page_id_t first_page_id(dict_index_get_space(index), first_page_no);

  if (dict_index_is_online_ddl(index)) {
    row_log_table_blob_alloc(index, first_page_no);
  }

  flst_base_node_t *idx_list = first.index_list();

  ulint nth_chunk = 0;

  /* const only in release builds (ut_o); in debug builds the
  injection point below may shrink the chunk size. */
  ut_o(const) ulint chunk_size = Z_CHUNK_SIZE;

  DBUG_EXECUTE_IF("zlob_reduce_chunk_size", chunk_size = 20000;);

  while (remain > 0) {
    ut_ad(first.get_page_type() == FIL_PAGE_TYPE_ZLOB_FIRST);

    z_index_entry_t entry(mtr, index);
    ulint size = (remain >= chunk_size) ? chunk_size : remain;

    err = z_insert_chunk(index, first, trx, ref, ptr, size, &entry, mtr,
                         ctx->is_bulk());

    if (err != DB_SUCCESS) {
      return (err);
    }

    /* Freshly inserted LOBs start at version 1. */
    entry.set_lob_version(1);

    ptr += size;
    remain -= size;

    entry.push_back(idx_list);

    if (++nth_chunk % commit_freq == 0) {
      /* Restart the mtr to avoid exceeding redo log limits. */
      ctx->check_redolog();
      field_ref = ctx->get_field_ref(field->field_no);
      ref.set_ref(field_ref);
      first.load_x(first_page_id, page_size);

      /* The first page could have been re-located.  Reset
      the idx_list to the correct value. */
      idx_list = first.index_list();
    }
  }

  /* Must have inserted atleast one chunk. */
  ut_ad(nth_chunk > 0);

  field_ref = ctx->get_field_ref(field->field_no);
  ref.set_ref(field_ref);

  /* Point the LOB reference at the first page; version 1. */
  ref.update(space_id, first_page_no, 1, nullptr);
  ref.set_length(len, nullptr);

  ctx->make_nth_extern(field->field_no);

  if (!ctx->is_bulk()) {
    ctx->zblob_write_blobref(field->field_no, ctx->m_mtr);
  }

  /* If the full LOB could not be inserted, then we report error. */
  ut_ad(remain == 0);

#ifdef ZLOB_DEBUG
  std::cout << "thread=" << std::this_thread::get_id()
            << ", lob::z_insert(): table=" << ctx->index()->table->name
            << ", ref=" << ref << std::endl;
  first.print(std::cout);
#endif

  DBUG_EXECUTE_IF("innodb_zlob_print", z_print_info(index, ref, std::cerr););

  return (err);
}
536 
537 /** Print information about the given compressed lob.
538 @param[in]  index  the index dictionary object.
539 @param[in]  ref    the LOB reference
540 @param[out] out    the output stream where information is printed.
541 @return DB_SUCCESS on success, or an error code. */
z_print_info(const dict_index_t * index,const lob::ref_t & ref,std::ostream & out)542 dberr_t z_print_info(const dict_index_t *index, const lob::ref_t &ref,
543                      std::ostream &out) {
544   mtr_t mtr;
545   mtr_start(&mtr);
546   z_first_page_t first(&mtr, const_cast<dict_index_t *>(index));
547   first.load_x(ref.page_no());
548   first.print(out);
549   mtr_commit(&mtr);
550   return (DB_SUCCESS);
551 }
552 
/** Allocate a fragment page, link it into the doubly-linked list of
fragment pages hanging off the first page (unless the LOB predates
that list), and initialize its free/used fragment lists with one big
free fragment covering the whole payload area.
@param[in,out]	first	the first page of the LOB.
@param[in]	hint	hint page number for the allocation.
@param[in]	bulk	true if bulk operation, false otherwise.
@return the allocated buffer block, or nullptr on allocation failure. */
buf_block_t *z_frag_page_t::alloc(z_first_page_t &first, page_no_t hint,
                                  bool bulk) {
  /* The m_block member could point to valid block.  Overwriting it is
  good enough. */

  /* For testing purposes, pretend that the LOB page allocation failed.*/
  DBUG_EXECUTE_IF("innodb_lob_alloc_z_frag_page_failed", return (nullptr););

  m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);

  if (m_block == nullptr) {
    return (nullptr);
  }

  /* Set page type to FIL_PAGE_TYPE_ZLOB_FRAG. */
  set_page_type();
  set_version_0();

  /* All allocated fragment pages are linked via the next page of the first page
   * of LOB. */
  page_no_t frag_page_no = first.get_frag_page_no();

  if (frag_page_no == 0) {
    /* If the frag_page_no is equal to 0, it means that this LOB was created
     * before storing the fragment page list in the FIL_PAGE_PREV of the first
     * page.  So don't change that. */
  } else {
    if (frag_page_no != FIL_NULL) {
      /* Load the first fragment page and updates its prev page. */
      z_frag_page_t tmp(m_mtr, m_index);
      tmp.load_x(frag_page_no);
      tmp.set_page_prev(get_page_no());
    }
    /* Push this page at the head of the fragment page list. */
    set_page_next(frag_page_no);
    set_page_prev(FIL_NULL);
    first.set_frag_page_no(get_page_no());
  }

  /* No fragment entry describes this page yet. */
  set_frag_entry_null();

  /* Initialize the frag free list. */
  plist_base_node_t fl = free_list();
  fl.init();
  ut_ad(fl.validate());

  /* Initialize the used frag list. */
  plist_base_node_t frag_lst = frag_list();
  frag_lst.init();
  ut_ad(frag_lst.validate());

  byte *page = frame();

  /* Add the available space as free frag to free list. */
  frag_node_t frag(page, page + OFFSET_FRAGS_BEGIN, payload(), m_mtr);
  fl.push_front(frag.m_node);
  frag.set_frag_id_null();

  ut_ad(fl.validate());
  return (m_block);
}
613 
614 /** Determine if the given length of data can fit into a fragment page.
615 @param[in]   index   the clust index into which LOB is inserted.
616 @param[in]   data_size  The length of data to operate.
617 @return true if data can fit into fragment page, false otherwise. */
can_data_fit(dict_index_t * index,ulint data_size)618 bool z_frag_page_t::can_data_fit(dict_index_t *index, ulint data_size) {
619   ulint max_size = max_payload(index);
620 
621   /* Look for a fragment page only if the data to be stored is less
622   than a quarter of the size of the fragment page. */
623   return (data_size < (max_size / 4));
624 }
625 
alloc(z_first_page_t & first,bool bulk)626 buf_block_t *z_frag_node_page_t::alloc(z_first_page_t &first, bool bulk) {
627   ut_ad(m_block == nullptr);
628   page_no_t hint = FIL_NULL;
629 
630   DBUG_EXECUTE_IF("innodb_lob_alloc_z_frag_node_page_failed",
631                   return (nullptr););
632 
633   m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);
634 
635   if (m_block == nullptr) {
636     return (nullptr);
637   }
638 
639   set_page_type();
640   set_version_0();
641   flst_base_node_t *free_lst = first.free_frag_list();
642   init(free_lst);
643 
644   /* Link the allocated index page to the first page. */
645   page_no_t page_no = first.get_frag_node_page_no();
646   set_next_page_no(page_no);
647   first.set_frag_node_page_no(get_page_no());
648   return (m_block);
649 }
650 
/** Allocate a fragment of the given payload size from this page.
The biggest free fragment is used: if its payload matches the request
exactly it is converted in place, otherwise it is split in two and the
first part is used.  The given fragment entry is updated with the new
page statistics.
@param[in]	size	the requested payload size in bytes.
@param[in,out]	entry	the fragment entry describing this page.
@return the allocated fragment id, or FRAG_ID_NULL if the request
could not be satisfied from this page. */
frag_id_t z_frag_page_t::alloc_fragment(ulint size, z_frag_entry_t &entry) {
  plist_base_node_t free_lst = free_list();

  ut_ad(free_lst.get_len() > 0);

  const ulint big_free_len = get_big_free_len();
  ut_d(bool visited_big_frag = false;);

  for (plist_node_t cur = free_lst.get_first_node(); !cur.is_null();
       cur = cur.get_next_node()) {
    frag_node_t frag(cur, m_mtr);
    const ulint total_len = frag.get_total_len();
    const ulint payload = frag.payload();
    const ulint overhead = frag_node_t::overhead();

    /* Get the biggest free fragment available. */
    if (total_len != big_free_len) {
      continue;
    }

    ut_d(visited_big_frag = true;);

    bool exact_fit = false;

    if (is_last_frag(frag)) {
      /* This fragment gives space for the directory
      entry. */
      ulint extra = frag_node_t::SIZE_OF_PAGE_DIR_ENTRY;
      if (payload == (size + extra)) {
        exact_fit = true;
      }
    } else {
      /* This fragment does not give space for the
      directory entry. */
      if (payload == size) {
        exact_fit = true;
      }
    }

    if (exact_fit) {
      /* Allocate the fragment id. */
      ulint frag_id = alloc_frag_id();
      ut_ad(frag_id != FRAG_ID_NULL);

      /* this is the requested fragment. */
      free_lst.remove(cur);
      insert_into_frag_list(frag);

      frag.set_frag_id(frag_id);
      set_nth_dir_entry(frag_id, frag.addr());
      entry.update(*this);
      return (frag_id);

    } else if (payload >= (size + overhead + 1)) {
      /* Break the current fragment into two. Atleast 1 byte
      payload must be there in the other node. */

      split_free_frag(frag, size);
      free_lst.remove(frag.m_node);
      insert_into_frag_list(frag);

      /* Allocate the fragment id. */
      ulint frag_id = alloc_frag_id();
      ut_ad(frag_id != FRAG_ID_NULL);

      frag.set_frag_id(frag_id);
      set_nth_dir_entry(frag_id, frag.addr());
      entry.update(*this);
      return (frag_id);
    }
  }

  /* The loop must have examined the biggest free fragment. */
  ut_ad(visited_big_frag);
  return (FRAG_ID_NULL);
}
726 
/** Grow the frag directory by one entry.  Space for the new slot is
taken from the last free fragment, which must be adjacent to the
directory area at the end of the page.
@return the fragment identifier that was newly added,
or FRAG_ID_NULL on failure. */
ulint z_frag_page_t::alloc_dir_entry() {
  plist_base_node_t free_lst = free_list();
  plist_node_t last = free_lst.get_last_node();
  frag_node_t frag(last, m_mtr);
  ulint len = frag.payload();

  /* The last free fragment must be adjacent to the directory.
  Then only it can give space to one slot. */
  if (frag.end_ptr() != slots_end_ptr()) {
    ut_ad(0);
    return (FRAG_ID_NULL);
  }

  /* The fragment must have more payload than the size of one slot,
  otherwise shrinking it would leave nothing usable. */
  if (len <= SIZE_OF_PAGE_DIR_ENTRY) {
    ut_ad(0);
    return (FRAG_ID_NULL);
  }

  /* Shrink the free fragment by one slot size and claim that space
  for the new directory entry. */
  incr_n_dir_entries();
  frag.decr_length_by_2();
  return (init_last_dir_entry());
}
751 
get_frag_entry_x()752 z_frag_entry_t z_frag_page_t::get_frag_entry_x() {
753   fil_addr_t node_loc = get_frag_entry_addr();
754   flst_node_t *node = addr2ptr_x(node_loc);
755   z_frag_entry_t entry(node, m_mtr);
756   ut_ad(entry.get_page_no() == get_page_no());
757   return (entry);
758 }
759 
get_frag_entry_s()760 z_frag_entry_t z_frag_page_t::get_frag_entry_s() {
761   fil_addr_t node_loc = get_frag_entry_addr();
762   flst_node_t *node = addr2ptr_s(node_loc);
763   z_frag_entry_t entry(node, m_mtr);
764   ut_ad(entry.get_page_no() == get_page_no());
765   return (entry);
766 }
767 
/** Deallocate this fragment page along with its fragment entry.  The
entry is purged back to the free entry list, the page is unlinked from
the doubly-linked list of fragment pages (unless the LOB predates that
list), and the page itself is freed.
@param[in,out]	first		the first page of the LOB.
@param[in,out]	alloc_mtr	the mini transaction used for the
				unlinking and the page free. */
void z_frag_page_t::dealloc_with_entry(z_first_page_t &first,
                                       mtr_t *alloc_mtr) {
  /* Only a fully empty fragment page may be deallocated. */
  ut_ad(get_n_frags() == 0);
  z_frag_entry_t entry = get_frag_entry_x();
  entry.purge(first.frag_list(), first.free_frag_list());

  page_no_t top_frag_page = first.get_frag_page_no();

  if (top_frag_page == 0) {
    /* If the first page contains 0 in FIL_PAGE_PREV, then this LOB does not use
     * FIL_PAGE_PREV to point to the doubly-linked list of fragment pages.  In
     * this case, don't touch FIL_PAGE_PREV. */
  } else {
    page_no_t next_frag_page = get_next_page_no();
    page_no_t prev_frag_page = get_prev_page_no();

    if (top_frag_page == get_page_no()) {
      /* The fragment page pointed to by first LOB page is being deallocated. */
      ut_ad(prev_frag_page == FIL_NULL);
      first.set_frag_page_no(alloc_mtr, next_frag_page);
    } else {
      ut_ad(prev_frag_page != FIL_NULL);
    }

    /* The fragment pages are doubly linked via FIL_PAGE_NEXT and
     * FIL_PAGE_PREV. Update the links before deallocating a fragment page. */
    if (next_frag_page != FIL_NULL) {
      z_frag_page_t zfp_next(alloc_mtr, m_index);
      zfp_next.load_x(next_frag_page);
      zfp_next.set_page_prev(prev_frag_page);
    }

    if (prev_frag_page != FIL_NULL) {
      z_frag_page_t zfp_prev(alloc_mtr, m_index);
      zfp_prev.load_x(prev_frag_page);
      zfp_prev.set_page_next(next_frag_page);
    }
  }

  /* Free the page and forget about the block. */
  btr_page_free_low(m_index, m_block, ULINT_UNDEFINED, alloc_mtr);
  m_block = nullptr;
}
810 
print_frags_in_order(std::ostream & out) const811 std::ostream &z_frag_page_t::print_frags_in_order(std::ostream &out) const {
812   if (m_block == nullptr) {
813     return (out);
814   }
815 
816   plist_base_node_t free_lst = free_list();
817   plist_base_node_t frag_lst = frag_list();
818 
819   out << "[Free List: " << free_lst << "]" << std::endl;
820   out << "[Frag List: " << frag_lst << "]" << std::endl;
821 
822   frag_node_t cur_free(free_lst.get_first_node(), m_mtr);
823   frag_node_t cur_frag(frag_lst.get_first_node(), m_mtr);
824 
825   while (!cur_free.is_null() && !cur_frag.is_null()) {
826     if (cur_free.is_before(cur_frag)) {
827       out << "F: " << cur_free << std::endl;
828       cur_free = cur_free.get_next_node();
829     } else {
830       out << "U: " << cur_frag << std::endl;
831       cur_frag = cur_frag.get_next_node();
832     }
833   }
834 
835   if (cur_free.is_null()) {
836     while (!cur_frag.is_null()) {
837       out << "U: " << cur_frag << std::endl;
838       cur_frag = cur_frag.get_next_node();
839     }
840   }
841 
842   if (cur_frag.is_null()) {
843     while (!cur_free.is_null()) {
844       out << "F: " << cur_free << std::endl;
845       cur_free = cur_free.get_next_node();
846     }
847   }
848 
849   return (out);
850 }
851 
852 /** Get the total amount of stored data in this page. */
get_total_stored_data() const853 ulint z_frag_page_t::get_total_stored_data() const {
854   ulint len = 0;
855 
856   ut_ad(m_block != nullptr);
857 
858   plist_base_node_t frag_lst = frag_list();
859 
860   for (plist_node_t cur = frag_lst.get_first_node(); !cur.is_null();
861        cur = cur.get_next_node()) {
862     frag_node_t frag(cur, m_mtr);
863     len += frag.payload();
864   }
865 
866   return (len);
867 }
868 
869 /** Get the total cumulative free space in this page. */
get_total_free_len() const870 ulint z_frag_page_t::get_total_free_len() const {
871   ulint len = 0;
872 
873   ut_ad(m_block != nullptr);
874 
875   plist_base_node_t free_lst = free_list();
876   for (plist_node_t cur = free_lst.get_first_node(); !cur.is_null();
877        cur = cur.get_next_node()) {
878     frag_node_t frag(cur, m_mtr);
879     len += frag.payload();
880   }
881   return (len);
882 }
883 
884 /** Get the big free space in this page. */
get_big_free_len() const885 ulint z_frag_page_t::get_big_free_len() const {
886   ulint big = 0;
887 
888   ut_ad(m_block != nullptr);
889 
890   plist_base_node_t free_lst = free_list();
891   for (plist_node_t cur = free_lst.get_first_node(); !cur.is_null();
892        cur = cur.get_next_node()) {
893     frag_node_t frag(cur, m_mtr);
894 
895     /* Use the total length (including the meta data overhead) of the
896     fragment. */
897     ulint total_free = frag.get_total_len();
898     if (total_free > big) {
899       big = total_free;
900     }
901   }
902 
903   return (big);
904 }
905 
/** Deallocate all the free slots from the end of the page directory.
Only possible when the last free fragment is physically adjacent to the
page directory, so that each released directory slot can be absorbed
into that fragment. */
void z_frag_page_t::dealloc_frag_id() {
  plist_base_node_t free_lst = free_list();
  plist_node_t last = free_lst.get_last_node();
  frag_node_t frag(last, m_mtr);
  /* The last free fragment must be adjacent to the directory.
  Then only it can take space from one slot. */
  if (frag.end_ptr() != slots_end_ptr()) {
    return;
  }

  /* Walk the directory backwards, releasing each trailing unused slot
  (addr == 0): the adjacent free fragment grows by the slot size
  (2 bytes, per incr_length_by_2) and the directory shrinks by one
  entry.  Stop at the first in-use slot or when slot 0 is reached. */
  ulint frag_id = get_n_dir_entries() - 1;
  paddr_t addr = frag_id_to_addr(frag_id);
  while (addr == 0) {
    frag.incr_length_by_2();
    decr_n_dir_entries();
    if (frag_id == 0) {
      break;
    }
    frag_id--;
    addr = frag_id_to_addr(frag_id);
  }
}
929 
/** Insert a large object (LOB) into the system.
@param[in]      ctx      the B-tree context for this LOB operation.
@param[in]      trx      transaction doing the insertion; may be nullptr
                         (0 is then used as the trx id / undo number).
@param[in,out]  ref      the LOB reference.
@param[in]      field    the LOB field.
@param[in]      field_j  position of the field; used only when delegating
                         to the small-blob writer.
@return DB_SUCCESS on success, error code on failure.*/
dberr_t insert(InsertContext *ctx, trx_t *trx, ref_t &ref,
               big_rec_field_t *field, ulint field_j) {
  const trx_id_t trxid = (trx == nullptr ? 0 : trx->id);
  const undo_no_t undo_no = (trx == nullptr ? 0 : trx->undo_no - 1);
  dberr_t ret = DB_SUCCESS;
  ulint total_written = 0;
  const byte *ptr = field->ptr();
  ulint len = field->len;
  mtr_t *mtr = ctx->get_mtr();
  dict_index_t *index = ctx->index();
  space_id_t space_id = dict_index_get_space(index);
  page_size_t page_size(dict_table_page_size(index->table));
  DBUG_TRACE;

  /* Reset any previously stored length before writing the data. */
  if (ref.length() > 0) {
    ref.set_length(0, mtr);
  }

  if (!ref_t::is_big(page_size, len)) {
    /* The LOB is not big enough to build LOB index. Insert the LOB without an
    LOB index. */
    Inserter blob_writer(ctx);
    return blob_writer.write_one_small_blob(field_j);
  }

  ut_ad(ref_t::is_big(page_size, len));

  DBUG_LOG("lob", PrintBuffer(ptr, len));
  ut_ad(ref.validate(ctx->get_mtr()));

  /* Allocate the first page of the LOB. */
  first_page_t first(mtr, index);
  buf_block_t *first_block = first.alloc(mtr, ctx->is_bulk());

  if (first_block == nullptr) {
    /* Allocation of the first page of LOB failed. */
    return DB_OUT_OF_FILE_SPACE;
  }

  first.set_last_trx_id(trxid);
  first.init_lob_version();

  page_no_t first_page_no = first.get_page_no();

  /* Make online DDL (if any) aware of the newly allocated LOB page. */
  if (dict_index_is_online_ddl(index)) {
    row_log_table_blob_alloc(index, first_page_no);
  }

  page_id_t first_page_id(space_id, first_page_no);

  flst_base_node_t *index_list = first.index_list();

  /* Write the initial portion of the data into the first page.
  NOTE(review): `remaining` is taken from `len` after this call, which
  suggests first.write() advances ptr/len past the written bytes —
  confirm against first_page_t::write's signature. */
  ulint to_write = first.write(trxid, ptr, len);
  total_written += to_write;
  ulint remaining = len;

  {
    /* Insert an index entry in LOB index. */
    flst_node_t *node = first.alloc_index_entry(ctx->is_bulk());

    /* Here the first index entry is being allocated.  Since this will be
    allocated in the first page of LOB, it cannot be nullptr. */
    ut_ad(node != nullptr);

    index_entry_t entry(node, mtr, index);
    entry.set_versions_null();
    entry.set_trx_id(trxid);
    entry.set_trx_id_modifier(trxid);
    entry.set_trx_undo_no(undo_no);
    entry.set_trx_undo_no_modifier(undo_no);
    entry.set_page_no(first.get_page_no());
    entry.set_data_len(to_write);
    entry.set_lob_version(1);
    flst_add_last(index_list, node, mtr);

    first.set_trx_id(trxid);
    first.set_data_len(to_write);
  }

  /* Spill the rest of the data into additional LOB data pages, adding
  one index entry per page. */
  ulint nth_blob_page = 0;
  const ulint commit_freq = 4;

  while (remaining > 0) {
    data_page_t data_page(mtr, index);
    buf_block_t *block = data_page.alloc(mtr, ctx->is_bulk());

    if (block == nullptr) {
      ret = DB_OUT_OF_FILE_SPACE;
      break;
    }

    to_write = data_page.write(trxid, ptr, remaining);
    total_written += to_write;
    data_page.set_trx_id(trxid);

    /* Allocate a new index entry */
    flst_node_t *node = first.alloc_index_entry(ctx->is_bulk());

    if (node == nullptr) {
      ret = DB_OUT_OF_FILE_SPACE;
      break;
    }

    index_entry_t entry(node, mtr, index);
    entry.set_versions_null();
    entry.set_trx_id(trxid);
    entry.set_trx_id_modifier(trxid);
    entry.set_trx_undo_no(undo_no);
    entry.set_trx_undo_no_modifier(undo_no);
    entry.set_page_no(data_page.get_page_no());
    entry.set_data_len(to_write);
    entry.set_lob_version(1);
    entry.push_back(first.index_list());

    ut_ad(!entry.get_self().is_equal(entry.get_prev()));
    ut_ad(!entry.get_self().is_equal(entry.get_next()));

    page_type_t type = fil_page_get_type(block->frame);
    ut_a(type == FIL_PAGE_TYPE_LOB_DATA);

    /* Periodically restart the mtr (via check_redolog) to bound redo
    log usage; the field ref and the first page must then be
    re-fetched/re-latched under the new mtr. */
    if (++nth_blob_page % commit_freq == 0) {
      ctx->check_redolog();
      ref.set_ref(ctx->get_field_ref(field->field_no));
      first.load_x(first_page_id, page_size);
    }
  }

  if (ret == DB_SUCCESS) {
    /* Publish the completed LOB in the clustered index field ref. */
    ref.update(space_id, first_page_no, 1, mtr);
    ref.set_length(total_written, mtr);
  }

  DBUG_EXECUTE_IF("innodb_lob_print",
                  print(trx, index, std::cerr, ref, false););

  DBUG_EXECUTE_IF("btr_store_big_rec_extern", ret = DB_OUT_OF_FILE_SPACE;);
  return ret;
}
1073 
/** Fetch a large object (LOB) from the system.
@param[in]  ctx    the read context information.
@param[in]  ref    the LOB reference identifying the LOB.
@param[in]  offset read the LOB from the given offset (only 0 is
                   supported here, per the assertion below).
@param[in]  len    the length of LOB data that needs to be fetched.
@param[out] buf    the output buffer (owned by caller) of minimum len bytes.
@return the amount of data (in bytes) that was actually read. */
ulint read(ReadContext *ctx, ref_t ref, ulint offset, ulint len, byte *buf) {
  DBUG_TRACE;
  ut_ad(offset == 0);
  /* The LOB version visible to this reader, taken from the field ref. */
  const uint32_t lob_version = ref.version();

  ref_mem_t ref_mem;
  ref.parse(ref_mem);

#ifdef LOB_DEBUG
  std::cout << "thread=" << std::this_thread::get_id()
            << ", lob::read(): table=" << ctx->index()->table->name
            << ", ref=" << ref << std::endl;
#endif /* LOB_DEBUG */

  /* Cache of s-latched blocks of LOB index pages.*/
  BlockCache cached_blocks;

  ut_ad(len > 0);

  /* Obtain length of LOB available in clustered index.*/
  const ulint avail_lob = ref.length();

  if (avail_lob == 0) {
    return 0;
  }

  if (ref.is_being_modified()) {
    /* This should happen only for READ UNCOMMITTED transactions. */
    ut_ad(ctx->assert_read_uncommitted());
    return 0;
  }

  ut_ad(ctx->m_index->is_clustered());

  ulint total_read = 0;
  ulint actual_read = 0;
  page_no_t page_no = ref.page_no();
  const page_id_t page_id(ctx->m_space_id, page_no);
  mtr_t mtr;

  mtr_start(&mtr);

  /* S-latch the first LOB page; it anchors the LOB index. */
  first_page_t first_page(&mtr, ctx->m_index);
  first_page.load_s(page_id, ctx->m_page_size);

  page_type_t page_type = first_page.get_page_type();

  if (page_type == FIL_PAGE_TYPE_BLOB || page_type == FIL_PAGE_SDI_BLOB) {
    /* Old-format (pre-index) BLOB pages: delegate to the legacy reader. */
    mtr_commit(&mtr);
    Reader reader(*ctx);
    ulint fetch_len = reader.fetch();
    return fetch_len;
  }

  ut_ad(page_type == FIL_PAGE_TYPE_LOB_FIRST);

  cached_blocks.insert(
      std::pair<page_no_t, buf_block_t *>(page_no, first_page.get_block()));

  ctx->m_lob_version = first_page.get_lob_version();

  page_no_t first_page_no = first_page.get_page_no();

  flst_base_node_t *base_node = first_page.index_list();

  fil_addr_t node_loc = flst_get_first(base_node, &mtr);
  flst_node_t *node = nullptr;

  /* Total bytes that have been skipped in this LOB */
  ulint skipped = 0;

  index_entry_t cur_entry(&mtr, ctx->m_index);
  index_entry_t old_version(&mtr, ctx->m_index);
  index_entry_mem_t entry_mem;

  ut_ad(offset >= skipped);

  ulint page_offset = offset - skipped;
  ulint want = len;
  byte *ptr = buf;

  /* Use a different mtr for data pages. */
  mtr_t data_mtr;
  mtr_start(&data_mtr);
  const ulint commit_freq = 10;
  ulint data_pages_count = 0;

  /* Walk the LOB index, one entry per data chunk, until the requested
  number of bytes has been copied or the list ends. */
  while (!fil_addr_is_null(node_loc) && want > 0) {
    old_version.reset(nullptr);

    node = first_page.addr2ptr_s_cache(cached_blocks, node_loc);
    cur_entry.reset(node);

    cur_entry.read(entry_mem);

    const uint32_t entry_lob_version = cur_entry.get_lob_version();

    if (entry_lob_version > lob_version) {
      /* The current entry is too new for this reader: descend into its
      version list to find the newest version this reader may see. */
      flst_base_node_t *ver_list = cur_entry.get_versions_list();
      /* Look at older versions. */
      fil_addr_t node_versions = flst_get_first(ver_list, &mtr);

      while (!fil_addr_is_null(node_versions)) {
        flst_node_t *node_old_version =
            first_page.addr2ptr_s_cache(cached_blocks, node_versions);

        old_version.reset(node_old_version);

        old_version.read(entry_mem);

        const uint32_t old_lob_version = old_version.get_lob_version();

        if (old_lob_version <= lob_version) {
          /* The current trx can see this
          entry. */
          break;
        }
        node_versions = old_version.get_next();
        old_version.reset(nullptr);
      }
    }

    /* Pick the page to read from: the visible old version if one was
    found, else the current entry. */
    page_no_t read_from_page_no = FIL_NULL;

    if (old_version.is_null()) {
      read_from_page_no = cur_entry.get_page_no();
    } else {
      read_from_page_no = old_version.get_page_no();
    }

    actual_read = 0;
    if (read_from_page_no != FIL_NULL) {
      if (read_from_page_no == first_page_no) {
        /* Data resides inside the (already latched) first page. */
        actual_read = first_page.read(page_offset, ptr, want);
        ptr += actual_read;
        want -= actual_read;

      } else {
        buf_block_t *block =
            buf_page_get(page_id_t(ctx->m_space_id, read_from_page_no),
                         ctx->m_page_size, RW_S_LATCH, &data_mtr);

        data_page_t page(block, &data_mtr);
        actual_read = page.read(page_offset, ptr, want);
        ptr += actual_read;
        want -= actual_read;

        page_type_t type = page.get_page_type();
        ut_a(type == FIL_PAGE_TYPE_LOB_DATA);

        /* Periodically restart the data mtr so we do not hold too many
        page latches at once. */
        if (++data_pages_count % commit_freq == 0) {
          mtr_commit(&data_mtr);
          mtr_start(&data_mtr);
        }
      }
    }

    total_read += actual_read;
    page_offset = 0;
    node_loc = cur_entry.get_next();
  }

  /* Assert that we have read what has been requested or what is
  available. */
  ut_ad(total_read == len || total_read == avail_lob);
  ut_ad(total_read <= avail_lob);

  mtr_commit(&mtr);
  mtr_commit(&data_mtr);
  return total_read;
}
1252 
alloc(z_first_page_t & first,bool bulk)1253 buf_block_t *z_index_page_t::alloc(z_first_page_t &first, bool bulk) {
1254   ut_ad(m_block == nullptr);
1255   page_no_t hint = FIL_NULL;
1256 
1257   /* For testing purposes, pretend that the LOB page allocation failed.*/
1258   DBUG_EXECUTE_IF("innodb_lob_alloc_z_index_page_failed", return (nullptr););
1259 
1260   m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);
1261 
1262   if (m_block == nullptr) {
1263     return (nullptr);
1264   }
1265 
1266   set_page_type(m_mtr);
1267   set_version_0();
1268   flst_base_node_t *free_lst = first.free_list();
1269   init(free_lst, m_mtr);
1270 
1271   /* Link the allocated index page to the first page. */
1272   page_no_t page_no = first.get_index_page_no();
1273   set_next_page_no(page_no);
1274   first.set_index_page_no(get_page_no());
1275   return (m_block);
1276 }
1277 
1278 /** Allocate one data page.
1279 @param[in]	hint	hint page number for allocation.
1280 @param[in]	bulk	true if bulk operation (OPCODE_INSERT_BULK)
1281                         false otherwise.
1282 @return the allocated buffer block. */
alloc(page_no_t hint,bool bulk)1283 buf_block_t *z_data_page_t::alloc(page_no_t hint, bool bulk) {
1284   ut_ad(m_block == nullptr);
1285 
1286   /* For testing purposes, pretend that the LOB page allocation failed.*/
1287   DBUG_EXECUTE_IF("innodb_lob_alloc_z_data_page_failed", return (nullptr););
1288 
1289   m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);
1290 
1291   if (m_block == nullptr) {
1292     return (nullptr);
1293   }
1294 
1295   init();
1296   return (m_block);
1297 }
1298 
init(flst_base_node_t * free_lst,mtr_t * mtr)1299 void z_index_page_t::init(flst_base_node_t *free_lst, mtr_t *mtr) {
1300   ulint n = get_n_index_entries();
1301   for (ulint i = 0; i < n; ++i) {
1302     byte *ptr = frame() + LOB_PAGE_DATA;
1303     ptr += (i * z_index_entry_t::SIZE);
1304     z_index_entry_t entry(ptr, mtr);
1305     entry.init();
1306     entry.push_back(free_lst);
1307   }
1308 }
1309 
get_n_index_entries() const1310 ulint z_index_page_t::get_n_index_entries() const {
1311   return (payload() / z_index_entry_t::SIZE);
1312 }
1313 
node_count()1314 ulint node_page_t::node_count() {
1315   return (max_space_available() / index_entry_t::SIZE);
1316 }
1317 
import(trx_id_t trx_id)1318 void node_page_t::import(trx_id_t trx_id) {
1319   ulint nc = node_count();
1320   byte *cur = nodes_begin();
1321 
1322   /* Update the trx id */
1323   for (ulint i = 0; i < nc; ++i) {
1324     index_entry_t entry(cur, m_mtr, m_index);
1325     entry.set_trx_id_no_redo(trx_id);
1326     entry.set_trx_id_modifier_no_redo(trx_id);
1327 
1328     cur += index_entry_t::SIZE;
1329   }
1330 }
1331 
1332 /** Print information about the given LOB.
1333 @param[in]  trx  the current transaction.
1334 @param[in]  index  the clust index that contains the LOB.
1335 @param[in]  out    the output stream into which LOB info is printed.
1336 @param[in]  ref    the LOB reference
1337 @param[in]  fatal  if true assert at end of function. */
print(trx_t * trx,dict_index_t * index,std::ostream & out,ref_t ref,bool fatal)1338 void print(trx_t *trx, dict_index_t *index, std::ostream &out, ref_t ref,
1339            bool fatal) {
1340   trx_id_t trxid = (trx == nullptr ? 0 : trx->id);
1341 
1342   out << "[lob::print: trx_id=" << trxid << ", ";
1343 
1344   mtr_t mtr;
1345 
1346   /* Print the lob reference object. */
1347   space_id_t space_id = ref.space_id();
1348   page_no_t page_no = ref.page_no();
1349   ulint avail_lob = ref.length();
1350 
1351   out << "avail_lob=" << avail_lob << ", ";
1352   out << ref;
1353 
1354   const page_id_t first_page_id(space_id, page_no);
1355   const page_size_t page_size = dict_table_page_size(index->table);
1356 
1357   /* Load the first page of LOB */
1358   mtr_start(&mtr);
1359 
1360   first_page_t first_page(&mtr, index);
1361   first_page.load_x(first_page_id, page_size);
1362 
1363   first_page.print_index_entries(out);
1364   mtr_commit(&mtr);
1365   out << "]";
1366 
1367   if (fatal) {
1368     ut_error;
1369   }
1370 }
1371 
import(trx_id_t trx_id)1372 void z_index_page_t::import(trx_id_t trx_id) {
1373   ulint n = get_n_index_entries();
1374   for (ulint i = 0; i < n; ++i) {
1375     byte *ptr = frame() + LOB_PAGE_DATA;
1376     ptr += (i * z_index_entry_t::SIZE);
1377     z_index_entry_t entry(ptr);
1378     entry.set_trx_id_no_redo(trx_id);
1379     entry.set_trx_id_modifier_no_redo(trx_id);
1380   }
1381 }
1382 
1383 }  // namespace lob
1384