1 /*****************************************************************************
2
3 Copyright (c) 2016, 2019, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
24
25 *****************************************************************************/
26
27 #include "lob0impl.h"
28 #include "lob0del.h"
29 #include "lob0index.h"
30 #include "lob0inf.h"
31 #include "lob0ins.h"
32 #include "lob0pages.h"
33 #include "lob0util.h"
34 #include "lob0zip.h"
35 #include "my_dbug.h"
36 #include "trx0sys.h"
37 #include "ut0ut.h"
38 #include "zlob0first.h"
39 #include "zlob0index.h"
40 #include "zlob0read.h"
41
42 namespace lob {
43
buf_block_set_next_page_no(buf_block_t * block,page_no_t next_page_no,mtr_t * mtr)44 static void buf_block_set_next_page_no(buf_block_t *block,
45 page_no_t next_page_no, mtr_t *mtr) {
46 mlog_write_ulint(block->frame + FIL_PAGE_NEXT, next_page_no, MLOG_4BYTES,
47 mtr);
48 }
49
50 #ifdef UNIV_DEBUG
51 /** Validate the page list.
52 @return true if valid, false otherwise. */
validate() const53 bool plist_base_node_t::validate() const {
54 ulint len = 0;
55 ulint exp = get_len();
56
57 for (plist_node_t cur = get_first_node(); !cur.is_null();
58 cur = cur.get_next_node()) {
59 len++;
60 ut_ad(len <= exp);
61 }
62
63 ut_ad(len == exp);
64 return (true);
65 }
66 #endif /* UNIV_DEBUG */
67
68 /** Allocate one node page. */
alloc(first_page_t & first_page,bool bulk)69 buf_block_t *node_page_t::alloc(first_page_t &first_page, bool bulk) {
70 ut_ad(m_block == nullptr);
71 page_no_t hint = FIL_NULL;
72
73 /* For testing purposes, pretend that the LOB page allocation failed.*/
74 DBUG_EXECUTE_IF("innodb_lob_alloc_node_page_failed", return (nullptr););
75
76 m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);
77
78 if (m_block == nullptr) {
79 return (nullptr);
80 }
81
82 set_page_type();
83 set_version_0();
84 set_next_page(first_page.get_next_page());
85 first_page.set_next_page(get_page_no());
86
87 /* Use fully for the LOB index contents */
88 ulint lob_metadata_len = payload();
89 ulint node_count = lob_metadata_len / index_entry_t::SIZE;
90
91 flst_base_node_t *free_list = first_page.free_list();
92
93 byte *cur = nodes_begin();
94
95 /* Populate the free list with empty index entry nodes. */
96 for (ulint i = 0; i < node_count; ++i) {
97 flst_add_last(free_list, cur, m_mtr);
98 cur += index_entry_t::SIZE;
99 }
100
101 ut_ad(flst_validate(free_list, m_mtr));
102 return (m_block);
103 }
104
print(std::ostream & out) const105 std::ostream &z_frag_entry_t::print(std::ostream &out) const {
106 out << "[z_frag_entry_t: prev=" << get_prev() << ", next=" << get_next()
107 << ", page_no=" << get_page_no() << ", n_frags=" << get_n_frags()
108 << ", used_len=" << get_used_len()
109 << ", total_free_len=" << get_total_free_len()
110 << ", big_free_len=" << get_big_free_len() << "]";
111 return (out);
112 }
113
/** Purge this fragment entry: remove it from the list of entries that
are in use, re-initialize its contents, and push it to the front of the
free list so that it can be reused.
@param[in,out]	used_lst	the list of fragment entries in use.
@param[in,out]	free_lst	the list of free fragment entries. */
void z_frag_entry_t::purge(flst_base_node_t *used_lst,
                           flst_base_node_t *free_lst) {
  remove(used_lst);
  init();
  push_front(free_lst);
}
120
/** Update the current fragment entry with information about
the given fragment page.  The entry mirrors the page identity and its
space-usage statistics (fragment count, used length, free lengths).
@param[in]	frag_page	the fragment page whose information
                                will be stored in current fragment entry. */
