1 /*****************************************************************************
2 
3 Copyright (c) 2016, 2019, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License, version 2.0, as published by the
7 Free Software Foundation.
8 
9 This program is also distributed with certain software (including but not
10 limited to OpenSSL) that is licensed under separate terms, as designated in a
11 particular file or component or in included license documentation. The authors
12 of MySQL hereby grant you an additional permission to link the program and
13 your derivative works with the separately licensed software that they have
14 included with MySQL.
15 
16 This program is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18 FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
19 for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24 
25 *****************************************************************************/
26 #ifndef lob0impl_h
27 #define lob0impl_h
28 
29 #include "btr0btr.h"
30 #include "fut0lst.h"
31 #include "lob0first.h"
32 #include "lob0lob.h"
33 #include "mach0data.h"
34 #include "mtr0log.h"
35 #include "mtr0mtr.h"
36 #include "mtr0types.h"
37 
38 namespace lob {
39 
40 struct z_index_entry_t;
41 struct z_first_page_t;
42 struct z_frag_page_t;
43 struct index_entry_t;
44 struct first_page_t;
45 
46 using paddr_t = ulint;
47 
48 /** The node of page list.  The page list is similar to the file list
49 (flst_node_t) except that it is completely within one page. */
50 class plist_node_t {
51  public:
52   /** Offset of the previous node. (2 bytes) */
53   static const uint16_t OFFSET_PREV = 0;
54 
55   /** Offset of the next node. (2 bytes) */
56   static const uint16_t OFFSET_NEXT = 2;
57 
58   /** The size of a page list node. */
59   static const uint8_t SIZE = 4;
60 
61   /** Constructor.
62   @param[in]	mtr	the mini transaction context. */
plist_node_t(mtr_t * mtr)63   explicit plist_node_t(mtr_t *mtr)
64       : m_frame(nullptr), m_node(nullptr), m_mtr(mtr) {}
65 
66   /** Default constructor. */
plist_node_t()67   plist_node_t() : m_frame(nullptr), m_node(nullptr), m_mtr(nullptr) {}
68 
69   /** Constructor.
70   @param[in]	mtr	the mini transaction context
71   @param[in]	frame	the page frame of this plist. */
plist_node_t(mtr_t * mtr,byte * frame)72   plist_node_t(mtr_t *mtr, byte *frame)
73       : m_frame(frame), m_node(nullptr), m_mtr(mtr) {}
74 
75   /** Constructor.
76   @param[in]	frame	the page frame of this plist.
77   @param[in]	node	the location of plist node. */
plist_node_t(byte * frame,byte * node)78   plist_node_t(byte *frame, byte *node)
79       : m_frame(frame), m_node(node), m_mtr(nullptr) {}
80 
81   /** Constructor.
82   @param[in]	frame	the page frame where the page list node is
83                           located.
84   @param[in]	node	the location of page list node within page
85                           frame.
86   @param[in]	mtr	the mini-transaction context. */
plist_node_t(byte * frame,byte * node,mtr_t * mtr)87   plist_node_t(byte *frame, byte *node, mtr_t *mtr)
88       : m_frame(frame), m_node(node), m_mtr(mtr) {}
89 
90   /** Copy constructor. */
91   plist_node_t(const plist_node_t &other) = default;
92 
93   plist_node_t &operator=(const plist_node_t &) = default;
94 
95   /** Check if the current node is before the given node in the
96   page (w.r.t the offset).
97   @param[in]	node	the other node.
98   @return true if current node is before the given node.
99   @return false if current node is after the given node. */
is_before(const plist_node_t & node)100   bool is_before(const plist_node_t &node) const {
101     ut_ad(!is_null());
102     ut_ad(!node.is_null());
103     return (addr() < node.addr());
104   }
105 
106   /** Initialize the current page list node. The offset of next and
107   previous nodes are set to 0. */
init()108   void init() {
109     ut_ad(!is_null());
110     ut_ad(m_mtr != nullptr);
111 
112     mlog_write_ulint(m_node + OFFSET_PREV, 0, MLOG_2BYTES, m_mtr);
113     mlog_write_ulint(m_node + OFFSET_NEXT, 0, MLOG_2BYTES, m_mtr);
114   }
115 
116   /** Set the offset of the previous node.
117   @param[in]	addr	the offset of previous node.*/
set_prev(paddr_t addr)118   void set_prev(paddr_t addr) {
119     ut_ad(addr < UNIV_PAGE_SIZE);
120     ut_ad(m_mtr != nullptr);
121 
122     mlog_write_ulint(m_node + OFFSET_PREV, addr, MLOG_2BYTES, m_mtr);
123   }
124 
125   /** Set the previous page list node.
126   @param[in]	prev	the previous page list node.*/
set_prev_node(plist_node_t & prev)127   void set_prev_node(plist_node_t &prev) { set_prev(prev.addr()); }
128 
129   /** Set the offset of the next node.
130   @param[in]	addr	the offset of next node.*/
set_next(paddr_t addr)131   void set_next(paddr_t addr) {
132     ut_ad(!is_null());
133     ut_ad(addr < UNIV_PAGE_SIZE);
134     ut_ad(m_mtr != nullptr);
135 
136     mlog_write_ulint(m_node + OFFSET_NEXT, addr, MLOG_2BYTES, m_mtr);
137   }
138 
139   /** Set the next page list node.
140   @param[in]	next	the next page list node.*/
set_next_node(const plist_node_t & next)141   void set_next_node(const plist_node_t &next) { set_next(next.addr()); }
142 
143   /** Get the offset of the previous page list node.
144   @return offset of previous node of the page list. */
get_prev()145   paddr_t get_prev() const { return (mach_read_from_2(m_node + OFFSET_PREV)); }
146 
147   /** Get the offset of the next page list node.
148   @return offset of next node of the page list. */
get_next()149   paddr_t get_next() const { return (mach_read_from_2(m_node + OFFSET_NEXT)); }
150 
151   /** Get the next page list node.
152   @return next node of the page list. */
get_next_node()153   plist_node_t get_next_node() const {
154     paddr_t addr = get_next();
155     byte *node = nullptr;
156 
157     if (addr != 0) {
158       node = m_frame + addr;
159       ut_ad(addr < UNIV_PAGE_SIZE);
160     }
161 
162     return (plist_node_t(m_frame, node, m_mtr));
163   }
164 
165   /** Get the previous page list node.
166   @return previous node of the page list. */
get_prev_node()167   plist_node_t get_prev_node() const {
168     paddr_t addr = get_prev();
169     byte *node = nullptr;
170 
171     if (addr != 0) {
172       ut_ad(addr < UNIV_PAGE_SIZE);
173       node = m_frame + addr;
174     }
175 
176     return (plist_node_t(m_frame, node, m_mtr));
177   }
178 
179   /** Obtain the offset of the page list node within the given
180   page frame.
181   @return offset from the beginning of the page. */
addr()182   paddr_t addr() const {
183     return ((m_node == nullptr) ? 0 : (m_node - m_frame));
184   }
185 
186   /** Obtain the memory location of the page list node.
187   @return the pointer to the page list node. */
ptr()188   byte *ptr() const { return (m_node); }
189 
190   /** Check if the given page list node is null.
191   @return true if null, false otherwise. */
is_null()192   bool is_null() const { return (m_node == nullptr); }
193 
194   /** Print the page list node into the given output stream.
195   @param[in]	out	the output stream.
196   @return the output stream. */
print(std::ostream & out)197   std::ostream &print(std::ostream &out) const {
198     out << "[plist_node_t: next=" << get_next() << ", prev=" << get_prev()
199         << ", this=" << addr() << ", frame=" << (void *)m_frame
200         << ", m_node=" << (void *)m_node << "]";
201     return (out);
202   }
203 
204   /** Set the page frame to the given value.
205   @param[in]	frame	the page frame */
set_frame(byte * frame)206   void set_frame(byte *frame) { m_frame = frame; }
207 
208   /** Set the page list node to the given value.
209   @param[in]	node	the page list node. */
set_node(byte * node)210   void set_node(byte *node) { m_node = node; }
211 
212   /** Set the mini transaction context to the given value.
213   @param[in]	mtr	the mini transaction context. */
set_mtr(mtr_t * mtr)214   void set_mtr(mtr_t *mtr) { m_mtr = mtr; }
215 
216   /** Get the page frame where this page list exists.
217   @return the page frame. */
get_frame()218   byte *get_frame() const { return (m_frame); }
219 
is_equal(const plist_node_t & that)220   bool is_equal(const plist_node_t &that) const {
221     if (m_node == nullptr || that.m_node == nullptr) {
222       return (false);
223     }
224     return (m_node == that.m_node);
225   }
226 
227  private:
228   /** The page frame where this page list exists. */
229   byte *m_frame;
230 
231   /** The plist node is located at this address. */
232   byte *m_node;
233 
234   /** The mini transaction context. */
235   mtr_t *m_mtr;
236 };
237 
238 inline std::ostream &operator<<(std::ostream &out, const plist_node_t &obj) {
239   return (obj.print(out));
240 }
241 
242 /** The base node of page list. */
243 struct plist_base_node_t {
244   /** The offset where the length of the page list is stored.
245   This is 4 bytes long.*/
246   static const ulint OFFSET_LEN = 0;
247 
248   /** The offset where the first node is located.
249   This is 2 bytes long. */
250   static const ulint OFFSET_FIRST = 4;
251 
252   /** The offset where the last node is located.
253   This is 2 bytes long. */
254   static const ulint OFFSET_LAST = 6;
255 
256   /** The total size (in bytes) of a page list base node. */
257   static const ulint SIZE = 8;
258 
plist_base_node_tplist_base_node_t259   plist_base_node_t(byte *frame, byte *base, mtr_t *mtr)
260       : m_frame(frame), m_base(base), m_mtr(mtr) {}
261 
initplist_base_node_t262   void init() {
263     ut_ad(m_mtr != nullptr);
264 
265     mlog_write_ulint(m_base + OFFSET_LEN, 0, MLOG_4BYTES, m_mtr);
266     mlog_write_ulint(m_base + OFFSET_FIRST, 0, MLOG_2BYTES, m_mtr);
267     mlog_write_ulint(m_base + OFFSET_LAST, 0, MLOG_2BYTES, m_mtr);
268   }
269 
removeplist_base_node_t270   void remove(plist_node_t &node) {
271     ut_ad(m_mtr != nullptr);
272 
273     plist_node_t prev = node.get_prev_node();
274     plist_node_t next = node.get_next_node();
275 
276     if (prev.is_null()) {
277       set_first(next.addr());
278     } else {
279       prev.set_next(next.addr());
280     }
281 
282     if (next.is_null()) {
283       set_last(prev.addr());
284     } else {
285       next.set_prev(prev.addr());
286     }
287 
288     node.set_next(0);
289     node.set_prev(0);
290 
291     decr_len();
292   }
293 
push_frontplist_base_node_t294   void push_front(plist_node_t &node) {
295     ut_ad(m_mtr != nullptr);
296 
297     if (get_len() == 0) {
298       add_to_empty(node);
299     } else {
300       paddr_t cur_addr = node.addr();
301       paddr_t first_addr = get_first();
302       plist_node_t first_node = get_node(first_addr);
303       node.set_next(first_addr);
304       node.set_prev(0);
305       first_node.set_prev(cur_addr);
306       set_first(cur_addr);
307       incr_len();
308     }
309   }
310 
311   /** Insert node2 after node1. */
insert_afterplist_base_node_t312   void insert_after(plist_node_t &node1, plist_node_t &node2) {
313     ut_ad(m_mtr != nullptr);
314 
315     if (node1.is_null()) {
316       push_back(node2);
317     } else {
318       plist_node_t node3 = node1.get_next_node();
319       node1.set_next_node(node2);
320       node2.set_next_node(node3);
321 
322       if (node3.is_null()) {
323         set_last(node2.addr());
324       } else {
325         node3.set_prev_node(node2);
326       }
327 
328       node2.set_prev_node(node1);
329 
330       incr_len();
331     }
332   }
333 
334   /** Insert node2 before node3. */
insert_beforeplist_base_node_t335   void insert_before(plist_node_t &node3, plist_node_t &node2) {
336     ut_ad(m_mtr != nullptr);
337 
338     if (node3.is_null()) {
339       push_back(node2);
340     } else {
341       plist_node_t node1 = node3.get_prev_node();
342 
343       if (node1.is_null()) {
344         set_first(node2.addr());
345       } else {
346         node1.set_next_node(node2);
347       }
348 
349       node2.set_next_node(node3);
350       node3.set_prev_node(node2);
351       node2.set_prev_node(node1);
352 
353       incr_len();
354     }
355   }
356 
add_to_emptyplist_base_node_t357   void add_to_empty(plist_node_t &node) {
358     ut_ad(m_mtr != nullptr);
359     ut_ad(get_len() == 0);
360 
361     set_first(node.addr());
362     set_last(node.addr());
363     incr_len();
364   }
365 
push_backplist_base_node_t366   void push_back(plist_node_t &node) {
367     ut_ad(m_mtr != nullptr);
368 
369     if (get_len() == 0) {
370       add_to_empty(node);
371     } else {
372       paddr_t cur_addr = node.addr();
373       paddr_t last_addr = get_last();
374       plist_node_t last_node = get_node(last_addr);
375       node.set_next(0);
376       node.set_prev_node(last_node);
377       last_node.set_next(cur_addr);
378       set_last(cur_addr);
379       incr_len();
380     }
381   }
382 
emptyplist_base_node_t383   bool empty() const { return (get_len() == 0); }
384 
get_lenplist_base_node_t385   ulint get_len() const { return (mach_read_from_4(m_base + OFFSET_LEN)); }
386 
get_firstplist_base_node_t387   paddr_t get_first() const {
388     return (mach_read_from_2(m_base + OFFSET_FIRST));
389   }
390 
get_first_nodeplist_base_node_t391   plist_node_t get_first_node() const {
392     plist_node_t result(m_mtr, m_frame);
393 
394     if (!empty()) {
395       byte *node = m_frame + get_first();
396       result.set_node(node);
397     }
398     return (result);
399   }
400 
get_lastplist_base_node_t401   paddr_t get_last() const { return (mach_read_from_2(m_base + OFFSET_LAST)); }
402 
get_last_nodeplist_base_node_t403   plist_node_t get_last_node() const {
404     plist_node_t result(m_mtr, m_frame);
405 
406     if (!empty()) {
407       result.set_node(m_frame + get_last());
408     }
409 
410     return (result);
411   }
412 
set_lenplist_base_node_t413   void set_len(ulint len) {
414     ut_ad(m_mtr != nullptr);
415 
416     mlog_write_ulint(m_base + OFFSET_LEN, len, MLOG_4BYTES, m_mtr);
417   }
418 
incr_lenplist_base_node_t419   void incr_len() {
420     ut_ad(m_mtr != nullptr);
421 
422     ulint len = mach_read_from_4(m_base + OFFSET_LEN);
423     mlog_write_ulint(m_base + OFFSET_LEN, len + 1, MLOG_4BYTES, m_mtr);
424   }
425 
decr_lenplist_base_node_t426   void decr_len() {
427     ut_ad(m_mtr != nullptr);
428 
429     ulint len = mach_read_from_4(m_base + OFFSET_LEN);
430 
431     ut_ad(len > 0);
432 
433     mlog_write_ulint(m_base + OFFSET_LEN, len - 1, MLOG_4BYTES, m_mtr);
434   }
435 
set_firstplist_base_node_t436   void set_first(paddr_t addr) {
437     ut_ad(m_mtr != nullptr);
438 
439     mlog_write_ulint(m_base + OFFSET_FIRST, addr, MLOG_2BYTES, m_mtr);
440   }
441 
set_lastplist_base_node_t442   void set_last(paddr_t addr) {
443     ut_ad(m_mtr != nullptr);
444 
445     mlog_write_ulint(m_base + OFFSET_LAST, addr, MLOG_2BYTES, m_mtr);
446   }
447 
get_nodeplist_base_node_t448   plist_node_t get_node(paddr_t addr) {
449     byte *node = m_frame + addr;
450     return (plist_node_t(m_frame, node, m_mtr));
451   }
452 
addrplist_base_node_t453   paddr_t addr() const { return (m_base - m_frame); }
454 
printplist_base_node_t455   std::ostream &print(std::ostream &out) const {
456     out << "[plist_base_node_t: len=" << get_len() << ", first=" << get_first()
457         << ", last=" << get_last() << ", this=" << addr() << "]";
458     return (out);
459   }
460 
print_listplist_base_node_t461   std::ostream &print_list(std::ostream &out) const {
462     print(out);
463     out << std::endl;
464 
465     for (plist_node_t cur = get_first_node(); !cur.is_null();
466          cur = cur.get_next_node()) {
467       out << cur << std::endl;
468     }
469     return (out);
470   }
471 
472 #ifdef UNIV_DEBUG
473   /** Validate the page list.
474   @return true if valid, false otherwise. */
475   bool validate() const;
476 #endif /* UNIV_DEBUG */
477 
478   byte *m_frame;
479   byte *m_base;
480   mtr_t *m_mtr;
481 };
482 
483 inline std::ostream &operator<<(std::ostream &out,
484                                 const plist_base_node_t &obj) {
485   return (obj.print(out));
486 }
487 
488 using frag_id_t = ulint;
489 const ulint FRAG_ID_NULL = std::numeric_limits<uint16_t>::max();
490 const ulint KB16 = 16 * 1024;
491 
492 /** The node page (also can be called as the index page) contains a list of
493 index_entry_t objects. */
494 struct node_page_t : public basic_page_t {
495   /** Version information. One byte. */
496   static const ulint OFFSET_VERSION = FIL_PAGE_DATA;
497   static const ulint LOB_PAGE_DATA = OFFSET_VERSION + 1;
498 
set_version_0node_page_t499   void set_version_0() {
500     mlog_write_ulint(frame() + OFFSET_VERSION, 0, MLOG_1BYTE, m_mtr);
501   }
502 
503   /** Default ctor */
node_page_tnode_page_t504   node_page_t() {}
505 
node_page_tnode_page_t506   node_page_t(buf_block_t *block, mtr_t *mtr) : basic_page_t(block, mtr) {}
507 
node_page_tnode_page_t508   node_page_t(buf_block_t *block, mtr_t *mtr, dict_index_t *index)
509       : basic_page_t(block, mtr, index) {}
510 
node_page_tnode_page_t511   node_page_t(mtr_t *mtr, dict_index_t *index)
512       : basic_page_t(nullptr, mtr, index) {}
513 
514   /** Constructor
515   @param[in]	block	the buffer block. */
node_page_tnode_page_t516   node_page_t(buf_block_t *block) : basic_page_t(block, nullptr, nullptr) {}
517 
518   /** Import the node page or the index page.
519   @param[in]	trx_id	transaction identifier. */
520   void import(trx_id_t trx_id);
521 
522   buf_block_t *alloc(first_page_t &first_page, bool bulk);
523 
load_xnode_page_t524   buf_block_t *load_x(page_id_t page_id, page_size_t page_size) {
525     m_block = buf_page_get(page_id, page_size, RW_X_LATCH, m_mtr);
526     return (m_block);
527   }
528 
deallocnode_page_t529   void dealloc() {
530     btr_page_free_low(m_index, m_block, ULINT_UNDEFINED, m_mtr);
531     m_block = nullptr;
532   }
533 
payloadnode_page_t534   static ulint payload() {
535     return (UNIV_PAGE_SIZE - LOB_PAGE_DATA - FIL_PAGE_DATA_END);
536   }
537 
max_space_availablenode_page_t538   static ulint max_space_available() { return (payload()); }
539 
540   /** Get the number of index entries this page can hold.
541   @return Number of index entries this page can hold. */
542   static ulint node_count();
543 
set_page_typenode_page_t544   void set_page_type() {
545     mlog_write_ulint(frame() + FIL_PAGE_TYPE, FIL_PAGE_TYPE_LOB_INDEX,
546                      MLOG_2BYTES, m_mtr);
547   }
548 
nodes_beginnode_page_t549   byte *nodes_begin() const { return (frame() + LOB_PAGE_DATA); }
550 };
551 
552 /** An entry representing one fragment page. */
553 struct z_frag_entry_t {
554  public:
555   /** Offset within frag entry pointing to prev frag entry. */
556   static const ulint OFFSET_PREV = 0;
557 
558   /** Offset within frag entry pointing to next frag entry. */
559   static const ulint OFFSET_NEXT = OFFSET_PREV + FIL_ADDR_SIZE;
560 
561   /** Offset within frag entry holding the page number of frag page. */
562   static const ulint OFFSET_PAGE_NO = OFFSET_NEXT + FIL_ADDR_SIZE;
563 
564   /** Number of used fragments. */
565   static const ulint OFFSET_N_FRAGS = OFFSET_PAGE_NO + 4;
566 
567   /** Used space in bytes. */
568   static const ulint OFFSET_USED_LEN = OFFSET_N_FRAGS + 2;
569 
570   /** Total free space in bytes. */
571   static const ulint OFFSET_TOTAL_FREE_LEN = OFFSET_USED_LEN + 2;
572 
573   /** The biggest free frag space in bytes. */
574   static const ulint OFFSET_BIG_FREE_LEN = OFFSET_TOTAL_FREE_LEN + 2;
575 
576   /** Total size of one frag entry. */
577   static const ulint SIZE = OFFSET_BIG_FREE_LEN + 2;
578 
579   /** Constructor. */
z_frag_entry_tz_frag_entry_t580   z_frag_entry_t(flst_node_t *node, mtr_t *mtr) : m_node(node), m_mtr(mtr) {}
581 
582   /** Constructor. */
z_frag_entry_tz_frag_entry_t583   z_frag_entry_t() : m_node(nullptr), m_mtr(nullptr) {}
584 
585   /** Constructor. */
z_frag_entry_tz_frag_entry_t586   z_frag_entry_t(mtr_t *mtr) : m_node(nullptr), m_mtr(mtr) {}
587 
588   /** Initialize the fragment entry contents.  For this to correctly
589   work, the current object must be initialized with proper file list
590   node and the mini transaction context. */
initz_frag_entry_t591   void init() {
592     ut_ad(m_mtr != nullptr);
593     ut_ad(m_node != nullptr);
594 
595     set_prev_null();
596     set_next_null();
597     set_page_no(FIL_NULL);
598     set_n_frags(0);
599     set_used_len(0);
600     set_total_free_len(0);
601     set_big_free_len(0);
602   }
603 
604   /** Set the current fragment entry to null. */
set_nullz_frag_entry_t605   void set_null() { m_node = nullptr; }
606 
607   /** Check if the current fragment entry is null.
608   @return true if the current fragment entry is null, false otherwise. */
is_nullz_frag_entry_t609   bool is_null() const { return (m_node == nullptr); }
610 
get_self_addrz_frag_entry_t611   fil_addr_t get_self_addr() const {
612     page_t *frame = page_align(m_node);
613     page_no_t page_no = mach_read_from_4(frame + FIL_PAGE_OFFSET);
614     uint16_t offset = static_cast<uint16_t>(m_node - frame);
615     ut_ad(offset < UNIV_PAGE_SIZE);
616     return (fil_addr_t(page_no, offset));
617   }
618 
619   /** Update the current fragment entry with information about
620   the given fragment page.
621   @param[in]	frag_page	the fragment page whose information
622                           will be stored in current fragment entry. */
623   void update(const z_frag_page_t &frag_page);
624 
625   /** Remove this node from the given list.
626   @param[in]	bnode	the base node of the list from which to remove
627                           current node. */
removez_frag_entry_t628   void remove(flst_base_node_t *bnode) {
629     ut_ad(m_mtr != nullptr);
630 
631     flst_remove(bnode, m_node, m_mtr);
632   }
633 
634   /** Add this node as the last node in the given list.
635   @param[in]  bnode  the base node of the file list. */
push_backz_frag_entry_t636   void push_back(flst_base_node_t *bnode) {
637     ut_ad(m_mtr != nullptr);
638 
639     flst_add_last(bnode, m_node, m_mtr);
640   }
641 
642   /** Add this node as the first node in the given list.
643   @param[in]  bnode  the base node of the file list. */
push_frontz_frag_entry_t644   void push_front(flst_base_node_t *bnode) {
645     ut_ad(m_mtr != nullptr);
646 
647     flst_add_first(bnode, m_node, m_mtr);
648   }
649 
650   /** Point to another frag entry.
651   @param[in]  node  point to this file list node. */
resetz_frag_entry_t652   void reset(flst_node_t *node) { m_node = node; }
653 
654   /** Set the previous frag entry as null. */
set_prev_nullz_frag_entry_t655   void set_prev_null() {
656     ut_ad(m_mtr != nullptr);
657 
658     flst_write_addr(m_node + OFFSET_PREV, fil_addr_null, m_mtr);
659   }
660 
661   /** Set the previous frag entry as null. */
set_prevz_frag_entry_t662   void set_prev(const fil_addr_t &addr) {
663     ut_ad(m_mtr != nullptr);
664 
665     flst_write_addr(m_node + OFFSET_PREV, addr, m_mtr);
666   }
667 
668   /** Get the location of previous frag entry. */
get_prevz_frag_entry_t669   fil_addr_t get_prev() const {
670     return (flst_read_addr(m_node + OFFSET_PREV, m_mtr));
671   }
672 
673   /** Set the next frag entry as null. */
set_next_nullz_frag_entry_t674   void set_next_null() {
675     ut_ad(m_mtr != nullptr);
676 
677     flst_write_addr(m_node + OFFSET_NEXT, fil_addr_null, m_mtr);
678   }
679 
680   /** Set the next frag entry. */
set_nextz_frag_entry_t681   void set_next(const fil_addr_t &addr) {
682     ut_ad(m_mtr != nullptr);
683 
684     flst_write_addr(m_node + OFFSET_NEXT, addr, m_mtr);
685   }
686 
687   /** Get the location of next frag entry. */
get_nextz_frag_entry_t688   fil_addr_t get_next() const {
689     return (flst_read_addr(m_node + OFFSET_NEXT, m_mtr));
690   }
691 
692   /** Get the frag page number. */
get_page_noz_frag_entry_t693   page_no_t get_page_no() const {
694     return (mach_read_from_4(m_node + OFFSET_PAGE_NO));
695   }
696 
697   /** Set the frag page number. */
set_page_noz_frag_entry_t698   void set_page_no(page_no_t page_no) const {
699     ut_ad(m_mtr != nullptr);
700 
701     mlog_write_ulint(m_node + OFFSET_PAGE_NO, page_no, MLOG_4BYTES, m_mtr);
702   }
703 
704   /** Free the fragment page pointed to by this entry.
705    @param[in]   mtr     mini transaction to be used for this operation.
706    @param[in]   index   The index to which this LOB belongs. */
707   void free_frag_page(mtr_t *mtr, dict_index_t *index);
708 
709   /** Get the frag page number. */
get_n_fragsz_frag_entry_t710   ulint get_n_frags() const {
711     return (mach_read_from_2(m_node + OFFSET_N_FRAGS));
712   }
713 
714   /** Set the frag page number. */
set_n_fragsz_frag_entry_t715   void set_n_frags(ulint frags) const {
716     ut_ad(m_mtr != nullptr);
717 
718     mlog_write_ulint(m_node + OFFSET_N_FRAGS, frags, MLOG_2BYTES, m_mtr);
719   }
720 
721   /** Get the used bytes. */
get_used_lenz_frag_entry_t722   ulint get_used_len() const {
723     return (mach_read_from_2(m_node + OFFSET_USED_LEN));
724   }
725 
726   /** Set the used bytes. */
set_used_lenz_frag_entry_t727   void set_used_len(ulint used) const {
728     ut_ad(m_mtr != nullptr);
729 
730     mlog_write_ulint(m_node + OFFSET_USED_LEN, used, MLOG_2BYTES, m_mtr);
731   }
732 
733   /** Get the total cumulative free bytes. */
get_total_free_lenz_frag_entry_t734   ulint get_total_free_len() const {
735     return (mach_read_from_2(m_node + OFFSET_TOTAL_FREE_LEN));
736   }
737 
738   /** Get the biggest free frag bytes. */
get_big_free_lenz_frag_entry_t739   ulint get_big_free_len() const {
740     return (mach_read_from_2(m_node + OFFSET_BIG_FREE_LEN));
741   }
742 
743   /** Set the total free bytes. */
set_total_free_lenz_frag_entry_t744   void set_total_free_len(ulint n) {
745     ut_ad(m_mtr != nullptr);
746 
747     mlog_write_ulint(m_node + OFFSET_TOTAL_FREE_LEN, n, MLOG_2BYTES, m_mtr);
748   }
749 
750   /** Set the big free frag bytes. */
set_big_free_lenz_frag_entry_t751   void set_big_free_len(ulint n) {
752     ut_ad(m_mtr != nullptr);
753 
754     mlog_write_ulint(m_node + OFFSET_BIG_FREE_LEN, n, MLOG_2BYTES, m_mtr);
755   }
756 
757   void purge(flst_base_node_t *used_lst, flst_base_node_t *free_lst);
758 
759   std::ostream &print(std::ostream &out) const;
760 
761  private:
762   /** The location where the fragment entry node is located. */
763   flst_node_t *m_node;
764 
765   /** The mini transaction context for operating on this fragment
766   entry. */
767   mtr_t *m_mtr;
768 };
769 
770 inline std::ostream &operator<<(std::ostream &out, const z_frag_entry_t &obj) {
771   return (obj.print(out));
772 }
773 
774 /** An index page containing an array of z_index_entry_t objects. */
775 struct z_index_page_t {
776   /** Version information. One byte. */
777   static const ulint OFFSET_VERSION = FIL_PAGE_DATA;
778   static const ulint LOB_PAGE_DATA = OFFSET_VERSION + 1;
779 
z_index_page_tz_index_page_t780   explicit z_index_page_t(mtr_t *mtr) : m_block(nullptr), m_mtr(mtr) {}
781 
z_index_page_tz_index_page_t782   z_index_page_t(mtr_t *mtr, dict_index_t *index)
783       : m_block(nullptr), m_mtr(mtr), m_index(index) {}
784 
785   /** Constructor
786   @param[in]	block	the buffer block. */
z_index_page_tz_index_page_t787   explicit z_index_page_t(buf_block_t *block)
788       : m_block(block), m_mtr(nullptr), m_index(nullptr) {}
789 
790   /** Write the space identifier to the page header, without generating
791   redo log records.
792   @param[in]	space_id	the space identifier. */
set_space_id_no_redoz_index_page_t793   void set_space_id_no_redo(space_id_t space_id) {
794     mlog_write_ulint(frame() + FIL_PAGE_SPACE_ID, space_id, MLOG_4BYTES,
795                      nullptr);
796   }
797 
798   /** Set the correct page type. */
set_page_typez_index_page_t799   void set_page_type(mtr_t *mtr) {
800     mlog_write_ulint(frame() + FIL_PAGE_TYPE, FIL_PAGE_TYPE_ZLOB_INDEX,
801                      MLOG_2BYTES, mtr);
802   }
803 
set_version_0z_index_page_t804   void set_version_0() {
805     mlog_write_ulint(frame() + OFFSET_VERSION, 0, MLOG_1BYTE, m_mtr);
806   }
807 
808   /** Set the next page number. */
set_next_page_noz_index_page_t809   void set_next_page_no(page_no_t page_no) {
810     ut_ad(m_mtr != nullptr);
811 
812     mlog_write_ulint(frame() + FIL_PAGE_NEXT, page_no, MLOG_4BYTES, m_mtr);
813   }
814 
815   /** Get the page number. */
get_page_noz_index_page_t816   page_no_t get_page_no() const {
817     return (mach_read_from_4(frame() + FIL_PAGE_OFFSET));
818   }
819 
820   /** Get the next page number. */
get_next_page_noz_index_page_t821   page_no_t get_next_page_no() const {
822     return (mach_read_from_4(frame() + FIL_PAGE_NEXT));
823   }
824 
825   /** Allocate an ZLOB index page.
826   @return the buffer block of the allocated zlob index page. */
827   buf_block_t *alloc(z_first_page_t &first, bool bulk);
828 
829   void import(trx_id_t trx_id);
830 
831   /** Load the given compressed LOB index page.
832   @param[in]	page_no		compressed LOB index page number.
833   @return	the buffer block of the given page number. */
load_xz_index_page_t834   buf_block_t *load_x(page_no_t page_no) {
835     page_id_t page_id(dict_index_get_space(m_index), page_no);
836     page_size_t page_size(dict_table_page_size(m_index->table));
837     m_block = buf_page_get(page_id, page_size, RW_X_LATCH, m_mtr);
838 
839     ut_ad(m_block->get_page_type() == FIL_PAGE_TYPE_ZLOB_INDEX);
840     return (m_block);
841   }
842 
deallocz_index_page_t843   void dealloc() {
844     btr_page_free_low(m_index, m_block, ULINT_UNDEFINED, m_mtr);
845     m_block = nullptr;
846   }
847 
848   void init(flst_base_node_t *free_lst, mtr_t *mtr);
849 
payloadz_index_page_t850   ulint payload() const {
851     const page_size_t page_size(dict_table_page_size(m_index->table));
852 
853     return (page_size.physical() - FIL_PAGE_DATA_END - LOB_PAGE_DATA);
854   }
855 
856   ulint get_n_index_entries() const;
857 
framez_index_page_t858   byte *frame() const { return (buf_block_get_frame(m_block)); }
859 
860   /** The buffer block of the compressed LOB index page. */
861   buf_block_t *m_block;
862 
863   /** The mini-transaction context. */
864   mtr_t *m_mtr;
865 
866   /** The index to which the LOB belongs. */
867   dict_index_t *m_index;
868 };
869 
870 /** The data page holding the zlob. */
871 struct z_data_page_t {
872   /** Version information. One byte. */
873   static const ulint OFFSET_VERSION = FIL_PAGE_DATA;
874 
875   /* The length of compressed data stored in this page. */
876   static const ulint OFFSET_DATA_LEN = OFFSET_VERSION + 1;
877 
878   /* The transaction that created this page. */
879   static const ulint OFFSET_TRX_ID = OFFSET_DATA_LEN + 4;
880 
881   /* The data stored in this page begins at this offset. */
882   static const ulint OFFSET_DATA_BEGIN = OFFSET_TRX_ID + 6;
883 
payloadz_data_page_t884   ulint payload() {
885     page_size_t page_size(dict_table_page_size(m_index->table));
886     return (page_size.physical() - OFFSET_DATA_BEGIN - FIL_PAGE_DATA_END);
887   }
888 
z_data_page_tz_data_page_t889   z_data_page_t(mtr_t *mtr, dict_index_t *index)
890       : m_block(nullptr), m_mtr(mtr), m_index(index) {}
891 
z_data_page_tz_data_page_t892   z_data_page_t(buf_block_t *block, mtr_t *mtr, dict_index_t *index)
893       : m_block(block), m_mtr(mtr), m_index(index) {}
894 
895   /* Constructor.
896   @param[in]	block	the buffer block. */
z_data_page_tz_data_page_t897   z_data_page_t(buf_block_t *block)
898       : m_block(block), m_mtr(nullptr), m_index(nullptr) {}
899 
900   /** Write the space identifier to the page header, without generating
901   redo log records.
902   @param[in]	space_id	the space identifier. */
set_space_id_no_redoz_data_page_t903   void set_space_id_no_redo(space_id_t space_id) {
904     mlog_write_ulint(frame() + FIL_PAGE_SPACE_ID, space_id, MLOG_4BYTES,
905                      nullptr);
906   }
907 
908   /** Allocate one data page.
909   @param[in]	hint	hint page number for allocation.
910   @param[in]	bulk	true if bulk operation (OPCODE_INSERT_BULK)
911                           false otherwise.
912   @return the allocated buffer block. */
913   buf_block_t *alloc(page_no_t hint, bool bulk);
914 
915   /** Free this data page holding the zlob data. */
deallocz_data_page_t916   void dealloc() {
917     btr_page_free_low(m_index, m_block, ULINT_UNDEFINED, m_mtr);
918     m_block = nullptr;
919   }
920 
921   /** Set the correct page type. */
set_page_typez_data_page_t922   void set_page_type() {
923     ut_ad(m_mtr != nullptr);
924 
925     mlog_write_ulint(frame() + FIL_PAGE_TYPE, FIL_PAGE_TYPE_ZLOB_DATA,
926                      MLOG_2BYTES, m_mtr);
927   }
928 
set_version_0z_data_page_t929   void set_version_0() {
930     ut_ad(m_mtr != nullptr);
931 
932     mlog_write_ulint(frame() + OFFSET_VERSION, 0, MLOG_1BYTE, m_mtr);
933   }
934 
935   /** Set the next page. */
set_next_pagez_data_page_t936   void set_next_page(page_no_t page_no) {
937     ut_ad(m_mtr != nullptr);
938 
939     mlog_write_ulint(frame() + FIL_PAGE_NEXT, page_no, MLOG_4BYTES, m_mtr);
940   }
941 
initz_data_page_t942   void init() {
943     ut_ad(m_mtr != nullptr);
944 
945     set_page_type();
946     set_version_0();
947     set_next_page(FIL_NULL);
948     set_data_len(0);
949     set_trx_id(0);
950   }
951 
begin_data_ptrz_data_page_t952   byte *begin_data_ptr() const { return (frame() + OFFSET_DATA_BEGIN); }
953 
set_data_lenz_data_page_t954   void set_data_len(ulint len) {
955     ut_ad(m_mtr != nullptr);
956 
957     mlog_write_ulint(frame() + OFFSET_DATA_LEN, len, MLOG_4BYTES, m_mtr);
958   }
959 
get_data_lenz_data_page_t960   ulint get_data_len() const {
961     return (mach_read_from_4(frame() + OFFSET_DATA_LEN));
962   }
963 
set_trx_idz_data_page_t964   void set_trx_id(trx_id_t tid) {
965     ut_ad(m_mtr != nullptr);
966 
967     byte *ptr = frame() + OFFSET_TRX_ID;
968     mach_write_to_6(ptr, tid);
969     mlog_log_string(ptr, 6, m_mtr);
970   }
971 
972   /** Update the header with given transaction identifier, without
973   writing redo log records.
974   @param[in]	tid	transaction identifier.*/
set_trx_id_no_redoz_data_page_t975   void set_trx_id_no_redo(trx_id_t tid) {
976     byte *ptr = frame() + OFFSET_TRX_ID;
977     mach_write_to_6(ptr, tid);
978   }
979 
980   /** Get the page number. */
get_page_noz_data_page_t981   page_no_t get_page_no() const {
982     return (mach_read_from_4(frame() + FIL_PAGE_OFFSET));
983   }
984 
get_self_addrz_data_page_t985   fil_addr_t get_self_addr() const {
986     page_no_t page_no = get_page_no();
987     return (fil_addr_t(page_no, OFFSET_DATA_BEGIN));
988   }
989 
framez_data_page_t990   byte *frame() const { return (buf_block_get_frame(m_block)); }
991 
992   buf_block_t *m_block;
993   mtr_t *m_mtr;
994   dict_index_t *m_index;
995 };
996 
997 /** A frag nodes page containing an array of z_frag_entry_t objects. */
998 struct z_frag_node_page_t {
999   /** Version information. One byte. */
1000   static const ulint OFFSET_VERSION = FIL_PAGE_DATA;
1001   static const ulint LOB_PAGE_DATA = OFFSET_VERSION + 1;
1002 
z_frag_node_page_tz_frag_node_page_t1003   z_frag_node_page_t(mtr_t *mtr, dict_index_t *index)
1004       : m_block(nullptr), m_mtr(mtr), m_index(index) {}
1005 
1006   /** Constructor
1007   @param[in]	block	the buffer block.*/
z_frag_node_page_tz_frag_node_page_t1008   explicit z_frag_node_page_t(buf_block_t *block)
1009       : m_block(block), m_mtr(nullptr), m_index(nullptr) {}
1010 
1011   /** Write the space identifier to the page header, without generating
1012   redo log records.
1013   @param[in]	space_id	the space identifier. */
set_space_id_no_redoz_frag_node_page_t1014   void set_space_id_no_redo(space_id_t space_id) {
1015     mlog_write_ulint(frame() + FIL_PAGE_SPACE_ID, space_id, MLOG_4BYTES,
1016                      nullptr);
1017   }
1018 
1019   /** Set the correct page type. */
set_page_typez_frag_node_page_t1020   void set_page_type() {
1021     ut_ad(m_mtr != nullptr);
1022 
1023     mlog_write_ulint(frame() + FIL_PAGE_TYPE, FIL_PAGE_TYPE_ZLOB_FRAG_ENTRY,
1024                      MLOG_2BYTES, m_mtr);
1025   }
1026 
1027   /** Set the next page number. */
set_next_page_noz_frag_node_page_t1028   void set_next_page_no(page_no_t page_no) {
1029     ut_ad(m_mtr != nullptr);
1030 
1031     mlog_write_ulint(frame() + FIL_PAGE_NEXT, page_no, MLOG_4BYTES, m_mtr);
1032   }
1033 
set_version_0z_frag_node_page_t1034   void set_version_0() {
1035     ut_ad(m_mtr != nullptr);
1036 
1037     mlog_write_ulint(frame() + OFFSET_VERSION, 0, MLOG_1BYTE, m_mtr);
1038   }
1039 
1040   /** Get the page number. */
get_page_noz_frag_node_page_t1041   page_no_t get_page_no() const {
1042     return (mach_read_from_4(frame() + FIL_PAGE_OFFSET));
1043   }
1044 
1045   /** Get the next page number. */
get_next_page_noz_frag_node_page_t1046   page_no_t get_next_page_no() const {
1047     return (mach_read_from_4(frame() + FIL_PAGE_NEXT));
1048   }
1049 
1050   /** Allocate a fragment nodes page.
1051   @return buffer block of the allocated fragment nodes page or nullptr. */
1052   buf_block_t *alloc(z_first_page_t &first, bool bulk);
1053 
deallocz_frag_node_page_t1054   void dealloc() {
1055     btr_page_free_low(m_index, m_block, ULINT_UNDEFINED, m_mtr);
1056     m_block = nullptr;
1057   }
1058 
1059   /** Load the given compressed LOB fragment page.
1060   @param[in]	page_no		compressed LOB fragment page number.
1061   @return	the buffer block of the given page number. */
load_xz_frag_node_page_t1062   buf_block_t *load_x(page_no_t page_no) {
1063     page_id_t page_id(dict_index_get_space(m_index), page_no);
1064     page_size_t page_size(dict_table_page_size(m_index->table));
1065     m_block = buf_page_get(page_id, page_size, RW_X_LATCH, m_mtr);
1066 
1067     ut_ad(m_block->get_page_type() == FIL_PAGE_TYPE_ZLOB_FRAG_ENTRY);
1068 
1069     return (m_block);
1070   }
1071 
initz_frag_node_page_t1072   void init(flst_base_node_t *free_lst) {
1073     ut_ad(m_mtr != nullptr);
1074 
1075     ulint n = get_n_frag_entries();
1076     for (ulint i = 0; i < n; ++i) {
1077       byte *ptr = frame() + LOB_PAGE_DATA;
1078       ptr += (i * z_frag_entry_t::SIZE);
1079       z_frag_entry_t entry(ptr, m_mtr);
1080       entry.init();
1081       entry.push_back(free_lst);
1082     }
1083   }
1084 
payloadz_frag_node_page_t1085   ulint payload() const {
1086     const page_size_t page_size = dict_table_page_size(m_index->table);
1087     return (page_size.physical() - FIL_PAGE_DATA_END - LOB_PAGE_DATA);
1088   }
1089 
get_n_frag_entriesz_frag_node_page_t1090   ulint get_n_frag_entries() const {
1091     return (payload() / z_frag_entry_t::SIZE);
1092   }
1093 
framez_frag_node_page_t1094   byte *frame() const { return (buf_block_get_frame(m_block)); }
1095 
1096   /** The buffer block of the fragment page. */
1097   buf_block_t *m_block;
1098 
1099   /** The mini-transaction context. */
1100   mtr_t *m_mtr;
1101 
1102   /** The index to which the LOB belongs. */
1103   dict_index_t *m_index;
1104 };  // struct z_frag_node_page_t
1105 
1106 /** Print information about the given compressed lob.
1107 @param[in]  index  the index dictionary object.
1108 @param[in]  ref    the LOB reference
1109 @param[out] out    the output stream where information is printed.
1110 @return DB_SUCCESS on success, or an error code. */
1111 dberr_t z_print_info(const dict_index_t *index, const lob::ref_t &ref,
1112                      std::ostream &out);
1113 
1114 /** The fragment node represents one fragment. */
1115 struct frag_node_t {
1116   /** The offset where the length of fragment is stored.  The length
1117   includes both the payload and the meta data overhead. */
1118   static const ulint OFFSET_LEN = plist_node_t::SIZE;
1119 
1120   /** The offset where fragment id is stored. */
1121   static const ulint OFFSET_FRAG_ID = OFFSET_LEN + 2;
1122 
1123   /** The offset where fragment data is stored. */
1124   static const ulint OFFSET_DATA = OFFSET_FRAG_ID + 2;
1125 
1126   /** The size of a page directory entry in a fragment page in bytes.
1127   This must be equal to z_frag_page_t::SIZE_OF_PAGE_DIR_ENTRY*/
1128   static const ulint SIZE_OF_PAGE_DIR_ENTRY = 2;
1129 
1130   /** Constructor.
1131   @param[in]	node	page list node.
1132   @param[in]	mtr	mini-transaction. */
frag_node_tfrag_node_t1133   frag_node_t(const plist_node_t &node, mtr_t *mtr)
1134       : m_node(node), m_mtr(mtr) {}
1135 
frag_node_tfrag_node_t1136   frag_node_t(byte *frame, byte *ptr) : m_node(frame, ptr), m_mtr(nullptr) {}
1137 
1138   /** Constructor.
1139   @param[in]	frame	the page frame where the fragment node is
1140                           located.
1141   @param[in]	ptr	the location of fragment node within page
1142                           frame.
1143   @param[in]	mtr	the mini-transaction context. */
frag_node_tfrag_node_t1144   frag_node_t(byte *frame, byte *ptr, mtr_t *mtr)
1145       : m_node(frame, ptr, mtr), m_mtr(mtr) {}
1146 
1147   /** Amount of space that will be used up by meta data. When a free
1148   space is taken from the fragment page to be used as a fragment
1149   node, header and footer will be the overhead. Footer is the page dir
1150   entry. The page dir entry may not be contiguous with the fragment.*/
overheadfrag_node_t1151   static ulint overhead() { return (SIZE_OF_PAGE_DIR_ENTRY + OFFSET_DATA); }
1152 
1153   /** Only the header size. Don't include the page dir entry size here.*/
header_sizefrag_node_t1154   static ulint header_size() { return (OFFSET_DATA); }
1155 
1156   /** Constructor.
1157   @param[in]	frame	the page frame where the fragment node is
1158                           located.
1159   @param[in]	ptr	the location of fragment node within page
1160                           frame.
1161   @param[in]	len	the length of the fragment.
1162   @param[in]	mtr	the mini-transaction context. */
frag_node_tfrag_node_t1163   frag_node_t(byte *frame, byte *ptr, ulint len, mtr_t *mtr)
1164       : m_node(frame, ptr, mtr), m_mtr(mtr) {
1165     ut_ad(mtr != nullptr);
1166 
1167     mlog_write_ulint(m_node.ptr() + OFFSET_LEN, len, MLOG_2BYTES, mtr);
1168   }
1169 
frag_beginfrag_node_t1170   byte *frag_begin() const { return (m_node.ptr() + OFFSET_DATA); }
1171 
data_beginfrag_node_t1172   byte *data_begin() const { return (m_node.ptr() + OFFSET_DATA); }
1173 
set_total_lenfrag_node_t1174   void set_total_len(ulint len) {
1175     ut_ad(m_mtr != nullptr);
1176 
1177     mlog_write_ulint(m_node.ptr() + OFFSET_LEN, len, MLOG_2BYTES, m_mtr);
1178   }
1179 
1180   /** Increment the total length of this fragment by 2 bytes. */
incr_length_by_2frag_node_t1181   void incr_length_by_2() {
1182     ut_ad(m_mtr != nullptr);
1183 
1184     ulint len = mach_read_from_2(m_node.ptr() + OFFSET_LEN);
1185     mlog_write_ulint(m_node.ptr() + OFFSET_LEN, len + SIZE_OF_PAGE_DIR_ENTRY,
1186                      MLOG_2BYTES, m_mtr);
1187   }
1188 
1189   /** Decrement the total length of this fragment by 2 bytes. */
decr_length_by_2frag_node_t1190   void decr_length_by_2() {
1191     ut_ad(m_mtr != nullptr);
1192 
1193     ulint len = mach_read_from_2(m_node.ptr() + OFFSET_LEN);
1194     mlog_write_ulint(m_node.ptr() + OFFSET_LEN, len - SIZE_OF_PAGE_DIR_ENTRY,
1195                      MLOG_2BYTES, m_mtr);
1196   }
1197 
is_beforefrag_node_t1198   bool is_before(const frag_node_t &frag) const {
1199     return (m_node.is_before(frag.m_node));
1200   }
1201 
set_frag_id_nullfrag_node_t1202   void set_frag_id_null() {
1203     ut_ad(m_mtr != nullptr);
1204 
1205     mlog_write_ulint(m_node.ptr() + OFFSET_FRAG_ID, FRAG_ID_NULL, MLOG_2BYTES,
1206                      m_mtr);
1207   }
1208 
set_frag_idfrag_node_t1209   void set_frag_id(ulint id) {
1210     ut_ad(m_mtr != nullptr);
1211 
1212     mlog_write_ulint(m_node.ptr() + OFFSET_FRAG_ID, id, MLOG_2BYTES, m_mtr);
1213   }
1214 
get_frag_idfrag_node_t1215   ulint get_frag_id() const {
1216     return (mach_read_from_2(m_node.ptr() + OFFSET_FRAG_ID));
1217   }
1218 
1219   /** Get the space available in this fragment for storing data. */
payloadfrag_node_t1220   ulint payload() const { return (get_total_len() - header_size()); }
1221 
1222   /** Get the total length of this fragment, including its metadata. */
get_total_lenfrag_node_t1223   ulint get_total_len() const {
1224     return (mach_read_from_2(m_node.ptr() + OFFSET_LEN));
1225   }
1226 
1227   /** Get the offset of the current fragment within page.
1228   @return the offset of the current fragment within. */
addrfrag_node_t1229   paddr_t addr() const { return (m_node.addr()); }
1230 
1231   /** Gets the pointer to the beginning of the current fragment.  Note
1232   that the beginning of the fragment contains meta data.
1233   @return pointer to the beginning of the current fragment. */
ptrfrag_node_t1234   byte *ptr() const {
1235     ut_ad(!m_node.is_null());
1236     return (m_node.ptr());
1237   }
1238 
1239   /** Gets the pointer just after the current fragment.  The pointer
1240   returned does not belong to this fragment.  This is used to check
1241   adjacency.
1242   @return pointer to the end of the current fragment. */
end_ptrfrag_node_t1243   byte *end_ptr() const {
1244     ut_ad(!m_node.is_null());
1245     return (ptr() + get_total_len());
1246   }
1247 
1248   /** Get the page frame.
1249   @return the page frame. */
framefrag_node_t1250   byte *frame() const { return (m_node.get_frame()); }
1251 
printfrag_node_t1252   std::ostream &print(std::ostream &out) const {
1253     if (!m_node.is_null()) {
1254       ulint len = get_total_len();
1255       out << "[frag_node_t: " << m_node << ", len=" << len << "/" << payload()
1256           << ", frag_id=" << get_frag_id() << "]";
1257     } else {
1258       out << "[frag_node_t: null, len=0]";
1259     }
1260     return (out);
1261   }
1262 
get_next_fragfrag_node_t1263   frag_node_t get_next_frag() {
1264     ut_ad(!is_null());
1265     plist_node_t next = m_node.get_next_node();
1266     return (frag_node_t(next, m_mtr));
1267   }
1268 
get_next_nodefrag_node_t1269   frag_node_t get_next_node() { return (get_next_frag()); }
1270 
get_prev_nodefrag_node_t1271   frag_node_t get_prev_node() { return (get_prev_frag()); }
1272 
get_prev_fragfrag_node_t1273   frag_node_t get_prev_frag() {
1274     ut_ad(!is_null());
1275     plist_node_t prev = m_node.get_prev_node();
1276     return (frag_node_t(prev, m_mtr));
1277   }
1278 
1279   /** Merge the current fragment node with the given next fragment node.
1280   This will succeed only if they are adjacent to each other.
1281   Detailed Note: There is a new page type FIL_PAGE_TYPE_ZLOB_FRAG_ENTRY
1282   - and we can call it the fragment pages.  Each fragment page contains
1283   one or more fragments.  Each fragment is represented by a frag_node_t.
1284   And each fragment can be of different size.  Consider a fragment page
1285   containing 4 fragments - f1, f2, f3 and f4.  Suppose we free f2 and
1286   f3, then we can merge them into one single bigger fragment which is
1287   free.
1288   @param[in]	next	the next fragment.
1289   @return true if merge done, false otherwise. */
mergefrag_node_t1290   bool merge(frag_node_t &next) {
1291     ut_ad(m_mtr != nullptr);
1292 
1293     byte *p1 = ptr();
1294     ulint len1 = get_total_len();
1295     byte *p2 = next.ptr();
1296     ulint len2 = next.get_total_len();
1297 
1298     if (p2 == (p1 + len1)) {
1299       set_total_len(len1 + len2);
1300       return (true);
1301     }
1302 
1303     return (false);
1304   }
1305 
is_nullfrag_node_t1306   bool is_null() const { return (m_node.is_null()); }
1307 
is_equalfrag_node_t1308   bool is_equal(const frag_node_t &that) const {
1309     return (m_node.is_equal(that.m_node));
1310   }
1311 
is_equalfrag_node_t1312   bool is_equal(const plist_node_t &node) const {
1313     return (m_node.is_equal(node));
1314   }
1315 
1316   /** The page list node. */
1317   plist_node_t m_node;
1318 
1319  private:
1320   /** The mini-transaction context.  It is only in-memory. */
1321   mtr_t *m_mtr;
1322 };
1323 
1324 inline std::ostream &operator<<(std::ostream &out, const frag_node_t &obj) {
1325   return (obj.print(out));
1326 }
1327 
1328 /** The fragment page.  This page will contain fragments from different
1329 zlib streams. */
1330 struct z_frag_page_t {
1331   /** Version information. One byte. */
1332   static const ulint OFFSET_VERSION = FIL_PAGE_DATA;
1333 
1334   /** The location of z_frag_entry_t for this page. */
1335   static const ulint OFFSET_FRAG_ENTRY = OFFSET_VERSION + 1;
1336 
1337   /** The offset within page where the free space list begins. */
1338   static const ulint OFFSET_FREE_LIST = OFFSET_FRAG_ENTRY + FIL_ADDR_SIZE;
1339 
1340   /** The offset within page where the fragment list begins. */
1341   static const ulint OFFSET_FRAGS_LIST =
1342       OFFSET_FREE_LIST + plist_base_node_t::SIZE;
1343 
1344   /** The offset within page where the fragments can occupy . */
1345   static const ulint OFFSET_FRAGS_BEGIN =
1346       OFFSET_FRAGS_LIST + plist_base_node_t::SIZE;
1347 
1348   /** Offset of number of page directory entries (from end) */
1349   static const ulint OFFSET_PAGE_DIR_ENTRY_COUNT = FIL_PAGE_DATA_END + 2;
1350 
1351   /** Offset of first page directory entry (from end) */
1352   static const ulint OFFSET_PAGE_DIR_ENTRY_FIRST =
1353       OFFSET_PAGE_DIR_ENTRY_COUNT + 2;
1354 
1355   static const ulint SIZE_OF_PAGE_DIR_ENTRY = 2; /* bytes */
1356 
1357   /** Constructor.
1358   @param[in]	block	the buffer block containing the fragment page.
1359   @param[in]	mtr	the mini transaction context.
1360   @param[in]	index	the clustered index to which LOB belongs. */
z_frag_page_tz_frag_page_t1361   z_frag_page_t(buf_block_t *block, mtr_t *mtr, dict_index_t *index)
1362       : m_block(block), m_mtr(mtr), m_index(index) {
1363     ut_ad(frag_node_t::SIZE_OF_PAGE_DIR_ENTRY ==
1364           z_frag_page_t::SIZE_OF_PAGE_DIR_ENTRY);
1365   }
1366 
1367   /** Constructor.
1368   @param[in]	mtr	the mini transaction context.
1369   @param[in]	index	the clustered index to which LOB belongs. */
z_frag_page_tz_frag_page_t1370   z_frag_page_t(mtr_t *mtr, dict_index_t *index)
1371       : z_frag_page_t(nullptr, mtr, index) {}
1372 
1373   /** Constructor.
1374   @param[in]	block	the buffer block containing the fragment page.*/
z_frag_page_tz_frag_page_t1375   explicit z_frag_page_t(buf_block_t *block)
1376       : m_block(block), m_mtr(nullptr), m_index(nullptr) {
1377     ut_ad(frag_node_t::SIZE_OF_PAGE_DIR_ENTRY ==
1378           z_frag_page_t::SIZE_OF_PAGE_DIR_ENTRY);
1379   }
1380 
1381   /** Write the space identifier to the page header, without generating
1382   redo log records.
1383   @param[in]	space_id	the space identifier. */
set_space_id_no_redoz_frag_page_t1384   void set_space_id_no_redo(space_id_t space_id) {
1385     mlog_write_ulint(frame() + FIL_PAGE_SPACE_ID, space_id, MLOG_4BYTES,
1386                      nullptr);
1387   }
1388 
1389   z_frag_entry_t get_frag_entry_x();
1390   z_frag_entry_t get_frag_entry_s();
1391 
update_frag_entryz_frag_page_t1392   void update_frag_entry() {
1393     z_frag_entry_t entry = get_frag_entry_x();
1394     entry.update(*this);
1395   }
1396 
set_version_0z_frag_page_t1397   void set_version_0() {
1398     mlog_write_ulint(frame() + OFFSET_VERSION, 0, MLOG_1BYTE, m_mtr);
1399   }
1400 
addr2ptr_xz_frag_page_t1401   flst_node_t *addr2ptr_x(fil_addr_t &addr) {
1402     space_id_t space = dict_index_get_space(m_index);
1403     const page_size_t page_size = dict_table_page_size(m_index->table);
1404     return (fut_get_ptr(space, page_size, addr, RW_X_LATCH, m_mtr));
1405   }
1406 
addr2ptr_sz_frag_page_t1407   flst_node_t *addr2ptr_s(fil_addr_t &addr) {
1408     space_id_t space = dict_index_get_space(m_index);
1409     const page_size_t page_size = dict_table_page_size(m_index->table);
1410     return (fut_get_ptr(space, page_size, addr, RW_S_LATCH, m_mtr));
1411   }
1412 
set_frag_entryz_frag_page_t1413   void set_frag_entry(const fil_addr_t &addr) const {
1414     ut_a(addr.boffset < get_page_size());
1415     return (flst_write_addr(frame() + OFFSET_FRAG_ENTRY, addr, m_mtr));
1416   }
1417 
1418   /** Obtain the file address of the fragment entry that denotes the
1419   current fragment page.
1420   @return the file address of the fragment entry. */
get_frag_entryz_frag_page_t1421   fil_addr_t get_frag_entry() const {
1422     return (flst_read_addr(frame() + OFFSET_FRAG_ENTRY, m_mtr));
1423   }
1424 
set_frag_entry_nullz_frag_page_t1425   void set_frag_entry_null() const {
1426     return (flst_write_addr(frame() + OFFSET_FRAG_ENTRY, fil_addr_null, m_mtr));
1427   }
1428 
get_n_dir_entriesz_frag_page_t1429   ulint get_n_dir_entries() const {
1430     byte *ptr = frame() + get_page_size() - OFFSET_PAGE_DIR_ENTRY_COUNT;
1431     return (mach_read_from_2(ptr));
1432   }
1433 
set_n_dir_entriesz_frag_page_t1434   void set_n_dir_entries(ulint n) const {
1435     byte *ptr = frame() + get_page_size() - OFFSET_PAGE_DIR_ENTRY_COUNT;
1436     mlog_write_ulint(ptr, n, MLOG_2BYTES, m_mtr);
1437   }
1438 
is_border_fragz_frag_page_t1439   bool is_border_frag(const frag_node_t &node) const {
1440     return (slots_end_ptr() == node.end_ptr());
1441   }
1442 
slots_end_ptrz_frag_page_t1443   byte *slots_end_ptr() const {
1444     ulint n = get_n_dir_entries();
1445     byte *first = frame() + get_page_size() - OFFSET_PAGE_DIR_ENTRY_COUNT;
1446     byte *ptr = first - (n * SIZE_OF_PAGE_DIR_ENTRY);
1447     return (ptr);
1448   }
1449 
frag_id_to_addrz_frag_page_t1450   paddr_t frag_id_to_addr(ulint frag_id) const {
1451     byte *first = frame() + get_page_size() - OFFSET_PAGE_DIR_ENTRY_FIRST;
1452     byte *ptr = first - (frag_id * SIZE_OF_PAGE_DIR_ENTRY);
1453     return (mach_read_from_2(ptr));
1454   }
1455 
get_nth_dir_entryz_frag_page_t1456   ulint get_nth_dir_entry(ulint frag_id) {
1457     byte *first = frame() + get_page_size() - OFFSET_PAGE_DIR_ENTRY_FIRST;
1458     byte *ptr = first - (frag_id * SIZE_OF_PAGE_DIR_ENTRY);
1459     return (mach_read_from_2(ptr));
1460   }
1461 
set_nth_dir_entryz_frag_page_t1462   void set_nth_dir_entry(ulint frag_id, paddr_t val) {
1463     byte *first = frame() + get_page_size() - OFFSET_PAGE_DIR_ENTRY_FIRST;
1464     byte *ptr = first - (frag_id * SIZE_OF_PAGE_DIR_ENTRY);
1465     mlog_write_ulint(ptr, val, MLOG_2BYTES, m_mtr);
1466   }
1467 
init_last_dir_entryz_frag_page_t1468   ulint init_last_dir_entry() {
1469     ulint n = get_n_dir_entries();
1470     set_nth_dir_entry(n - 1, 0);
1471     return (n - 1);
1472   }
1473 
incr_n_dir_entriesz_frag_page_t1474   void incr_n_dir_entries() const {
1475     byte *ptr = frame() + get_page_size() - OFFSET_PAGE_DIR_ENTRY_COUNT;
1476     ulint n = mach_read_from_2(ptr);
1477     ut_a(n < FRAG_ID_NULL);
1478     mlog_write_ulint(ptr, n + 1, MLOG_2BYTES, m_mtr);
1479   }
1480 
decr_n_dir_entriesz_frag_page_t1481   void decr_n_dir_entries() const {
1482     byte *ptr = frame() + get_page_size() - OFFSET_PAGE_DIR_ENTRY_COUNT;
1483     ulint n = mach_read_from_2(ptr);
1484     ut_a(n > 0);
1485     mlog_write_ulint(ptr, n - 1, MLOG_2BYTES, m_mtr);
1486   }
1487 
get_page_sizez_frag_page_t1488   ulint get_page_size() const {
1489     const page_size_t page_size = dict_table_page_size(m_index->table);
1490     return (page_size.physical());
1491   }
1492 
space_used_by_dirz_frag_page_t1493   ulint space_used_by_dir() const {
1494     ulint n = get_n_dir_entries();
1495     return (n * SIZE_OF_PAGE_DIR_ENTRY);
1496   }
1497 
locate_free_slotz_frag_page_t1498   ulint locate_free_slot() {
1499     ulint n = get_n_dir_entries();
1500 
1501     for (ulint frag_id = 0; frag_id < n; frag_id++) {
1502       ulint paddr = get_nth_dir_entry(frag_id);
1503 
1504       if (paddr == 0) {
1505         return (frag_id);
1506       }
1507     }
1508 
1509     return (FRAG_ID_NULL);
1510   }
1511 
1512   /** Allocate a fragment id.
1513   @return On success, return fragment id.
1514   @return On failure, return FRAG_ID_NULL. */
alloc_frag_idz_frag_page_t1515   ulint alloc_frag_id() {
1516     ulint id = locate_free_slot();
1517 
1518     if (id == FRAG_ID_NULL) {
1519       return (alloc_dir_entry());
1520     }
1521 
1522     return (id);
1523   }
1524 
print_frag_idz_frag_page_t1525   std::ostream &print_frag_id(std::ostream &out) {
1526     ulint n = get_n_dir_entries();
1527     out << "FRAG IDS: " << std::endl;
1528 
1529     for (ulint frag_id = 0; frag_id < n; frag_id++) {
1530       out << "id=" << frag_id << ", addr=" << frag_id_to_addr(frag_id)
1531           << std::endl;
1532     }
1533 
1534     return (out);
1535   }
1536 
1537   /** Grow the frag directory by one entry.
1538   @return the fragment identifier that was newly added. */
1539   ulint alloc_dir_entry();
1540 
1541   /** Set the next page. */
set_page_nextz_frag_page_t1542   void set_page_next(page_no_t page_no) {
1543     mlog_write_ulint(frame() + FIL_PAGE_NEXT, page_no, MLOG_4BYTES, m_mtr);
1544   }
1545 
1546   /** Set the prev page. */
set_page_prevz_frag_page_t1547   void set_page_prev(page_no_t page_no) { set_page_prev(page_no, m_mtr); }
1548 
1549   /** Set the prev page. */
set_page_prevz_frag_page_t1550   void set_page_prev(page_no_t page_no, mtr_t *mtr) {
1551     mlog_write_ulint(frame() + FIL_PAGE_PREV, page_no, MLOG_4BYTES, mtr);
1552   }
1553 
1554   /** Get the next page number.
1555   @return next page number. */
get_next_page_noz_frag_page_t1556   page_no_t get_next_page_no() const { return (m_block->get_next_page_no()); }
1557 
1558   /** Get the prev page number (FIL_PAGE_PREV).
1559   @param[in]  mtr  the mini transaction latch context.
1560   @return prev page number. */
get_prev_page_noz_frag_page_t1561   page_no_t get_prev_page_no(mtr_t *mtr) const {
1562     return (mtr_read_ulint(frame() + FIL_PAGE_PREV, MLOG_4BYTES, mtr));
1563   }
1564 
1565   /** Get the prev page number.
1566   @return prev page number. */
get_prev_page_noz_frag_page_t1567   page_no_t get_prev_page_no() const { return (get_prev_page_no(m_mtr)); }
1568 
1569   /** Allocate the fragment page.
1570   @param[in]	first	first page of this LOB.
1571   @param[in]	hint	hint page number for allocation.
1572   @param[in]	bulk	true if bulk operation (OPCODE_INSERT_BULK)
1573                           false otherwise.
1574   @return the allocated buffer block. */
1575   buf_block_t *alloc(z_first_page_t &first, page_no_t hint, bool bulk);
1576 
1577   /** Free the fragment page along with its entry.
1578   @param[in]   first   first page of LOB.
1579   @param[in]   alloc_mtr  mini trx to perform this modification. */
1580   void dealloc_with_entry(z_first_page_t &first, mtr_t *alloc_mtr);
1581 
1582   /** Free the fragment page. */
deallocz_frag_page_t1583   void dealloc() {
1584     btr_page_free_low(m_index, m_block, ULINT_UNDEFINED, m_mtr);
1585     m_block = nullptr;
1586   }
1587 
load_xz_frag_page_t1588   buf_block_t *load_x(page_no_t page_no) {
1589     page_id_t page_id(dict_index_get_space(m_index), page_no);
1590     page_size_t page_size(dict_table_page_size(m_index->table));
1591     m_block = buf_page_get(page_id, page_size, RW_X_LATCH, m_mtr);
1592     return (m_block);
1593   }
1594 
merge_free_fragsz_frag_page_t1595   void merge_free_frags() {
1596     plist_base_node_t free_lst = free_list();
1597     frag_node_t frag(free_lst.get_first_node(), m_mtr);
1598     frag_node_t next = frag.get_next_frag();
1599 
1600     while (!next.is_null() && frag.merge(next)) {
1601       free_lst.remove(next.m_node);
1602       next = frag.get_next_frag();
1603     }
1604   }
1605 
merge_free_fragsz_frag_page_t1606   void merge_free_frags(frag_node_t &frag) {
1607     ut_ad(!frag.is_null());
1608     plist_base_node_t free_lst = free_list();
1609     frag_node_t next = frag.get_next_frag();
1610 
1611     while (!next.is_null() && frag.merge(next)) {
1612       free_lst.remove(next.m_node);
1613       next = frag.get_next_frag();
1614     }
1615   }
1616 
validate_listsz_frag_page_t1617   bool validate_lists() const {
1618     plist_base_node_t free_lst = free_list();
1619     plist_base_node_t frag_lst = frag_list();
1620     plist_node_t free_node = free_lst.get_first_node();
1621 
1622     while (!free_node.is_null()) {
1623       plist_node_t frag_node = frag_lst.get_first_node();
1624 
1625       while (!frag_node.is_null()) {
1626         ut_ad(frag_node.addr() != free_node.addr());
1627         frag_node = frag_node.get_next_node();
1628       }
1629 
1630       free_node = free_node.get_next_node();
1631     }
1632     return (true);
1633   }
1634 
insert_into_free_listz_frag_page_t1635   void insert_into_free_list(frag_node_t &frag) {
1636     ut_ad(frag.get_frag_id() == FRAG_ID_NULL);
1637 
1638     plist_base_node_t free_lst = free_list();
1639 
1640     plist_node_t node = free_lst.get_first_node();
1641     plist_node_t prev_node(m_mtr);
1642 
1643     while (!node.is_null()) {
1644       ut_ad(frag.addr() != node.addr());
1645       if (frag.addr() < node.addr()) {
1646         break;
1647       }
1648       prev_node = node;
1649       node = node.get_next_node();
1650     }
1651 
1652     free_lst.insert_before(node, frag.m_node);
1653 
1654     if (prev_node.is_null()) {
1655       merge_free_frags();
1656     } else {
1657       frag_node_t prev_frag(prev_node, m_mtr);
1658       merge_free_frags(prev_frag);
1659     }
1660   }
1661 
1662   /** Insert the given fragment node into the fragment list.
1663   @param[in,out]	frag	the fragment node to be inserted.*/
insert_into_frag_listz_frag_page_t1664   void insert_into_frag_list(frag_node_t &frag) {
1665     plist_base_node_t frag_lst = frag_list();
1666     plist_node_t node = frag_lst.get_first_node();
1667 
1668     while (!node.is_null()) {
1669       ut_ad(frag.addr() != node.addr());
1670       if (frag.addr() < node.addr()) {
1671         break;
1672       }
1673       node = node.get_next_node();
1674     }
1675 
1676     frag_lst.insert_before(node, frag.m_node);
1677   }
1678 
1679   /** Split one free fragment into two. This is not splitting a
1680   fragment page.  This is just splitting one free fragment into two.
1681   When we want to allocate one fragment, we identify a big enough free
1682   fragment and split it into two - one will be the allocated portion and
1683   other will become a free fragment.
1684   @param[in]  free_frag  the free fragment that will be split.
1685   @param[in]  size  the payload size in bytes. */
split_free_fragz_frag_page_t1686   void split_free_frag(frag_node_t &free_frag, ulint size) {
1687     ut_ad(size < free_frag.payload());
1688     const ulint old_total_len = free_frag.get_total_len();
1689     plist_base_node_t free_lst = free_list();
1690 
1691     /* Locate the next fragment */
1692     byte *p2 = free_frag.data_begin() + size;
1693 
1694     ulint remain =
1695         free_frag.get_total_len() - frag_node_t::header_size() - size;
1696 
1697     ut_a(remain >= frag_node_t::OFFSET_DATA);
1698 
1699     free_frag.set_total_len(frag_node_t::header_size() + size);
1700 
1701     frag_node_t frag2(frame(), p2, remain, m_mtr);
1702     frag2.set_total_len(remain);
1703     frag2.set_frag_id_null();
1704     free_lst.insert_after(free_frag.m_node, frag2.m_node);
1705 
1706     ut_a(free_frag.get_total_len() + frag2.get_total_len() == old_total_len);
1707 
1708     ut_ad(validate_lists());
1709   }
1710 
get_frag_nodez_frag_page_t1711   frag_node_t get_frag_node(frag_id_t id) const {
1712     ut_ad(id != FRAG_ID_NULL);
1713 
1714     paddr_t off = frag_id_to_addr(id);
1715     byte *f = frame();
1716     return (frag_node_t(f, f + off));
1717   }
1718 
dealloc_fragmentz_frag_page_t1719   void dealloc_fragment(ulint frag_id) {
1720     ut_ad(frag_id != FRAG_ID_NULL);
1721 
1722     paddr_t off = frag_id_to_addr(frag_id);
1723     byte *f = frame();
1724     frag_node_t frag(f, f + off, m_mtr);
1725     dealloc_fragment(frag);
1726     dealloc_frag_id(frag_id);
1727 
1728     /* Update the index entry. */
1729     update_frag_entry();
1730   }
1731 
1732   /** Allocate a fragment with the given payload.
1733   @param[in]  size  the payload size.
1734   @param[in]  entry the index entry of the given frag page.
1735   @return the frag_id of the allocated fragment.
1736   @return FRAG_ID_NULL if fragment could not be allocated. */
1737   frag_id_t alloc_fragment(ulint size, z_frag_entry_t &entry);
1738 
free_listz_frag_page_t1739   plist_base_node_t free_list() const {
1740     byte *f = frame();
1741     return (plist_base_node_t(f, f + OFFSET_FREE_LIST, m_mtr));
1742   }
1743 
frag_listz_frag_page_t1744   plist_base_node_t frag_list() const {
1745     byte *f = frame();
1746     return (plist_base_node_t(f, f + OFFSET_FRAGS_LIST, m_mtr));
1747   }
1748 
set_page_typez_frag_page_t1749   void set_page_type() {
1750     byte *ptr = frame() + FIL_PAGE_TYPE;
1751     mlog_write_ulint(ptr, FIL_PAGE_TYPE_ZLOB_FRAG, MLOG_2BYTES, m_mtr);
1752   }
1753 
get_page_typez_frag_page_t1754   page_type_t get_page_type() const {
1755     return (mach_read_from_2(frame() + FIL_PAGE_TYPE));
1756   }
1757 
get_page_type_strz_frag_page_t1758   const char *get_page_type_str() const {
1759     page_type_t type = get_page_type();
1760     ut_a(type == FIL_PAGE_TYPE_ZLOB_FRAG);
1761     return ("FIL_PAGE_TYPE_ZLOB_FRAG");
1762   }
1763 
1764   /** The maximum free space available in a fragment page. Adjustment
1765   needs to be done with the frag_node_t::overhead().*/
payloadz_frag_page_t1766   ulint payload() { return (z_frag_page_t::max_payload(m_index)); }
1767 
1768   /** The maximum free space available in a fragment page. Adjustment
1769   needs to be done with the frag_node_t::overhead().*/
max_payloadz_frag_page_t1770   static ulint max_payload(dict_index_t *index) {
1771     page_size_t page_size(dict_table_page_size(index->table));
1772     return (page_size.physical() - OFFSET_FRAGS_BEGIN -
1773             OFFSET_PAGE_DIR_ENTRY_COUNT);
1774   }
1775 
1776   /** Determine if the given length of data can fit into a fragment
1777   page.
1778   @param[in]   index   the clust index into which LOB is inserted.
1779   @param[in]   data_size  The length of data to operate.
1780   @return true if data can fit into fragment page, false otherwise. */
1781   static bool can_data_fit(dict_index_t *index, ulint data_size);
1782 
1783   /** Get the frag page number. */
get_page_noz_frag_page_t1784   page_no_t get_page_no() const { return (m_block->get_page_no()); }
1785 
framez_frag_page_t1786   byte *frame() const { return (buf_block_get_frame(m_block)); }
1787 
printz_frag_page_t1788   std::ostream &print(std::ostream &out) const {
1789     print_free_list(out);
1790     print_frag_list(out);
1791     print_frags_in_order(out);
1792     print_page_dir(out);
1793     return (out);
1794   }
1795 
1796   /** Get the total amount of stored data in this page. */
1797   ulint get_total_stored_data() const;
1798 
1799   /** Get the total cumulative free space in this page. */
1800   ulint get_total_free_len() const;
1801 
1802   /** Get the big free space in this page. */
1803   ulint get_big_free_len() const;
1804 
1805   /** Get the number of fragments in this frag page. */
get_n_fragsz_frag_page_t1806   ulint get_n_frags() const {
1807     plist_base_node_t frag_lst = frag_list();
1808     return (frag_lst.get_len());
1809   }
1810 
1811   std::ostream &print_frags_in_order(std::ostream &out) const;
1812 
print_free_listz_frag_page_t1813   std::ostream &print_free_list(std::ostream &out) const {
1814     if (m_block == nullptr) {
1815       return (out);
1816     }
1817 
1818     plist_base_node_t free_lst = free_list();
1819     out << "[Free List: " << free_lst << "]" << std::endl;
1820 
1821     for (plist_node_t cur = free_lst.get_first_node(); !cur.is_null();
1822          cur = cur.get_next_node()) {
1823       frag_node_t frag(cur, m_mtr);
1824       out << frag << std::endl;
1825     }
1826     return (out);
1827   }
1828 
print_frag_listz_frag_page_t1829   std::ostream &print_frag_list(std::ostream &out) const {
1830     if (m_block == nullptr) {
1831       return (out);
1832     }
1833 
1834     plist_base_node_t frag_lst = frag_list();
1835     out << "[Frag List: " << frag_lst << "]" << std::endl;
1836 
1837     for (plist_node_t cur = frag_lst.get_first_node(); !cur.is_null();
1838          cur = cur.get_next_node()) {
1839       frag_node_t frag(cur, m_mtr);
1840       out << frag << std::endl;
1841     }
1842     return (out);
1843   }
1844 
print_page_dirz_frag_page_t1845   std::ostream &print_page_dir(std::ostream &out) const {
1846     if (m_block == nullptr) {
1847       return (out);
1848     }
1849 
1850     ulint n = get_n_dir_entries();
1851 
1852     for (ulint frag_id = 0; frag_id < n; ++frag_id) {
1853       paddr_t off = frag_id_to_addr(frag_id);
1854       out << "[frag_id=" << frag_id << ", addr=" << off << "]" << std::endl;
1855     }
1856 
1857     return (out);
1858   }
1859 
set_mtrz_frag_page_t1860   void set_mtr(mtr_t *mtr) { m_mtr = mtr; }
1861 
set_indexz_frag_page_t1862   void set_index(dict_index_t *index) { m_index = index; }
1863 
set_block_nullz_frag_page_t1864   void set_block_null() { m_block = nullptr; }
1865 
1866   /** Determine if the given fragment node is the last fragment
1867   node adjacent to the directory.
1868   @return true if it is last fragment node, false otherwise. */
is_last_fragz_frag_page_t1869   bool is_last_frag(const frag_node_t &node) const {
1870     return (node.end_ptr() == slots_end_ptr());
1871   }
1872 
1873  private:
get_frag_entry_addrz_frag_page_t1874   fil_addr_t get_frag_entry_addr() const {
1875     return (flst_read_addr(frame() + OFFSET_FRAG_ENTRY, m_mtr));
1876   }
1877 
dealloc_fragmentz_frag_page_t1878   void dealloc_fragment(frag_node_t &frag) {
1879     plist_base_node_t frag_lst = frag_list();
1880     frag_lst.remove(frag.m_node);
1881     frag.set_frag_id_null();
1882     insert_into_free_list(frag);
1883   }
1884 
1885   /** Deallocate all the free slots from the end of the page
1886   directory. */
1887   void dealloc_frag_id();
1888 
1889   /** Deallocate the given fragment id.
1890   @param[in] frag_id The fragment that needs to be deallocated. */
dealloc_frag_idz_frag_page_t1891   void dealloc_frag_id(ulint frag_id) {
1892     set_nth_dir_entry(frag_id, 0);
1893     dealloc_frag_id();
1894   }
1895 
1896   buf_block_t *m_block;
1897   mtr_t *m_mtr;
1898   dict_index_t *m_index;
1899 };
1900 
1901 /** Insert one chunk of input.  The maximum size of a chunk is Z_CHUNK_SIZE.
1902 @param[in]  index      clustered index in which LOB is inserted.
1903 @param[in]  first      the first page of the LOB.
1904 @param[in]  trx        transaction doing the insertion.
1905 @param[in]  ref        LOB reference in the clust rec.
1906 @param[in]  blob       the uncompressed LOB to be inserted.
1907 @param[in]  len        length of the blob.
1908 @param[out] out_entry  the newly inserted index entry. can be NULL.
1909 @param[in]  mtr        the mini transaction
1910 @param[in]  bulk       true if it is bulk operation, false otherwise.
1911 @return DB_SUCCESS on success, error code on failure. */
1912 dberr_t z_insert_chunk(dict_index_t *index, z_first_page_t &first, trx_t *trx,
1913                        ref_t ref, byte *blob, ulint len,
1914                        z_index_entry_t *out_entry, mtr_t *mtr, bool bulk);
1915 
1916 }  // namespace lob
1917 
1918 #endif  // lob0impl_h
1919