void z_frag_entry_t::update(const z_frag_page_t &frag_page) {
  /* All setters below are redo-logged; a mini-transaction is required. */
  ut_ad(m_mtr != nullptr);

  set_page_no(frag_page.get_page_no());
  set_n_frags(frag_page.get_n_frags());
  set_used_len(frag_page.get_total_stored_data());
  set_total_free_len(frag_page.get_total_free_len());
  set_big_free_len(frag_page.get_big_free_len());
}
134
free_frag_page(mtr_t * mtr,dict_index_t * index)135 void z_frag_entry_t::free_frag_page(mtr_t *mtr, dict_index_t *index) {
136 page_no_t page_no = get_page_no();
137 if (page_no != FIL_NULL) {
138 page_id_t page_id = page_id_t(index->space_id(), page_no);
139 page_size_t page_size = index->get_page_size();
140 buf_block_t *block = buf_page_get(page_id, page_size, RW_X_LATCH, mtr);
141 btr_page_free_low(index, block, ULINT_UNDEFINED, mtr);
142 set_page_no(FIL_NULL);
143 }
144 }
145
/** Insert a single zlib stream.  The stream is placed in the first page
if that is still empty, otherwise in a chain of data pages, and any final
small remainder goes into a fragment page.  Pages are linked via their
FIL_PAGE_NEXT field.
@param[in]	index		the index to which the LOB belongs.
@param[in]	first		the first page of the compressed LOB.
@param[in]	trxid		the id of the current transaction.
@param[in]	blob		in memory copy of the LOB.
@param[in]	len		the length of the LOB.
@param[in]	mtr		the mini transaction context.
@param[in]	bulk		true if bulk operation, false otherwise.
@param[out]	start_page_no	the first page into which zlib stream
                                was written.
@param[out]	frag_id		the fragment id that contains last part of the
                                zlib stream.
@return DB_SUCCESS on success, error code on error. */
dberr_t z_insert_strm(dict_index_t *index, z_first_page_t &first,
                      trx_id_t trxid, byte *blob, ulint len, mtr_t *mtr,
                      bool bulk, page_no_t &start_page_no, frag_id_t &frag_id) {
  ulint remain = len;
  start_page_no = FIL_NULL;
  frag_id = FRAG_ID_NULL;
  page_no_t prev_page_no;
  byte *lob_ptr = blob;
  const page_no_t first_page_no = first.get_page_no();

#ifdef UNIV_DEBUG
  /* Maximum bytes a single fragment can hold; used only in assertions. */
  ulint frag_max_payload = z_frag_page_t::max_payload(index);
#endif /* UNIV_DEBUG */

  /* If the first page is empty, then make use of it. */
  if (first.get_data_len() == 0) {
    /* First page is unused. Use it. */
    byte *ptr = first.begin_data_ptr();
    ulint size = first.payload();
    ulint to_copy = (remain > size) ? size : remain;
    mlog_write_string(ptr, lob_ptr, to_copy, mtr);
    remain -= to_copy;
    lob_ptr += to_copy;

    start_page_no = first.get_page_no();
    prev_page_no = start_page_no;

    first.set_data_len(to_copy);
    first.set_trx_id(trxid);
    first.set_next_page_null();

  } else if (!z_frag_page_t::can_data_fit(index, remain)) {
    /* Data cannot fit into a fragment page. Allocate a data
    page. */

    z_data_page_t data_page(mtr, index);
    buf_block_t *tmp_block = data_page.alloc(first_page_no + 1, bulk);

    if (tmp_block == nullptr) {
      return (DB_OUT_OF_FILE_SPACE);
    }

    byte *ptr = data_page.begin_data_ptr();
    ulint size = data_page.payload();
    ulint to_copy = (remain > size) ? size : remain;

    /* Copy data into the page. */
    mlog_write_string(ptr, lob_ptr, to_copy, mtr);

    remain -= to_copy;
    lob_ptr += to_copy;

    start_page_no = data_page.get_page_no();
    prev_page_no = start_page_no;

    data_page.set_data_len(to_copy);
    data_page.set_trx_id(trxid);

  } else {
    /* Data can fit into a fragment page. */
    z_frag_page_t frag_page(mtr, index);
    z_frag_entry_t frag_entry(mtr);

    frag_id = first.alloc_fragment(bulk, remain, frag_page, frag_entry);

    if (frag_id == FRAG_ID_NULL) {
      return (DB_OUT_OF_FILE_SPACE);
    }

#ifdef UNIV_DEBUG
    /* The allocated fragment page and its directory entry must agree on
    the biggest free fragment length. */
    const ulint big_free_len_1 = frag_page.get_big_free_len();
    const ulint big_free_len_2 = frag_entry.get_big_free_len();
    ut_ad(big_free_len_1 == big_free_len_2);
#endif /* UNIV_DEBUG */

    frag_node_t node = frag_page.get_frag_node(frag_id);
    byte *ptr = node.frag_begin();

    ut_ad(remain == node.payload());

    /* copy data to the page. */
    mlog_write_string(ptr, lob_ptr, remain, mtr);

    start_page_no = frag_page.get_page_no();

    /* Update the frag entry. */
    frag_entry.update(frag_page);

    /* The whole stream fit into one fragment; nothing more to do. */
    return (DB_SUCCESS);
  }

  /* As long as data cannot fit into a fragment page, use a data page. */
  while (remain > 0 && !z_frag_page_t::can_data_fit(index, remain)) {
    z_data_page_t data_page(mtr, index);
    buf_block_t *new_block = data_page.alloc(first_page_no + 1, bulk);

    if (new_block == nullptr) {
      return (DB_OUT_OF_FILE_SPACE);
    }

    byte *ptr = data_page.begin_data_ptr();
    ulint size = data_page.payload();
    ulint to_copy = (remain > size) ? size : remain;

    mlog_write_string(ptr, lob_ptr, to_copy, mtr);

    remain -= to_copy;
    lob_ptr += to_copy;

    data_page.set_data_len(to_copy);
    data_page.set_trx_id(trxid);

    /* Get the previous page and update its next page. */
    buf_block_t *block =
        buf_page_get(page_id_t(dict_index_get_space(index), prev_page_no),
                     dict_table_page_size(index->table), RW_X_LATCH, mtr);

    buf_block_set_next_page_no(block, data_page.get_page_no(), mtr);

    prev_page_no = data_page.get_page_no();
  }

  /* Any small remainder goes into a fragment page, linked from the last
  data page written above. */
  if (remain > 0) {
    ut_ad(remain <= frag_max_payload);
    ut_ad(frag_id == FRAG_ID_NULL);
    z_frag_page_t frag_page(mtr, index);
    z_frag_entry_t frag_entry(mtr);

    frag_id = first.alloc_fragment(bulk, remain, frag_page, frag_entry);

    if (frag_id == FRAG_ID_NULL) {
      return (DB_OUT_OF_FILE_SPACE);
    }

#ifdef UNIV_DEBUG
    const ulint big_free_len_1 = frag_page.get_big_free_len();
    const ulint big_free_len_2 = frag_entry.get_big_free_len();
    ut_ad(big_free_len_1 == big_free_len_2);
#endif /* UNIV_DEBUG */

    frag_node_t node = frag_page.get_frag_node(frag_id);
    byte *ptr = node.frag_begin();

#ifdef UNIV_DEBUG
    {
      const ulint pl = node.payload();
      ut_ad(remain <= pl);
    }
#endif /* UNIV_DEBUG */

    mlog_write_string(ptr, lob_ptr, remain, mtr);

    /* Update the frag entry. */
    frag_entry.update(frag_page);

    /* Get the previous page and update its next page. */
    buf_block_t *block =
        buf_page_get(page_id_t(dict_index_get_space(index), prev_page_no),
                     dict_table_page_size(index->table), RW_X_LATCH, mtr);

    buf_block_set_next_page_no(block, frag_page.get_page_no(), mtr);
  }

  return (DB_SUCCESS);
}
324
/** Insert one chunk of input. The maximum size of a chunk is Z_CHUNK_SIZE.
The chunk is deflate-compressed into a temporary buffer, the compressed
stream is written via z_insert_strm(), and a LOB index entry describing
the chunk is allocated and filled.
@param[in]	index		clustered index in which LOB is inserted.
@param[in]	first		the first page of the LOB.
@param[in]	trx		transaction doing the insertion.
@param[in]	ref		LOB reference in the clust rec.
@param[in]	blob		the uncompressed LOB to be inserted.
@param[in]	len		length of the blob.
@param[out]	out_entry	the newly inserted index entry. can be NULL.
@param[in]	mtr		the mini transaction
@param[in]	bulk		true if it is bulk operation, false otherwise.
@return DB_SUCCESS on success, error code on failure. */
dberr_t z_insert_chunk(dict_index_t *index, z_first_page_t &first, trx_t *trx,
                       ref_t ref, byte *blob, ulint len,
                       z_index_entry_t *out_entry, mtr_t *mtr, bool bulk) {
  ut_ad(len <= Z_CHUNK_SIZE);
  ut_ad(first.get_page_type() == FIL_PAGE_TYPE_ZLOB_FIRST);
  dberr_t err(DB_SUCCESS);

  /* A null trx (e.g. internal operations) is recorded as id 0. */
  const trx_id_t trxid = (trx == nullptr ? 0 : trx->id);
  const undo_no_t undo_no = (trx == nullptr ? 0 : trx->undo_no - 1);
  z_stream strm;

  strm.zalloc = nullptr;
  strm.zfree = nullptr;
  strm.opaque = nullptr;

  int ret = deflateInit(&strm, page_zip_level);

  ut_a(ret == Z_OK);

  strm.avail_in = static_cast<uInt>(len);
  strm.next_in = blob;

  /* It is possible that the compressed stream is actually bigger. So
  making use of this call to find it out for sure. */
  const ulint max_buf = deflateBound(&strm, static_cast<uLong>(len));

  std::unique_ptr<byte[]> tmpbuf(new byte[max_buf]);
  strm.avail_out = static_cast<uInt>(max_buf);
  strm.next_out = tmpbuf.get();

  /* Z_FINISH with a deflateBound-sized buffer must complete the whole
  stream in one call. */
  ret = deflate(&strm, Z_FINISH);
  ut_a(ret == Z_STREAM_END);
  ut_a(strm.avail_in == 0);
  ut_a(strm.total_out == (max_buf - strm.avail_out));

  page_no_t z_page_no;
  frag_id_t z_frag_id;
  err = z_insert_strm(index, first, trxid, tmpbuf.get(), strm.total_out, mtr,
                      bulk, z_page_no, z_frag_id);

  if (err != DB_SUCCESS) {
    /* Release the zlib stream state before returning the error. */
    deflateEnd(&strm);
    return (err);
  }

  z_index_entry_t entry = first.alloc_index_entry(bulk);

  if (entry.is_null()) {
    deflateEnd(&strm);
    return (DB_OUT_OF_FILE_SPACE);
  }

  /* Record transaction, location and size information for this chunk. */
  entry.set_trx_id(trxid);
  entry.set_trx_id_modifier(trxid);
  entry.set_trx_undo_no(undo_no);
  entry.set_trx_undo_no_modifier(undo_no);
  entry.set_z_page_no(z_page_no);
  entry.set_z_frag_id(z_frag_id);
  entry.set_data_len(len);
  entry.set_zdata_len(strm.total_out);

  deflateEnd(&strm);

  if (out_entry != nullptr) {
    out_entry->reset(entry);
  }

  ut_ad(z_validate_strm(index, entry, mtr));
  return (DB_SUCCESS);
}
406
/** Insert a large object (LOB) into the system.  Small LOBs are written
without a LOB index; big LOBs are split into chunks of at most
Z_CHUNK_SIZE bytes, each inserted via z_insert_chunk(), with the redo log
checked every commit_freq chunks.
@param[in]	ctx	the B-tree context for this LOB operation.
@param[in]	trx	transaction doing the insertion.
@param[in,out]	ref	the LOB reference.
@param[in]	field	the LOB field.
@param[in]	field_j	the LOB field index in the big-rec vector.
@return DB_SUCCESS on success, error code on failure.*/
dberr_t z_insert(InsertContext *ctx, trx_t *trx, ref_t &ref,
                 big_rec_field_t *field, ulint field_j) {
  byte *blob = field->ptr();
  ulint len = field->len;
  ulint remain = len;
  byte *ptr = blob;
  dberr_t err(DB_SUCCESS);
  dict_index_t *index = ctx->index();
  space_id_t space_id = dict_index_get_space(index);
  byte *field_ref;
  mtr_t *mtr = ctx->get_mtr();
  const trx_id_t trxid = (trx == nullptr ? 0 : trx->id);
  /* Check the redo log (and possibly restart the mtr) once every
  commit_freq chunks. */
  const ulint commit_freq = 4;

  ut_ad(remain > 0);

  if (ref.length() > 0) {
    /* NOTE(review): the uncompressed insert() in this file resets a
    non-empty reference with set_length(0, ...) before inserting, whereas
    here the full length is stored up-front — confirm this asymmetry is
    intended. */
    ref.set_length(len, nullptr);
    if (!ctx->is_bulk()) {
      ctx->zblob_write_blobref(field->field_no, ctx->m_mtr);
    }
  }

  const page_size_t page_size(dict_table_page_size(index->table));

  if (!ref_t::is_big(page_size, len)) {
    /* The LOB is not big enough to build LOB index. Insert the
    LOB without an LOB index. */
    zInserter zblob_writer(ctx);
    err = zblob_writer.prepare();
    if (err == DB_SUCCESS) {
      zblob_writer.write_one_small_blob(field_j);
      err = zblob_writer.finish(false);
    }
    return (err);
  }

  z_first_page_t first(mtr, index);
  buf_block_t *first_block = first.alloc(ctx->is_bulk());

  if (first_block == nullptr) {
    return (DB_OUT_OF_FILE_SPACE);
  }

  first.init_lob_version();
  first.set_last_trx_id(trxid);

  const page_no_t first_page_no = first.get_page_no();
  const page_id_t first_page_id(dict_index_get_space(index), first_page_no);

  if (dict_index_is_online_ddl(index)) {
    row_log_table_blob_alloc(index, first_page_no);
  }

  flst_base_node_t *idx_list = first.index_list();

  ulint nth_chunk = 0;

  /* ut_o() makes chunk_size const only in release builds, so the debug
  injection below can modify it. */
  ut_o(const) ulint chunk_size = Z_CHUNK_SIZE;

  DBUG_EXECUTE_IF("zlob_reduce_chunk_size", chunk_size = 20000;);

  while (remain > 0) {
    ut_ad(first.get_page_type() == FIL_PAGE_TYPE_ZLOB_FIRST);

    z_index_entry_t entry(mtr, index);
    ulint size = (remain >= chunk_size) ? chunk_size : remain;

    err = z_insert_chunk(index, first, trx, ref, ptr, size, &entry, mtr,
                         ctx->is_bulk());

    if (err != DB_SUCCESS) {
      return (err);
    }

    entry.set_lob_version(1);

    ptr += size;
    remain -= size;

    entry.push_back(idx_list);

    if (++nth_chunk % commit_freq == 0) {
      ctx->check_redolog();
      /* The mtr may have been restarted; re-fetch the field reference
      and reload the first page. */
      field_ref = ctx->get_field_ref(field->field_no);
      ref.set_ref(field_ref);
      first.load_x(first_page_id, page_size);

      /* The first page could have been re-located. Reset
      the idx_list to the correct value. */
      idx_list = first.index_list();
    }
  }

  /* Must have inserted at least one chunk. */
  ut_ad(nth_chunk > 0);

  field_ref = ctx->get_field_ref(field->field_no);
  ref.set_ref(field_ref);

  ref.update(space_id, first_page_no, 1, nullptr);
  ref.set_length(len, nullptr);

  ctx->make_nth_extern(field->field_no);

  if (!ctx->is_bulk()) {
    ctx->zblob_write_blobref(field->field_no, ctx->m_mtr);
  }

  /* If the full LOB could not be inserted, then we report error. */
  ut_ad(remain == 0);

#ifdef ZLOB_DEBUG
  std::cout << "thread=" << std::this_thread::get_id()
            << ", lob::z_insert(): table=" << ctx->index()->table->name
            << ", ref=" << ref << std::endl;
  first.print(std::cout);
#endif

  DBUG_EXECUTE_IF("innodb_zlob_print", z_print_info(index, ref, std::cerr););

  return (err);
}
536
537 /** Print information about the given compressed lob.
538 @param[in] index the index dictionary object.
539 @param[in] ref the LOB reference
540 @param[out] out the output stream where information is printed.
541 @return DB_SUCCESS on success, or an error code. */
z_print_info(const dict_index_t * index,const lob::ref_t & ref,std::ostream & out)542 dberr_t z_print_info(const dict_index_t *index, const lob::ref_t &ref,
543 std::ostream &out) {
544 mtr_t mtr;
545 mtr_start(&mtr);
546 z_first_page_t first(&mtr, const_cast<dict_index_t *>(index));
547 first.load_x(ref.page_no());
548 first.print(out);
549 mtr_commit(&mtr);
550 return (DB_SUCCESS);
551 }
552
/** Allocate a fragment page, initialize its free/used fragment lists,
and link it into the doubly-linked list of fragment pages headed by the
first page of the LOB.
@param[in,out]	first	the first page of the compressed LOB.
@param[in]	hint	hint page number for the allocation.
@param[in]	bulk	true if bulk operation, false otherwise.
@return the allocated buffer block, or nullptr on failure. */
buf_block_t *z_frag_page_t::alloc(z_first_page_t &first, page_no_t hint,
                                  bool bulk) {
  /* The m_block member could point to valid block. Overwriting it is
  good enough. */

  /* For testing purposes, pretend that the LOB page allocation failed.*/
  DBUG_EXECUTE_IF("innodb_lob_alloc_z_frag_page_failed", return (nullptr););

  m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);

  if (m_block == nullptr) {
    return (nullptr);
  }

  /* Set page type to FIL_PAGE_TYPE_ZLOB_FRAG. */
  set_page_type();
  set_version_0();

  /* All allocated fragment pages are linked via the next page of the first page
   * of LOB. */
  page_no_t frag_page_no = first.get_frag_page_no();

  if (frag_page_no == 0) {
    /* If the frag_page_no is equal to 0, it means that this LOB was created
     * before storing the fragment page list in the FIL_PAGE_PREV of the first
     * page. So don't change that. */
  } else {
    if (frag_page_no != FIL_NULL) {
      /* Load the first fragment page and updates its prev page. */
      z_frag_page_t tmp(m_mtr, m_index);
      tmp.load_x(frag_page_no);
      tmp.set_page_prev(get_page_no());
    }
    /* Push this page at the head of the fragment page list. */
    set_page_next(frag_page_no);
    set_page_prev(FIL_NULL);
    first.set_frag_page_no(get_page_no());
  }

  /* No directory entry is associated with this page yet. */
  set_frag_entry_null();

  /* Initialize the frag free list. */
  plist_base_node_t fl = free_list();
  fl.init();
  ut_ad(fl.validate());

  /* Initialize the used frag list. */
  plist_base_node_t frag_lst = frag_list();
  frag_lst.init();
  ut_ad(frag_lst.validate());

  byte *page = frame();

  /* Add the available space as free frag to free list. */
  frag_node_t frag(page, page + OFFSET_FRAGS_BEGIN, payload(), m_mtr);
  fl.push_front(frag.m_node);
  frag.set_frag_id_null();

  ut_ad(fl.validate());
  return (m_block);
}
613
614 /** Determine if the given length of data can fit into a fragment page.
615 @param[in] index the clust index into which LOB is inserted.
616 @param[in] data_size The length of data to operate.
617 @return true if data can fit into fragment page, false otherwise. */
can_data_fit(dict_index_t * index,ulint data_size)618 bool z_frag_page_t::can_data_fit(dict_index_t *index, ulint data_size) {
619 ulint max_size = max_payload(index);
620
621 /* Look for a fragment page only if the data to be stored is less
622 than a quarter of the size of the fragment page. */
623 return (data_size < (max_size / 4));
624 }
625
alloc(z_first_page_t & first,bool bulk)626 buf_block_t *z_frag_node_page_t::alloc(z_first_page_t &first, bool bulk) {
627 ut_ad(m_block == nullptr);
628 page_no_t hint = FIL_NULL;
629
630 DBUG_EXECUTE_IF("innodb_lob_alloc_z_frag_node_page_failed",
631 return (nullptr););
632
633 m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);
634
635 if (m_block == nullptr) {
636 return (nullptr);
637 }
638
639 set_page_type();
640 set_version_0();
641 flst_base_node_t *free_lst = first.free_frag_list();
642 init(free_lst);
643
644 /* Link the allocated index page to the first page. */
645 page_no_t page_no = first.get_frag_node_page_no();
646 set_next_page_no(page_no);
647 first.set_frag_node_page_no(get_page_no());
648 return (m_block);
649 }
650
/** Allocate a fragment of the given size from this fragment page.  The
biggest free fragment on the page is used: either it fits exactly, or it
is split into an allocated part and a remaining free part.
@param[in]	size	the requested payload size in bytes.
@param[in,out]	entry	the fragment entry of this page, updated with the
                        new usage statistics on success.
@return the allocated fragment id, or FRAG_ID_NULL if no fragment could
be allocated. */
frag_id_t z_frag_page_t::alloc_fragment(ulint size, z_frag_entry_t &entry) {
  plist_base_node_t free_lst = free_list();

  ut_ad(free_lst.get_len() > 0);

  const ulint big_free_len = get_big_free_len();
  ut_d(bool visited_big_frag = false;);

  for (plist_node_t cur = free_lst.get_first_node(); !cur.is_null();
       cur = cur.get_next_node()) {
    frag_node_t frag(cur, m_mtr);
    const ulint total_len = frag.get_total_len();
    const ulint payload = frag.payload();
    const ulint overhead = frag_node_t::overhead();

    /* Get the biggest free fragment available. */
    if (total_len != big_free_len) {
      continue;
    }

    ut_d(visited_big_frag = true;);

    bool exact_fit = false;

    if (is_last_frag(frag)) {
      /* This fragment gives space for the directory
      entry. */
      ulint extra = frag_node_t::SIZE_OF_PAGE_DIR_ENTRY;
      if (payload == (size + extra)) {
        exact_fit = true;
      }
    } else {
      /* This fragment does not give space for the
      directory entry. */
      if (payload == size) {
        exact_fit = true;
      }
    }

    if (exact_fit) {
      /* Allocate the fragment id. */
      ulint frag_id = alloc_frag_id();
      ut_ad(frag_id != FRAG_ID_NULL);

      /* this is the requested fragment. */
      free_lst.remove(cur);
      insert_into_frag_list(frag);

      frag.set_frag_id(frag_id);
      set_nth_dir_entry(frag_id, frag.addr());
      entry.update(*this);
      return (frag_id);

    } else if (payload >= (size + overhead + 1)) {
      /* Break the current fragment into two. At least 1 byte
      payload must be there in the other node. */

      split_free_frag(frag, size);
      free_lst.remove(frag.m_node);
      insert_into_frag_list(frag);

      /* Allocate the fragment id. */
      ulint frag_id = alloc_frag_id();
      ut_ad(frag_id != FRAG_ID_NULL);

      frag.set_frag_id(frag_id);
      set_nth_dir_entry(frag_id, frag.addr());
      entry.update(*this);
      return (frag_id);
    }
  }

  /* The loop above must have at least reached the biggest fragment. */
  ut_ad(visited_big_frag);
  return (FRAG_ID_NULL);
}
726
727 /** Grow the frag directory by one entry.
728 @return the fragment identifier that was newly added. */
alloc_dir_entry()729 ulint z_frag_page_t::alloc_dir_entry() {
730 plist_base_node_t free_lst = free_list();
731 plist_node_t last = free_lst.get_last_node();
732 frag_node_t frag(last, m_mtr);
733 ulint len = frag.payload();
734
735 /* The last free fragment must be adjacent to the directory.
736 Then only it can give space to one slot. */
737 if (frag.end_ptr() != slots_end_ptr()) {
738 ut_ad(0);
739 return (FRAG_ID_NULL);
740 }
741
742 if (len <= SIZE_OF_PAGE_DIR_ENTRY) {
743 ut_ad(0);
744 return (FRAG_ID_NULL);
745 }
746
747 incr_n_dir_entries();
748 frag.decr_length_by_2();
749 return (init_last_dir_entry());
750 }
751
get_frag_entry_x()752 z_frag_entry_t z_frag_page_t::get_frag_entry_x() {
753 fil_addr_t node_loc = get_frag_entry_addr();
754 flst_node_t *node = addr2ptr_x(node_loc);
755 z_frag_entry_t entry(node, m_mtr);
756 ut_ad(entry.get_page_no() == get_page_no());
757 return (entry);
758 }
759
get_frag_entry_s()760 z_frag_entry_t z_frag_page_t::get_frag_entry_s() {
761 fil_addr_t node_loc = get_frag_entry_addr();
762 flst_node_t *node = addr2ptr_s(node_loc);
763 z_frag_entry_t entry(node, m_mtr);
764 ut_ad(entry.get_page_no() == get_page_no());
765 return (entry);
766 }
767
/** Deallocate this fragment page along with its directory entry.  The
entry is purged back to the free list of the first page, the page is
unlinked from the doubly-linked list of fragment pages, and the page is
freed.  Must only be called when the page holds no fragments.
@param[in,out]	first		the first page of the compressed LOB.
@param[in,out]	alloc_mtr	the mini-transaction used for the list
                                updates and the page free. */
void z_frag_page_t::dealloc_with_entry(z_first_page_t &first,
                                       mtr_t *alloc_mtr) {
  ut_ad(get_n_frags() == 0);
  z_frag_entry_t entry = get_frag_entry_x();
  entry.purge(first.frag_list(), first.free_frag_list());

  page_no_t top_frag_page = first.get_frag_page_no();

  if (top_frag_page == 0) {
    /* If the first page contains 0 in FIL_PAGE_PREV, then this LOB does not use
     * FIL_PAGE_PREV to point to the doubly-linked list of fragment pages. In
     * this case, don't touch FIL_PAGE_PREV. */
  } else {
    page_no_t next_frag_page = get_next_page_no();
    page_no_t prev_frag_page = get_prev_page_no();

    if (top_frag_page == get_page_no()) {
      /* The fragment page pointed to by first LOB page is being deallocated. */
      ut_ad(prev_frag_page == FIL_NULL);
      first.set_frag_page_no(alloc_mtr, next_frag_page);
    } else {
      /* A page in the middle or at the tail of the list must have a
      predecessor. */
      ut_ad(prev_frag_page != FIL_NULL);
    }

    /* The fragment pages are doubly linked via FIL_PAGE_NEXT and
     * FIL_PAGE_PREV. Update the links before deallocating a fragment page. */
    if (next_frag_page != FIL_NULL) {
      z_frag_page_t zfp_next(alloc_mtr, m_index);
      zfp_next.load_x(next_frag_page);
      zfp_next.set_page_prev(prev_frag_page);
    }

    if (prev_frag_page != FIL_NULL) {
      z_frag_page_t zfp_prev(alloc_mtr, m_index);
      zfp_prev.load_x(prev_frag_page);
      zfp_prev.set_page_next(next_frag_page);
    }
  }

  btr_page_free_low(m_index, m_block, ULINT_UNDEFINED, alloc_mtr);
  m_block = nullptr;
}
810
print_frags_in_order(std::ostream & out) const811 std::ostream &z_frag_page_t::print_frags_in_order(std::ostream &out) const {
812 if (m_block == nullptr) {
813 return (out);
814 }
815
816 plist_base_node_t free_lst = free_list();
817 plist_base_node_t frag_lst = frag_list();
818
819 out << "[Free List: " << free_lst << "]" << std::endl;
820 out << "[Frag List: " << frag_lst << "]" << std::endl;
821
822 frag_node_t cur_free(free_lst.get_first_node(), m_mtr);
823 frag_node_t cur_frag(frag_lst.get_first_node(), m_mtr);
824
825 while (!cur_free.is_null() && !cur_frag.is_null()) {
826 if (cur_free.is_before(cur_frag)) {
827 out << "F: " << cur_free << std::endl;
828 cur_free = cur_free.get_next_node();
829 } else {
830 out << "U: " << cur_frag << std::endl;
831 cur_frag = cur_frag.get_next_node();
832 }
833 }
834
835 if (cur_free.is_null()) {
836 while (!cur_frag.is_null()) {
837 out << "U: " << cur_frag << std::endl;
838 cur_frag = cur_frag.get_next_node();
839 }
840 }
841
842 if (cur_frag.is_null()) {
843 while (!cur_free.is_null()) {
844 out << "F: " << cur_free << std::endl;
845 cur_free = cur_free.get_next_node();
846 }
847 }
848
849 return (out);
850 }
851
852 /** Get the total amount of stored data in this page. */
get_total_stored_data() const853 ulint z_frag_page_t::get_total_stored_data() const {
854 ulint len = 0;
855
856 ut_ad(m_block != nullptr);
857
858 plist_base_node_t frag_lst = frag_list();
859
860 for (plist_node_t cur = frag_lst.get_first_node(); !cur.is_null();
861 cur = cur.get_next_node()) {
862 frag_node_t frag(cur, m_mtr);
863 len += frag.payload();
864 }
865
866 return (len);
867 }
868
869 /** Get the total cumulative free space in this page. */
get_total_free_len() const870 ulint z_frag_page_t::get_total_free_len() const {
871 ulint len = 0;
872
873 ut_ad(m_block != nullptr);
874
875 plist_base_node_t free_lst = free_list();
876 for (plist_node_t cur = free_lst.get_first_node(); !cur.is_null();
877 cur = cur.get_next_node()) {
878 frag_node_t frag(cur, m_mtr);
879 len += frag.payload();
880 }
881 return (len);
882 }
883
884 /** Get the big free space in this page. */
get_big_free_len() const885 ulint z_frag_page_t::get_big_free_len() const {
886 ulint big = 0;
887
888 ut_ad(m_block != nullptr);
889
890 plist_base_node_t free_lst = free_list();
891 for (plist_node_t cur = free_lst.get_first_node(); !cur.is_null();
892 cur = cur.get_next_node()) {
893 frag_node_t frag(cur, m_mtr);
894
895 /* Use the total length (including the meta data overhead) of the
896 fragment. */
897 ulint total_free = frag.get_total_len();
898 if (total_free > big) {
899 big = total_free;
900 }
901 }
902
903 return (big);
904 }
905
/** Deallocate all the free slots from the end of the page directory.
Starting from the last directory entry, each trailing slot whose stored
address is 0 (i.e. unused) is removed, returning its 2 bytes to the
adjacent free fragment.  Stops at the first slot still in use. */
void z_frag_page_t::dealloc_frag_id() {
  plist_base_node_t free_lst = free_list();
  plist_node_t last = free_lst.get_last_node();
  frag_node_t frag(last, m_mtr);
  /* The last free fragment must be adjacent to the directory.
  Then only it can take space from one slot. */
  if (frag.end_ptr() != slots_end_ptr()) {
    return;
  }

  ulint frag_id = get_n_dir_entries() - 1;
  paddr_t addr = frag_id_to_addr(frag_id);
  while (addr == 0) {
    /* Slot frag_id is unused: reclaim it into the free fragment. */
    frag.incr_length_by_2();
    decr_n_dir_entries();
    if (frag_id == 0) {
      break;
    }
    frag_id--;
    addr = frag_id_to_addr(frag_id);
  }
}
929
/** Insert a large object (LOB) into the system.
@param[in] ctx the B-tree context for this LOB operation.
@param[in] trx transaction doing the insertion.
@param[in,out] ref the LOB reference.
@param[in] field the LOB field.
@param[in] field_j the index of this LOB field (passed through to the
small-blob writer; presumably an index into the big record vector —
confirm against callers).
@return DB_SUCCESS on success, error code on failure.*/
dberr_t insert(InsertContext *ctx, trx_t *trx, ref_t &ref,
               big_rec_field_t *field, ulint field_j) {
  /* A null trx (e.g. during import) is recorded with id 0. */
  const trx_id_t trxid = (trx == nullptr ? 0 : trx->id);
  /* NOTE(review): trx->undo_no - 1 assumes the undo number has already
  been advanced past the record that owns this LOB — confirm. */
  const undo_no_t undo_no = (trx == nullptr ? 0 : trx->undo_no - 1);
  dberr_t ret = DB_SUCCESS;
  /* Running total of LOB bytes written; becomes the ref length. */
  ulint total_written = 0;
  const byte *ptr = field->ptr();
  ulint len = field->len;
  mtr_t *mtr = ctx->get_mtr();
  dict_index_t *index = ctx->index();
  space_id_t space_id = dict_index_get_space(index);
  page_size_t page_size(dict_table_page_size(index->table));
  DBUG_TRACE;

  /* Start from a zero-length reference before writing. */
  if (ref.length() > 0) {
    ref.set_length(0, mtr);
  }

  if (!ref_t::is_big(page_size, len)) {
    /* The LOB is not big enough to build LOB index. Insert the LOB without an
    LOB index. */
    Inserter blob_writer(ctx);
    return blob_writer.write_one_small_blob(field_j);
  }

  ut_ad(ref_t::is_big(page_size, len));

  DBUG_LOG("lob", PrintBuffer(ptr, len));
  ut_ad(ref.validate(ctx->get_mtr()));

  /* Allocate the first page of the LOB, which also hosts the LOB index. */
  first_page_t first(mtr, index);
  buf_block_t *first_block = first.alloc(mtr, ctx->is_bulk());

  if (first_block == nullptr) {
    /* Allocation of the first page of LOB failed. */
    return DB_OUT_OF_FILE_SPACE;
  }

  first.set_last_trx_id(trxid);
  first.init_lob_version();

  page_no_t first_page_no = first.get_page_no();

  /* Online DDL must be told about the new externally stored page. */
  if (dict_index_is_online_ddl(index)) {
    row_log_table_blob_alloc(index, first_page_no);
  }

  page_id_t first_page_id(space_id, first_page_no);

  flst_base_node_t *index_list = first.index_list();

  /* Write as much of the LOB data as fits into the first page itself. */
  ulint to_write = first.write(trxid, ptr, len);
  total_written += to_write;
  ulint remaining = len;

  {
    /* Insert an index entry in LOB index. */
    flst_node_t *node = first.alloc_index_entry(ctx->is_bulk());

    /* Here the first index entry is being allocated. Since this will be
    allocated in the first page of LOB, it cannot be nullptr. */
    ut_ad(node != nullptr);

    index_entry_t entry(node, mtr, index);
    entry.set_versions_null();
    entry.set_trx_id(trxid);
    entry.set_trx_id_modifier(trxid);
    entry.set_trx_undo_no(undo_no);
    entry.set_trx_undo_no_modifier(undo_no);
    entry.set_page_no(first.get_page_no());
    entry.set_data_len(to_write);
    /* A freshly inserted LOB starts at version 1. */
    entry.set_lob_version(1);
    flst_add_last(index_list, node, mtr);

    first.set_trx_id(trxid);
    first.set_data_len(to_write);
  }

  ulint nth_blob_page = 0;
  /* Restart the mtr every commit_freq data pages to bound redo usage. */
  const ulint commit_freq = 4;

  /* Spill the rest of the LOB into additional data pages, each with its
  own entry appended to the LOB index list on the first page. */
  while (remaining > 0) {
    data_page_t data_page(mtr, index);
    buf_block_t *block = data_page.alloc(mtr, ctx->is_bulk());

    if (block == nullptr) {
      ret = DB_OUT_OF_FILE_SPACE;
      break;
    }

    to_write = data_page.write(trxid, ptr, remaining);
    total_written += to_write;
    data_page.set_trx_id(trxid);

    /* Allocate a new index entry */
    flst_node_t *node = first.alloc_index_entry(ctx->is_bulk());

    if (node == nullptr) {
      ret = DB_OUT_OF_FILE_SPACE;
      break;
    }

    index_entry_t entry(node, mtr, index);
    entry.set_versions_null();
    entry.set_trx_id(trxid);
    entry.set_trx_id_modifier(trxid);
    entry.set_trx_undo_no(undo_no);
    entry.set_trx_undo_no_modifier(undo_no);
    entry.set_page_no(data_page.get_page_no());
    entry.set_data_len(to_write);
    entry.set_lob_version(1);
    entry.push_back(first.index_list());

    /* The new entry must have been linked into the list properly. */
    ut_ad(!entry.get_self().is_equal(entry.get_prev()));
    ut_ad(!entry.get_self().is_equal(entry.get_next()));

    page_type_t type = fil_page_get_type(block->frame);
    ut_a(type == FIL_PAGE_TYPE_LOB_DATA);

    if (++nth_blob_page % commit_freq == 0) {
      /* check_redolog() may commit and restart the mtr, which releases
      all page latches; re-fetch the field ref and re-latch the first
      page before continuing. */
      ctx->check_redolog();
      ref.set_ref(ctx->get_field_ref(field->field_no));
      first.load_x(first_page_id, page_size);
    }
  }

  if (ret == DB_SUCCESS) {
    /* Publish the LOB location and length in the clustered-index ref. */
    ref.update(space_id, first_page_no, 1, mtr);
    ref.set_length(total_written, mtr);
  }

  DBUG_EXECUTE_IF("innodb_lob_print",
                  print(trx, index, std::cerr, ref, false););

  /* Error-injection point for testing out-of-space handling. */
  DBUG_EXECUTE_IF("btr_store_big_rec_extern", ret = DB_OUT_OF_FILE_SPACE;);
  return ret;
}
1073
/** Fetch a large object (LOB) from the system.
@param[in] ctx the read context information.
@param[in] ref the LOB reference identifying the LOB.
@param[in] offset read the LOB from the given offset (currently only
offset 0 is supported, see the assertion below).
@param[in] len the length of LOB data that needs to be fetched.
@param[out] buf the output buffer (owned by caller) of minimum len bytes.
@return the amount of data (in bytes) that was actually read. */
ulint read(ReadContext *ctx, ref_t ref, ulint offset, ulint len, byte *buf) {
  DBUG_TRACE;
  ut_ad(offset == 0);
  /* The LOB version visible to this reader; index entries with a newer
  version must be resolved through their older-versions list. */
  const uint32_t lob_version = ref.version();

  ref_mem_t ref_mem;
  ref.parse(ref_mem);

#ifdef LOB_DEBUG
  std::cout << "thread=" << std::this_thread::get_id()
            << ", lob::read(): table=" << ctx->index()->table->name
            << ", ref=" << ref << std::endl;
#endif /* LOB_DEBUG */

  /* Cache of s-latched blocks of LOB index pages.*/
  BlockCache cached_blocks;

  ut_ad(len > 0);

  /* Obtain length of LOB available in clustered index.*/
  const ulint avail_lob = ref.length();

  if (avail_lob == 0) {
    return 0;
  }

  if (ref.is_being_modified()) {
    /* This should happen only for READ UNCOMMITTED transactions. */
    ut_ad(ctx->assert_read_uncommitted());
    return 0;
  }

  ut_ad(ctx->m_index->is_clustered());

  ulint total_read = 0;
  ulint actual_read = 0;
  page_no_t page_no = ref.page_no();
  const page_id_t page_id(ctx->m_space_id, page_no);
  mtr_t mtr;

  mtr_start(&mtr);

  /* S-latch the first LOB page; it is held for the whole traversal. */
  first_page_t first_page(&mtr, ctx->m_index);
  first_page.load_s(page_id, ctx->m_page_size);

  page_type_t page_type = first_page.get_page_type();

  if (page_type == FIL_PAGE_TYPE_BLOB || page_type == FIL_PAGE_SDI_BLOB) {
    /* Old-format BLOB (no LOB index): delegate to the legacy reader. */
    mtr_commit(&mtr);
    Reader reader(*ctx);
    ulint fetch_len = reader.fetch();
    return fetch_len;
  }

  ut_ad(page_type == FIL_PAGE_TYPE_LOB_FIRST);

  cached_blocks.insert(
      std::pair<page_no_t, buf_block_t *>(page_no, first_page.get_block()));

  ctx->m_lob_version = first_page.get_lob_version();

  page_no_t first_page_no = first_page.get_page_no();

  flst_base_node_t *base_node = first_page.index_list();

  fil_addr_t node_loc = flst_get_first(base_node, &mtr);
  flst_node_t *node = nullptr;

  /* Total bytes that have been skipped in this LOB */
  ulint skipped = 0;

  index_entry_t cur_entry(&mtr, ctx->m_index);
  index_entry_t old_version(&mtr, ctx->m_index);
  index_entry_mem_t entry_mem;

  ut_ad(offset >= skipped);

  ulint page_offset = offset - skipped;
  ulint want = len;
  byte *ptr = buf;

  /* Use a different mtr for data pages. */
  mtr_t data_mtr;
  mtr_start(&data_mtr);
  /* Periodically restart data_mtr so data-page latches are not held for
  the entire LOB read. The index mtr (mtr) stays open throughout. */
  const ulint commit_freq = 10;
  ulint data_pages_count = 0;

  /* Walk the LOB index list, copying data out of each referenced page. */
  while (!fil_addr_is_null(node_loc) && want > 0) {
    old_version.reset(nullptr);

    node = first_page.addr2ptr_s_cache(cached_blocks, node_loc);
    cur_entry.reset(node);

    cur_entry.read(entry_mem);

    const uint32_t entry_lob_version = cur_entry.get_lob_version();

    if (entry_lob_version > lob_version) {
      /* The latest entry is too new for this reader; walk its list of
      older versions to find one visible at lob_version. */
      flst_base_node_t *ver_list = cur_entry.get_versions_list();
      /* Look at older versions. */
      fil_addr_t node_versions = flst_get_first(ver_list, &mtr);

      while (!fil_addr_is_null(node_versions)) {
        flst_node_t *node_old_version =
            first_page.addr2ptr_s_cache(cached_blocks, node_versions);

        old_version.reset(node_old_version);

        old_version.read(entry_mem);

        const uint32_t old_lob_version = old_version.get_lob_version();

        if (old_lob_version <= lob_version) {
          /* The current trx can see this
          entry. */
          break;
        }
        node_versions = old_version.get_next();
        old_version.reset(nullptr);
      }
    }

    page_no_t read_from_page_no = FIL_NULL;

    /* If no visible older version was found, read from the current
    entry's page. */
    if (old_version.is_null()) {
      read_from_page_no = cur_entry.get_page_no();
    } else {
      read_from_page_no = old_version.get_page_no();
    }

    actual_read = 0;
    if (read_from_page_no != FIL_NULL) {
      if (read_from_page_no == first_page_no) {
        /* Data lives inline in the already-latched first page. */
        actual_read = first_page.read(page_offset, ptr, want);
        ptr += actual_read;
        want -= actual_read;

      } else {
        buf_block_t *block =
            buf_page_get(page_id_t(ctx->m_space_id, read_from_page_no),
                         ctx->m_page_size, RW_S_LATCH, &data_mtr);

        data_page_t page(block, &data_mtr);
        actual_read = page.read(page_offset, ptr, want);
        ptr += actual_read;
        want -= actual_read;

        page_type_t type = page.get_page_type();
        ut_a(type == FIL_PAGE_TYPE_LOB_DATA);

        if (++data_pages_count % commit_freq == 0) {
          mtr_commit(&data_mtr);
          mtr_start(&data_mtr);
        }
      }
    }

    total_read += actual_read;
    /* page_offset only applies to the very first page read. */
    page_offset = 0;
    node_loc = cur_entry.get_next();
  }

  /* Assert that we have read what has been requested or what is
  available. */
  ut_ad(total_read == len || total_read == avail_lob);
  ut_ad(total_read <= avail_lob);

  mtr_commit(&mtr);
  mtr_commit(&data_mtr);
  return total_read;
}
1252
alloc(z_first_page_t & first,bool bulk)1253 buf_block_t *z_index_page_t::alloc(z_first_page_t &first, bool bulk) {
1254 ut_ad(m_block == nullptr);
1255 page_no_t hint = FIL_NULL;
1256
1257 /* For testing purposes, pretend that the LOB page allocation failed.*/
1258 DBUG_EXECUTE_IF("innodb_lob_alloc_z_index_page_failed", return (nullptr););
1259
1260 m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);
1261
1262 if (m_block == nullptr) {
1263 return (nullptr);
1264 }
1265
1266 set_page_type(m_mtr);
1267 set_version_0();
1268 flst_base_node_t *free_lst = first.free_list();
1269 init(free_lst, m_mtr);
1270
1271 /* Link the allocated index page to the first page. */
1272 page_no_t page_no = first.get_index_page_no();
1273 set_next_page_no(page_no);
1274 first.set_index_page_no(get_page_no());
1275 return (m_block);
1276 }
1277
1278 /** Allocate one data page.
1279 @param[in] hint hint page number for allocation.
1280 @param[in] bulk true if bulk operation (OPCODE_INSERT_BULK)
1281 false otherwise.
1282 @return the allocated buffer block. */
alloc(page_no_t hint,bool bulk)1283 buf_block_t *z_data_page_t::alloc(page_no_t hint, bool bulk) {
1284 ut_ad(m_block == nullptr);
1285
1286 /* For testing purposes, pretend that the LOB page allocation failed.*/
1287 DBUG_EXECUTE_IF("innodb_lob_alloc_z_data_page_failed", return (nullptr););
1288
1289 m_block = alloc_lob_page(m_index, m_mtr, hint, bulk);
1290
1291 if (m_block == nullptr) {
1292 return (nullptr);
1293 }
1294
1295 init();
1296 return (m_block);
1297 }
1298
init(flst_base_node_t * free_lst,mtr_t * mtr)1299 void z_index_page_t::init(flst_base_node_t *free_lst, mtr_t *mtr) {
1300 ulint n = get_n_index_entries();
1301 for (ulint i = 0; i < n; ++i) {
1302 byte *ptr = frame() + LOB_PAGE_DATA;
1303 ptr += (i * z_index_entry_t::SIZE);
1304 z_index_entry_t entry(ptr, mtr);
1305 entry.init();
1306 entry.push_back(free_lst);
1307 }
1308 }
1309
get_n_index_entries() const1310 ulint z_index_page_t::get_n_index_entries() const {
1311 return (payload() / z_index_entry_t::SIZE);
1312 }
1313
node_count()1314 ulint node_page_t::node_count() {
1315 return (max_space_available() / index_entry_t::SIZE);
1316 }
1317
import(trx_id_t trx_id)1318 void node_page_t::import(trx_id_t trx_id) {
1319 ulint nc = node_count();
1320 byte *cur = nodes_begin();
1321
1322 /* Update the trx id */
1323 for (ulint i = 0; i < nc; ++i) {
1324 index_entry_t entry(cur, m_mtr, m_index);
1325 entry.set_trx_id_no_redo(trx_id);
1326 entry.set_trx_id_modifier_no_redo(trx_id);
1327
1328 cur += index_entry_t::SIZE;
1329 }
1330 }
1331
1332 /** Print information about the given LOB.
1333 @param[in] trx the current transaction.
1334 @param[in] index the clust index that contains the LOB.
1335 @param[in] out the output stream into which LOB info is printed.
1336 @param[in] ref the LOB reference
1337 @param[in] fatal if true assert at end of function. */
print(trx_t * trx,dict_index_t * index,std::ostream & out,ref_t ref,bool fatal)1338 void print(trx_t *trx, dict_index_t *index, std::ostream &out, ref_t ref,
1339 bool fatal) {
1340 trx_id_t trxid = (trx == nullptr ? 0 : trx->id);
1341
1342 out << "[lob::print: trx_id=" << trxid << ", ";
1343
1344 mtr_t mtr;
1345
1346 /* Print the lob reference object. */
1347 space_id_t space_id = ref.space_id();
1348 page_no_t page_no = ref.page_no();
1349 ulint avail_lob = ref.length();
1350
1351 out << "avail_lob=" << avail_lob << ", ";
1352 out << ref;
1353
1354 const page_id_t first_page_id(space_id, page_no);
1355 const page_size_t page_size = dict_table_page_size(index->table);
1356
1357 /* Load the first page of LOB */
1358 mtr_start(&mtr);
1359
1360 first_page_t first_page(&mtr, index);
1361 first_page.load_x(first_page_id, page_size);
1362
1363 first_page.print_index_entries(out);
1364 mtr_commit(&mtr);
1365 out << "]";
1366
1367 if (fatal) {
1368 ut_error;
1369 }
1370 }
1371
import(trx_id_t trx_id)1372 void z_index_page_t::import(trx_id_t trx_id) {
1373 ulint n = get_n_index_entries();
1374 for (ulint i = 0; i < n; ++i) {
1375 byte *ptr = frame() + LOB_PAGE_DATA;
1376 ptr += (i * z_index_entry_t::SIZE);
1377 z_index_entry_t entry(ptr);
1378 entry.set_trx_id_no_redo(trx_id);
1379 entry.set_trx_id_modifier_no_redo(trx_id);
1380 }
1381 }
1382
1383 } // namespace lob
1384