1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 /*======
main()4 This file is part of PerconaFT.
5 
6 
7 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
8 
9     PerconaFT is free software: you can redistribute it and/or modify
10     it under the terms of the GNU General Public License, version 2,
11     as published by the Free Software Foundation.
12 
13     PerconaFT is distributed in the hope that it will be useful,
14     but WITHOUT ANY WARRANTY; without even the implied warranty of
15     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16     GNU General Public License for more details.
17 
18     You should have received a copy of the GNU General Public License
19     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
20 
21 ----------------------------------------
22 
23     PerconaFT is free software: you can redistribute it and/or modify
24     it under the terms of the GNU Affero General Public License, version 3,
25     as published by the Free Software Foundation.
26 
27     PerconaFT is distributed in the hope that it will be useful,
28     but WITHOUT ANY WARRANTY; without even the implied warranty of
29     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30     GNU Affero General Public License for more details.
31 
32     You should have received a copy of the GNU Affero General Public License
33     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
34 ======= */
35 
36 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
37 
38 #include <my_global.h>
39 #include "ft/ft.h"
40 #include "ft/ft-internal.h"
41 #include "ft/serialize/ft_node-serialize.h"
42 #include "ft/node.h"
43 #include "ft/serialize/rbuf.h"
44 #include "ft/serialize/wbuf.h"
45 #include "util/scoped_malloc.h"
46 #include "util/sort.h"
47 
48 // Effect: Fill in N as an empty ftnode.
49 // TODO: Rename toku_ftnode_create
50 void toku_initialize_empty_ftnode(FTNODE n, BLOCKNUM blocknum, int height, int num_children, int layout_version, unsigned int flags) {
51     paranoid_invariant(layout_version != 0);
52     paranoid_invariant(height >= 0);
53 
54     n->max_msn_applied_to_node_on_disk = ZERO_MSN;    // correct value for root node, harmless for others
55     n->flags = flags;
56     n->blocknum = blocknum;
57     n->layout_version               = layout_version;
58     n->layout_version_original = layout_version;
59     n->layout_version_read_from_disk = layout_version;
60     n->height = height;
61     n->pivotkeys.create_empty();
62     n->bp = 0;
63     n->n_children = num_children;
64     n->oldest_referenced_xid_known = TXNID_NONE;
65 
66     if (num_children > 0) {
67         XMALLOC_N(num_children, n->bp);
68         for (int i = 0; i < num_children; i++) {
69             BP_BLOCKNUM(n,i).b=0;
70             BP_STATE(n,i) = PT_INVALID;
71             BP_WORKDONE(n,i) = 0;
72             BP_INIT_TOUCHED_CLOCK(n, i);
73             set_BNULL(n,i);
74             if (height > 0) {
75                 set_BNC(n, i, toku_create_empty_nl());
76             } else {
77                 set_BLB(n, i, toku_create_empty_bn());
78             }
79         }
80     }
81     n->set_dirty();  // special case exception, it's okay to mark as dirty because the basements are empty
82 
83     toku_ft_status_note_ftnode(height, true);
84 }
85 
86 // destroys the internals of the ftnode, but it does not free the values
87 // that are stored
88 // this is common functionality for toku_ftnode_free and rebalance_ftnode_leaf
89 // MUST NOT do anything besides free the structures that have been allocated
90 void toku_destroy_ftnode_internals(FTNODE node) {
91     node->pivotkeys.destroy();
92     for (int i = 0; i < node->n_children; i++) {
93         if (BP_STATE(node,i) == PT_AVAIL) {
94             if (node->height > 0) {
95                 destroy_nonleaf_childinfo(BNC(node,i));
96             } else {
97                 paranoid_invariant(BLB_LRD(node, i) == 0);
98                 destroy_basement_node(BLB(node, i));
99             }
100         } else if (BP_STATE(node,i) == PT_COMPRESSED) {
101             SUB_BLOCK sb = BSB(node,i);
102             toku_free(sb->compressed_ptr);
103             toku_free(sb);
104         } else {
105             paranoid_invariant(is_BNULL(node, i));
106         }
107         set_BNULL(node, i);
108     }
109     toku_free(node->bp);
110     node->bp = NULL;
111 }
112 
113 /* Frees a node, including all the stuff in the hash table. */
114 void toku_ftnode_free(FTNODE *nodep) {
115     FTNODE node = *nodep;
116     toku_ft_status_note_ftnode(node->height, false);
117     toku_destroy_ftnode_internals(node);
118     toku_free(node);
119     *nodep = nullptr;
120 }
121 
122 void toku_ftnode_update_disk_stats(FTNODE ftnode, FT ft, bool for_checkpoint) {
123     STAT64INFO_S deltas = ZEROSTATS;
124     // capture deltas before rebalancing basements for serialization
125     deltas = toku_get_and_clear_basement_stats(ftnode);
126     // locking not necessary here with respect to checkpointing
127     // in Clayface (because of the pending lock and cachetable lock
128     // in toku_cachetable_begin_checkpoint)
129     // essentially, if we are dealing with a for_checkpoint
130     // parameter in a function that is called by the flush_callback,
131     // then the cachetable needs to ensure that this is called in a safe
132     // manner that does not interfere with the beginning
133     // of a checkpoint, which it does with the cachetable lock
134     // and pending lock
135     toku_ft_update_stats(&ft->h->on_disk_stats, deltas);
136     if (for_checkpoint) {
137         toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas);
138     }
139 }
140 
141 void toku_ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) {
142     for (int i = 0; i < node->n_children; i++) {
143         BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i);
144         paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
145         BP_STATE(cloned_node,i) = PT_AVAIL;
146         BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i);
147         if (node->height == 0) {
148             set_BLB(cloned_node, i, toku_clone_bn(BLB(node,i)));
149         } else {
150             set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i)));
151         }
152     }
153 }
154 
155 void toku_evict_bn_from_memory(FTNODE node, int childnum, FT ft) {
156     // free the basement node
157     assert(!node->dirty());
158     BASEMENTNODE bn = BLB(node, childnum);
159     toku_ft_decrease_stats(&ft->in_memory_stats, bn->stat64_delta);
160     toku_ft_adjust_logical_row_count(ft, -BLB_LRD(node, childnum));
161     BLB_LRD(node, childnum) = 0;
162     destroy_basement_node(bn);
163     set_BNULL(node, childnum);
164     BP_STATE(node, childnum) = PT_ON_DISK;
165 }
166 
167 BASEMENTNODE toku_detach_bn(FTNODE node, int childnum) {
168     assert(BP_STATE(node, childnum) == PT_AVAIL);
169     BASEMENTNODE bn = BLB(node, childnum);
170     set_BNULL(node, childnum);
171     BP_STATE(node, childnum) = PT_ON_DISK;
172     return bn;
173 }
174 
175 //
176 // Orthopush
177 //
178 
179 struct store_msg_buffer_offset_extra {
180     int32_t *offsets;
181     int i;
182 };
183 
184 int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra) __attribute__((nonnull(3)));
185 int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra)
186 {
187     extra->offsets[extra->i] = offset;
188     extra->i++;
189     return 0;
190 }
191 
192 /**
193  * Given pointers to offsets within a message buffer where we can find messages,
194  * figure out the MSN of each message, and compare those MSNs.  Returns 1,
195  * 0, or -1 if a is larger than, equal to, or smaller than b.
196  */
197 int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo);
198 int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo)
199 {
200     MSN amsn, bmsn;
201     msg_buffer.get_message_key_msn(ao, nullptr, &amsn);
202     msg_buffer.get_message_key_msn(bo, nullptr, &bmsn);
203     if (amsn.msn > bmsn.msn) {
204         return +1;
205     }
206     if (amsn.msn < bmsn.msn) {
207         return -1;
208     }
209     return 0;
210 }
211 
212 /**
213  * Given a message buffer and and offset, apply the message with
214  * toku_ft_bn_apply_msg, or discard it,
215  * based on its MSN and the MSN of the basement node.
216  */
217 static void do_bn_apply_msg(
218     FT_HANDLE ft_handle,
219     BASEMENTNODE bn,
220     message_buffer* msg_buffer,
221     int32_t offset,
222     txn_gc_info* gc_info,
223     uint64_t* workdone,
224     STAT64INFO stats_to_update,
225     int64_t* logical_rows_delta) {
226 
227     DBT k, v;
228     ft_msg msg = msg_buffer->get_message(offset, &k, &v);
229 
230     // The messages are being iterated over in (key,msn) order or just in
231     // msn order, so all the messages for one key, from one buffer, are in
232     // ascending msn order.  So it's ok that we don't update the basement
233     // node's msn until the end.
234     if (msg.msn().msn > bn->max_msn_applied.msn) {
235         toku_ft_bn_apply_msg(
236             ft_handle->ft->cmp,
237             ft_handle->ft->update_fun,
238             bn,
239             msg,
240             gc_info,
241             workdone,
242             stats_to_update,
243             logical_rows_delta);
244     } else {
245         toku_ft_status_note_msn_discard();
246     }
247 
248     // We must always mark message as stale since it has been marked
249     // (using omt::iterate_and_mark_range)
250     // It is possible to call do_bn_apply_msg even when it won't apply the
251     // message because the node containing it could have been evicted and
252     // brought back in.
253     msg_buffer->set_freshness(offset, false);
254 }
255 
256 
257 struct iterate_do_bn_apply_msg_extra {
258     FT_HANDLE t;
259     BASEMENTNODE bn;
260     NONLEAF_CHILDINFO bnc;
261     txn_gc_info *gc_info;
262     uint64_t *workdone;
263     STAT64INFO stats_to_update;
264     int64_t *logical_rows_delta;
265 };
266 
267 int iterate_do_bn_apply_msg(
268     const int32_t &offset,
269     const uint32_t UU(idx),
270     struct iterate_do_bn_apply_msg_extra* const e)
271     __attribute__((nonnull(3)));
272 
273 int iterate_do_bn_apply_msg(
274     const int32_t &offset,
275     const uint32_t UU(idx),
276     struct iterate_do_bn_apply_msg_extra* const e)
277 {
278     do_bn_apply_msg(
279         e->t,
280         e->bn,
281         &e->bnc->msg_buffer,
282         offset,
283         e->gc_info,
284         e->workdone,
285         e->stats_to_update,
286         e->logical_rows_delta);
287     return 0;
288 }
289 
290 /**
291  * Given the bounds of the basement node to which we will apply messages,
292  * find the indexes within message_tree which contain the range of
293  * relevant messages.
294  *
295  * The message tree contains offsets into the buffer, where messages are
296  * found.  The pivot_bounds are the lower bound exclusive and upper bound
297  * inclusive, because they come from pivot keys in the tree.  We want OMT
298  * indices, which must have the lower bound be inclusive and the upper
299  * bound exclusive.  We will get these by telling omt::find to look
300  * for something strictly bigger than each of our pivot bounds.
301  *
302  * Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper
303  * bound exclusive).
304  */
305 template<typename find_bounds_omt_t>
306 static void
307 find_bounds_within_message_tree(
308     const toku::comparator &cmp,
309     const find_bounds_omt_t &message_tree,      /// tree holding message buffer offsets, in which we want to look for indices
310     message_buffer *msg_buffer,           /// message buffer in which messages are found
311     const pivot_bounds &bounds,  /// key bounds within the basement node we're applying messages to
312     uint32_t *lbi,        /// (output) "lower bound inclusive" (index into message_tree)
313     uint32_t *ube         /// (output) "upper bound exclusive" (index into message_tree)
314     )
315 {
316     int r = 0;
317 
318     if (!toku_dbt_is_empty(bounds.lbe())) {
319         // By setting msn to MAX_MSN and by using direction of +1, we will
320         // get the first message greater than (in (key, msn) order) any
321         // message (with any msn) with the key lower_bound_exclusive.
322         // This will be a message we want to try applying, so it is the
323         // "lower bound inclusive" within the message_tree.
324         struct toku_msg_buffer_key_msn_heaviside_extra lbi_extra(cmp, msg_buffer, bounds.lbe(), MAX_MSN);
325         int32_t found_lb;
326         r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
327         if (r == DB_NOTFOUND) {
328             // There is no relevant data (the lower bound is bigger than
329             // any message in this tree), so we have no range and we're
330             // done.
331             *lbi = 0;
332             *ube = 0;
333             return;
334         }
335         if (!toku_dbt_is_empty(bounds.ubi())) {
336             // Check if what we found for lbi is greater than the upper
337             // bound inclusive that we have.  If so, there are no relevant
338             // messages between these bounds.
339             const DBT *ubi = bounds.ubi();
340             const int32_t offset = found_lb;
341             DBT found_lbidbt;
342             msg_buffer->get_message_key_msn(offset, &found_lbidbt, nullptr);
343             int c = cmp(&found_lbidbt, ubi);
344             // These DBTs really are both inclusive bounds, so we need
345             // strict inequality in order to determine that there's
346             // nothing between them.  If they're equal, then we actually
347             // need to apply the message pointed to by lbi, and also
348             // anything with the same key but a bigger msn.
349             if (c > 0) {
350                 *lbi = 0;
351                 *ube = 0;
352                 return;
353             }
354         }
355     } else {
356         // No lower bound given, it's negative infinity, so we start at
357         // the first message in the OMT.
358         *lbi = 0;
359     }
360     if (!toku_dbt_is_empty(bounds.ubi())) {
361         // Again, we use an msn of MAX_MSN and a direction of +1 to get
362         // the first thing bigger than the upper_bound_inclusive key.
363         // This is therefore the smallest thing we don't want to apply,
364         // and omt::iterate_on_range will not examine it.
365         struct toku_msg_buffer_key_msn_heaviside_extra ube_extra(cmp, msg_buffer, bounds.ubi(), MAX_MSN);
366         r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
367         if (r == DB_NOTFOUND) {
368             // Couldn't find anything in the buffer bigger than our key,
369             // so we need to look at everything up to the end of
370             // message_tree.
371             *ube = message_tree.size();
372         }
373     } else {
374         // No upper bound given, it's positive infinity, so we need to go
375         // through the end of the OMT.
376         *ube = message_tree.size();
377     }
378 }
379 
380 // For each message in the ancestor's buffer (determined by childnum) that
381 // is key-wise between lower_bound_exclusive and upper_bound_inclusive,
382 // apply the message to the basement node.  We treat the bounds as minus
383 // or plus infinity respectively if they are NULL.  Do not mark the node
384 // as dirty (preserve previous state of 'dirty' bit).
385 static void bnc_apply_messages_to_basement_node(
386     FT_HANDLE t,      // used for comparison function
387     BASEMENTNODE bn,  // where to apply messages
388     FTNODE ancestor,  // the ancestor node where we can find messages to apply
389     int childnum,  // which child buffer of ancestor contains messages we want
390     const pivot_bounds &
391         bounds,  // contains pivot key bounds of this basement node
392     txn_gc_info *gc_info,
393     bool *msgs_applied) {
394     int r;
395     NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);
396 
397     // Determine the offsets in the message trees between which we need to
398     // apply messages from this buffer
399     STAT64INFO_S stats_delta = {0, 0};
400     uint64_t workdone_this_ancestor = 0;
401     int64_t logical_rows_delta = 0;
402 
403     uint32_t stale_lbi, stale_ube;
404     if (!bn->stale_ancestor_messages_applied) {
405         find_bounds_within_message_tree(t->ft->cmp,
406                                         bnc->stale_message_tree,
407                                         &bnc->msg_buffer,
408                                         bounds,
409                                         &stale_lbi,
410                                         &stale_ube);
411     } else {
412         stale_lbi = 0;
413         stale_ube = 0;
414     }
415     uint32_t fresh_lbi, fresh_ube;
416     find_bounds_within_message_tree(t->ft->cmp,
417                                     bnc->fresh_message_tree,
418                                     &bnc->msg_buffer,
419                                     bounds,
420                                     &fresh_lbi,
421                                     &fresh_ube);
422 
423     // We now know where all the messages we must apply are, so one of the
424     // following 4 cases will do the application, depending on which of
425     // the lists contains relevant messages:
426     //
427     // 1. broadcast messages and anything else, or a mix of fresh and stale
428     // 2. only fresh messages
429     // 3. only stale messages
430     if (bnc->broadcast_list.size() > 0 ||
431         (stale_lbi != stale_ube && fresh_lbi != fresh_ube)) {
432         // We have messages in multiple trees, so we grab all
433         // the relevant messages' offsets and sort them by MSN, then apply
434         // them in MSN order.
435         const int buffer_size =
436             ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) +
437              bnc->broadcast_list.size());
438         toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t));
439         int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get());
440         struct store_msg_buffer_offset_extra sfo_extra = {.offsets = offsets,
441                                                           .i = 0};
442 
443         // Populate offsets array with offsets to stale messages
444         r = bnc->stale_message_tree
445                 .iterate_on_range<struct store_msg_buffer_offset_extra,
446                                   store_msg_buffer_offset>(
447                     stale_lbi, stale_ube, &sfo_extra);
448         assert_zero(r);
449 
450         // Then store fresh offsets, and mark them to be moved to stale later.
451         r = bnc->fresh_message_tree
452                 .iterate_and_mark_range<struct store_msg_buffer_offset_extra,
453                                         store_msg_buffer_offset>(
454                     fresh_lbi, fresh_ube, &sfo_extra);
455         assert_zero(r);
456 
457         // Store offsets of all broadcast messages.
458         r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra,
459                                         store_msg_buffer_offset>(&sfo_extra);
460         assert_zero(r);
461         invariant(sfo_extra.i == buffer_size);
462 
463         // Sort by MSN.
464         toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>::
465             mergesort_r(offsets, buffer_size, bnc->msg_buffer);
466 
467         // Apply the messages in MSN order.
468         for (int i = 0; i < buffer_size; ++i) {
469             *msgs_applied = true;
470             do_bn_apply_msg(t,
471                             bn,
472                             &bnc->msg_buffer,
473                             offsets[i],
474                             gc_info,
475                             &workdone_this_ancestor,
476                             &stats_delta,
477                             &logical_rows_delta);
478         }
479     } else if (stale_lbi == stale_ube) {
480         // No stale messages to apply, we just apply fresh messages, and mark
481         // them to be moved to stale later.
482         struct iterate_do_bn_apply_msg_extra iter_extra = {
483             .t = t,
484             .bn = bn,
485             .bnc = bnc,
486             .gc_info = gc_info,
487             .workdone = &workdone_this_ancestor,
488             .stats_to_update = &stats_delta,
489             .logical_rows_delta = &logical_rows_delta};
490         if (fresh_ube - fresh_lbi > 0)
491             *msgs_applied = true;
492         r = bnc->fresh_message_tree
493                 .iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra,
494                                         iterate_do_bn_apply_msg>(
495                     fresh_lbi, fresh_ube, &iter_extra);
496         assert_zero(r);
497     } else {
498         invariant(fresh_lbi == fresh_ube);
499         // No fresh messages to apply, we just apply stale messages.
500 
501         if (stale_ube - stale_lbi > 0)
502             *msgs_applied = true;
503         struct iterate_do_bn_apply_msg_extra iter_extra = {
504             .t = t,
505             .bn = bn,
506             .bnc = bnc,
507             .gc_info = gc_info,
508             .workdone = &workdone_this_ancestor,
509             .stats_to_update = &stats_delta,
510             .logical_rows_delta = &logical_rows_delta};
511 
512         r = bnc->stale_message_tree
513                 .iterate_on_range<struct iterate_do_bn_apply_msg_extra,
514                                   iterate_do_bn_apply_msg>(
515                     stale_lbi, stale_ube, &iter_extra);
516         assert_zero(r);
517     }
518     //
519     // update stats
520     //
521     if (workdone_this_ancestor > 0) {
522         (void)toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum),
523                                       workdone_this_ancestor);
524     }
525     if (stats_delta.numbytes || stats_delta.numrows) {
526         toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
527     }
528     toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta);
529     bn->logical_rows_delta += logical_rows_delta;
530 }
531 
532 static void
533 apply_ancestors_messages_to_bn(
534     FT_HANDLE t,
535     FTNODE node,
536     int childnum,
537     ANCESTORS ancestors,
538     const pivot_bounds &bounds,
539     txn_gc_info *gc_info,
540     bool* msgs_applied
541     )
542 {
543     BASEMENTNODE curr_bn = BLB(node, childnum);
544     const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
545     for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
546         if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
547             paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
548             bnc_apply_messages_to_basement_node(
549                 t,
550                 curr_bn,
551                 curr_ancestors->node,
552                 curr_ancestors->childnum,
553                 curr_bounds,
554                 gc_info,
555                 msgs_applied
556                 );
557             // We don't want to check this ancestor node again if the
558             // next time we query it, the msn hasn't changed.
559             curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
560         }
561     }
562     // At this point, we know all the stale messages above this
563     // basement node have been applied, and any new messages will be
564     // fresh, so we don't need to look at stale messages for this
565     // basement node, unless it gets evicted (and this field becomes
566     // false when it's read in again).
567     curr_bn->stale_ancestor_messages_applied = true;
568 }
569 
570 void
571 toku_apply_ancestors_messages_to_node (
572     FT_HANDLE t,
573     FTNODE node,
574     ANCESTORS ancestors,
575     const pivot_bounds &bounds,
576     bool* msgs_applied,
577     int child_to_read
578     )
579 // Effect:
580 //   Bring a leaf node up-to-date according to all the messages in the ancestors.
581 //   If the leaf node is already up-to-date then do nothing.
582 //   If the leaf node is not already up-to-date, then record the work done
583 //   for that leaf in each ancestor.
584 // Requires:
585 //   This is being called when pinning a leaf node for the query path.
586 //   The entire root-to-leaf path is pinned and appears in the ancestors list.
587 {
588     VERIFY_NODE(t, node);
589     paranoid_invariant(node->height == 0);
590 
591     TXN_MANAGER txn_manager = toku_ft_get_txn_manager(t);
592     txn_manager_state txn_state_for_gc(txn_manager);
593 
594     TXNID oldest_referenced_xid_for_simple_gc = toku_ft_get_oldest_referenced_xid_estimate(t);
595     txn_gc_info gc_info(&txn_state_for_gc,
596                         oldest_referenced_xid_for_simple_gc,
597                         node->oldest_referenced_xid_known,
598                         true);
599     if (!node->dirty() && child_to_read >= 0) {
600         paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
601         apply_ancestors_messages_to_bn(
602             t,
603             node,
604             child_to_read,
605             ancestors,
606             bounds,
607             &gc_info,
608             msgs_applied
609             );
610     }
611     else {
612         // know we are a leaf node
613         // An important invariant:
614         // We MUST bring every available basement node for a dirty node up to date.
615         // flushing on the cleaner thread depends on this. This invariant
616         // allows the cleaner thread to just pick an internal node and flush it
617         // as opposed to being forced to start from the root.
618         for (int i = 0; i < node->n_children; i++) {
619             if (BP_STATE(node, i) != PT_AVAIL) { continue; }
620             apply_ancestors_messages_to_bn(
621                 t,
622                 node,
623                 i,
624                 ancestors,
625                 bounds,
626                 &gc_info,
627                 msgs_applied
628                 );
629         }
630     }
631     VERIFY_NODE(t, node);
632 }
633 
634 static bool bn_needs_ancestors_messages(
635     FT ft,
636     FTNODE node,
637     int childnum,
638     const pivot_bounds &bounds,
639     ANCESTORS ancestors,
640     MSN* max_msn_applied
641     )
642 {
643     BASEMENTNODE bn = BLB(node, childnum);
644     const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
645     bool needs_ancestors_messages = false;
646     for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
647         if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
648             paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
649             NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
650             if (bnc->broadcast_list.size() > 0) {
651                 needs_ancestors_messages = true;
652                 goto cleanup;
653             }
654             if (!bn->stale_ancestor_messages_applied) {
655                 uint32_t stale_lbi, stale_ube;
656                 find_bounds_within_message_tree(ft->cmp,
657                                                 bnc->stale_message_tree,
658                                                 &bnc->msg_buffer,
659                                                 curr_bounds,
660                                                 &stale_lbi,
661                                                 &stale_ube);
662                 if (stale_lbi < stale_ube) {
663                     needs_ancestors_messages = true;
664                     goto cleanup;
665                 }
666             }
667             uint32_t fresh_lbi, fresh_ube;
668             find_bounds_within_message_tree(ft->cmp,
669                                             bnc->fresh_message_tree,
670                                             &bnc->msg_buffer,
671                                             curr_bounds,
672                                             &fresh_lbi,
673                                             &fresh_ube);
674             if (fresh_lbi < fresh_ube) {
675                 needs_ancestors_messages = true;
676                 goto cleanup;
677             }
678             if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied->msn) {
679                 max_msn_applied->msn = curr_ancestors->node->max_msn_applied_to_node_on_disk.msn;
680             }
681         }
682     }
683 cleanup:
684     return needs_ancestors_messages;
685 }
686 
687 bool toku_ft_leaf_needs_ancestors_messages(
688     FT ft,
689     FTNODE node,
690     ANCESTORS ancestors,
691     const pivot_bounds &bounds,
692     MSN *const max_msn_in_path,
693     int child_to_read
694     )
695 // Effect: Determine whether there are messages in a node's ancestors
696 //  which must be applied to it.  These messages are in the correct
697 //  keyrange for any available basement nodes, and are in nodes with the
698 //  correct max_msn_applied_to_node_on_disk.
699 // Notes:
700 //  This is an approximate query.
701 // Output:
702 //  max_msn_in_path: max of "max_msn_applied_to_node_on_disk" over
703 //    ancestors.  This is used later to update basement nodes'
704 //    max_msn_applied values in case we don't do the full algorithm.
705 // Returns:
706 //  true if there may be some such messages
707 //  false only if there are definitely no such messages
708 // Rationale:
709 //  When we pin a node with a read lock, we want to quickly determine if
710 //  we should exchange it for a write lock in preparation for applying
711 //  messages.  If there are no messages, we don't need the write lock.
712 {
713     paranoid_invariant(node->height == 0);
714     bool needs_ancestors_messages = false;
715     // child_to_read may be -1 in test cases
716     if (!node->dirty() && child_to_read >= 0) {
717         paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
718         needs_ancestors_messages = bn_needs_ancestors_messages(
719             ft,
720             node,
721             child_to_read,
722             bounds,
723             ancestors,
724             max_msn_in_path
725             );
726     }
727     else {
728         for (int i = 0; i < node->n_children; ++i) {
729             if (BP_STATE(node, i) != PT_AVAIL) { continue; }
730             needs_ancestors_messages = bn_needs_ancestors_messages(
731                 ft,
732                 node,
733                 i,
734                 bounds,
735                 ancestors,
736                 max_msn_in_path
737                 );
738             if (needs_ancestors_messages) {
739                 goto cleanup;
740             }
741         }
742     }
743 cleanup:
744     return needs_ancestors_messages;
745 }
746 
747 void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read) {
748     invariant(node->height == 0);
749     if (!node->dirty() && child_to_read >= 0) {
750         paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
751         BASEMENTNODE bn = BLB(node, child_to_read);
752         if (max_msn_applied.msn > bn->max_msn_applied.msn) {
753             // see comment below
754             (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
755         }
756     }
757     else {
758         for (int i = 0; i < node->n_children; ++i) {
759             if (BP_STATE(node, i) != PT_AVAIL) { continue; }
760             BASEMENTNODE bn = BLB(node, i);
761             if (max_msn_applied.msn > bn->max_msn_applied.msn) {
762                 // This function runs in a shared access context, so to silence tools
763                 // like DRD, we use a CAS and ignore the result.
764                 // Any threads trying to update these basement nodes should be
765                 // updating them to the same thing (since they all have a read lock on
766                 // the same root-to-leaf path) so this is safe.
767                 (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
768             }
769         }
770     }
771 }
772 
773 struct copy_to_stale_extra {
774     FT ft;
775     NONLEAF_CHILDINFO bnc;
776 };
777 
778 int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra) __attribute__((nonnull(3)));
779 int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
780 {
781     MSN msn;
782     DBT key;
783     extra->bnc->msg_buffer.get_message_key_msn(offset, &key, &msn);
784     struct toku_msg_buffer_key_msn_heaviside_extra heaviside_extra(extra->ft->cmp, &extra->bnc->msg_buffer, &key, msn);
785     int r = extra->bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, heaviside_extra, nullptr);
786     invariant_zero(r);
787     return 0;
788 }
789 
790 void toku_ft_bnc_move_messages_to_stale(FT ft, NONLEAF_CHILDINFO bnc) {
791     struct copy_to_stale_extra cts_extra = { .ft = ft, .bnc = bnc };
792     int r = bnc->fresh_message_tree.iterate_over_marked<struct copy_to_stale_extra, copy_to_stale>(&cts_extra);
793     invariant_zero(r);
794     bnc->fresh_message_tree.delete_all_marked();
795 }
796 
797 void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) {
798     invariant(node->height > 0);
799     for (int i = 0; i < node->n_children; ++i) {
800         if (BP_STATE(node, i) != PT_AVAIL) {
801             continue;
802         }
803         NONLEAF_CHILDINFO bnc = BNC(node, i);
804         // We can't delete things out of the fresh tree inside the above
805         // procedures because we're still looking at the fresh tree.  Instead
806         // we have to move messages after we're done looking at it.
807         toku_ft_bnc_move_messages_to_stale(ft, bnc);
808     }
809 }
810 
811 //
812 // Balance // Availibility // Size
813 
814 struct rebalance_array_info {
815     uint32_t offset;
816     LEAFENTRY *le_array;
817     uint32_t *key_sizes_array;
818     const void **key_ptr_array;
819     static int fn(const void* key, const uint32_t keylen, const LEAFENTRY &le,
820            const uint32_t idx, struct rebalance_array_info *const ai) {
821         ai->le_array[idx+ai->offset] = le;
822         ai->key_sizes_array[idx+ai->offset] = keylen;
823         ai->key_ptr_array[idx+ai->offset] = key;
824         return 0;
825     }
826 };
827 
828 // There must still be at least one child
829 // Requires that all messages in buffers above have been applied.
830 // Because all messages above have been applied, setting msn of all new basements
831 // to max msn of existing basements is correct.  (There cannot be any messages in
832 // buffers above that still need to be applied.)
833 void toku_ftnode_leaf_rebalance(FTNODE node, unsigned int basementnodesize) {
834 
835     assert(node->height == 0);
836     assert(node->dirty());
837 
838     uint32_t num_orig_basements = node->n_children;
839     // Count number of leaf entries in this leaf (num_le).
840     uint32_t num_le = 0;
841     for (uint32_t i = 0; i < num_orig_basements; i++) {
842         num_le += BLB_DATA(node, i)->num_klpairs();
843     }
844 
845     uint32_t num_alloc = num_le ? num_le : 1;  // simplify logic below by always having at least one entry per array
846 
847     // Create an array of OMTVALUE's that store all the pointers to all the data.
848     // Each element in leafpointers is a pointer to a leaf.
849     toku::scoped_malloc leafpointers_buf(sizeof(LEAFENTRY) * num_alloc);
850     LEAFENTRY *leafpointers = reinterpret_cast<LEAFENTRY *>(leafpointers_buf.get());
851     leafpointers[0] = NULL;
852 
853     toku::scoped_malloc key_pointers_buf(sizeof(void *) * num_alloc);
854     const void **key_pointers = reinterpret_cast<const void **>(key_pointers_buf.get());
855     key_pointers[0] = NULL;
856 
857     toku::scoped_malloc key_sizes_buf(sizeof(uint32_t) * num_alloc);
858     uint32_t *key_sizes = reinterpret_cast<uint32_t *>(key_sizes_buf.get());
859 
860     // Capture pointers to old mempools' buffers (so they can be destroyed)
861     toku::scoped_malloc old_bns_buf(sizeof(BASEMENTNODE) * num_orig_basements);
862     BASEMENTNODE *old_bns = reinterpret_cast<BASEMENTNODE *>(old_bns_buf.get());
863     old_bns[0] = NULL;
864 
865     uint32_t curr_le = 0;
866     for (uint32_t i = 0; i < num_orig_basements; i++) {
867         bn_data* bd = BLB_DATA(node, i);
868         struct rebalance_array_info ai {.offset = curr_le, .le_array = leafpointers, .key_sizes_array = key_sizes, .key_ptr_array = key_pointers };
869         bd->iterate<rebalance_array_info, rebalance_array_info::fn>(&ai);
870         curr_le += bd->num_klpairs();
871     }
872 
873     // Create an array that will store indexes of new pivots.
874     // Each element in new_pivots is the index of a pivot key.
875     // (Allocating num_le of them is overkill, but num_le is an upper bound.)
876     toku::scoped_malloc new_pivots_buf(sizeof(uint32_t) * num_alloc);
877     uint32_t *new_pivots = reinterpret_cast<uint32_t *>(new_pivots_buf.get());
878     new_pivots[0] = 0;
879 
880     // Each element in le_sizes is the size of the leafentry pointed to by leafpointers.
881     toku::scoped_malloc le_sizes_buf(sizeof(size_t) * num_alloc);
882     size_t *le_sizes = reinterpret_cast<size_t *>(le_sizes_buf.get());
883     le_sizes[0] = 0;
884 
885     // Create an array that will store the size of each basement.
886     // This is the sum of the leaf sizes of all the leaves in that basement.
887     // We don't know how many basements there will be, so we use num_le as the upper bound.
888 
889     // Sum of all le sizes in a single basement
890     toku::scoped_calloc bn_le_sizes_buf(sizeof(size_t) * num_alloc);
891     size_t *bn_le_sizes = reinterpret_cast<size_t *>(bn_le_sizes_buf.get());
892 
893     // Sum of all key sizes in a single basement
894     toku::scoped_calloc bn_key_sizes_buf(sizeof(size_t) * num_alloc);
895     size_t *bn_key_sizes = reinterpret_cast<size_t *>(bn_key_sizes_buf.get());
896 
897     // TODO 4050: All these arrays should be combined into a single array of some bn_info struct (pivot, msize, num_les).
898     // Each entry is the number of leafentries in this basement.  (Again, num_le is overkill upper baound.)
899     toku::scoped_malloc num_les_this_bn_buf(sizeof(uint32_t) * num_alloc);
900     uint32_t *num_les_this_bn = reinterpret_cast<uint32_t *>(num_les_this_bn_buf.get());
901     num_les_this_bn[0] = 0;
902 
903     // Figure out the new pivots.
904     // We need the index of each pivot, and for each basement we need
905     // the number of leaves and the sum of the sizes of the leaves (memory requirement for basement).
906     uint32_t curr_pivot = 0;
907     uint32_t num_le_in_curr_bn = 0;
908     uint32_t bn_size_so_far = 0;
909     for (uint32_t i = 0; i < num_le; i++) {
910         uint32_t curr_le_size = leafentry_disksize((LEAFENTRY) leafpointers[i]);
911         le_sizes[i] = curr_le_size;
912         if ((bn_size_so_far + curr_le_size + sizeof(uint32_t) + key_sizes[i] > basementnodesize) && (num_le_in_curr_bn != 0)) {
913             // cap off the current basement node to end with the element before i
914             new_pivots[curr_pivot] = i-1;
915             curr_pivot++;
916             num_le_in_curr_bn = 0;
917             bn_size_so_far = 0;
918         }
919         num_le_in_curr_bn++;
920         num_les_this_bn[curr_pivot] = num_le_in_curr_bn;
921         bn_le_sizes[curr_pivot] += curr_le_size;
922         bn_key_sizes[curr_pivot] += sizeof(uint32_t) + key_sizes[i];  // uint32_t le_offset
923         bn_size_so_far += curr_le_size + sizeof(uint32_t) + key_sizes[i];
924     }
925     // curr_pivot is now the total number of pivot keys in the leaf node
926     int num_pivots   = curr_pivot;
927     int num_children = num_pivots + 1;
928 
929     // now we need to fill in the new basement nodes and pivots
930 
931     // TODO: (Zardosht) this is an ugly thing right now
932     // Need to figure out how to properly deal with seqinsert.
933     // I am not happy with how this is being
934     // handled with basement nodes
935     uint32_t tmp_seqinsert = BLB_SEQINSERT(node, num_orig_basements - 1);
936 
937     // choose the max msn applied to any basement as the max msn applied to all new basements
938     MSN max_msn = ZERO_MSN;
939     for (uint32_t i = 0; i < num_orig_basements; i++) {
940         MSN curr_msn = BLB_MAX_MSN_APPLIED(node,i);
941         max_msn = (curr_msn.msn > max_msn.msn) ? curr_msn : max_msn;
942     }
943     // remove the basement node in the node, we've saved a copy
944     for (uint32_t i = 0; i < num_orig_basements; i++) {
945         // save a reference to the old basement nodes
946         // we will need them to ensure that the memory
947         // stays intact
948         old_bns[i] = toku_detach_bn(node, i);
949     }
950     // Now destroy the old basements, but do not destroy leaves
951     toku_destroy_ftnode_internals(node);
952 
953     // now reallocate pieces and start filling them in
954     invariant(num_children > 0);
955 
956     node->n_children = num_children;
957     XCALLOC_N(num_children, node->bp);             // allocate pointers to basements (bp)
958     for (int i = 0; i < num_children; i++) {
959         set_BLB(node, i, toku_create_empty_bn());  // allocate empty basements and set bp pointers
960     }
961 
962     // now we start to fill in the data
963 
964     // first the pivots
965     toku::scoped_malloc pivotkeys_buf(num_pivots * sizeof(DBT));
966     DBT *pivotkeys = reinterpret_cast<DBT *>(pivotkeys_buf.get());
967     for (int i = 0; i < num_pivots; i++) {
968         uint32_t size = key_sizes[new_pivots[i]];
969         const void *key = key_pointers[new_pivots[i]];
970         toku_fill_dbt(&pivotkeys[i], key, size);
971     }
972     node->pivotkeys.create_from_dbts(pivotkeys, num_pivots);
973 
974     uint32_t baseindex_this_bn = 0;
975     // now the basement nodes
976     for (int i = 0; i < num_children; i++) {
977         // put back seqinsert
978         BLB_SEQINSERT(node, i) = tmp_seqinsert;
979 
980         // create start (inclusive) and end (exclusive) boundaries for data of basement node
981         uint32_t curr_start = (i==0) ? 0 : new_pivots[i-1]+1;               // index of first leaf in basement
982         uint32_t curr_end = (i==num_pivots) ? num_le : new_pivots[i]+1;     // index of first leaf in next basement
983         uint32_t num_in_bn = curr_end - curr_start;                         // number of leaves in this basement
984 
985         // create indexes for new basement
986         invariant(baseindex_this_bn == curr_start);
987         uint32_t num_les_to_copy = num_les_this_bn[i];
988         invariant(num_les_to_copy == num_in_bn);
989 
990         bn_data* bd = BLB_DATA(node, i);
991         bd->set_contents_as_clone_of_sorted_array(
992             num_les_to_copy,
993             &key_pointers[baseindex_this_bn],
994             &key_sizes[baseindex_this_bn],
995             &leafpointers[baseindex_this_bn],
996             &le_sizes[baseindex_this_bn],
997             bn_key_sizes[i],  // Total key sizes
998             bn_le_sizes[i]  // total le sizes
999             );
1000 
1001         BP_STATE(node,i) = PT_AVAIL;
1002         BP_TOUCH_CLOCK(node,i);
1003         BLB_MAX_MSN_APPLIED(node,i) = max_msn;
1004         baseindex_this_bn += num_les_to_copy;  // set to index of next bn
1005     }
1006     node->max_msn_applied_to_node_on_disk = max_msn;
1007 
1008     // destroy buffers of old mempools
1009     for (uint32_t i = 0; i < num_orig_basements; i++) {
1010         destroy_basement_node(old_bns[i]);
1011     }
1012 }
1013 
1014 bool toku_ftnode_fully_in_memory(FTNODE node) {
1015     for (int i = 0; i < node->n_children; i++) {
1016         if (BP_STATE(node,i) != PT_AVAIL) {
1017             return false;
1018         }
1019     }
1020     return true;
1021 }
1022 
1023 void toku_ftnode_assert_fully_in_memory(FTNODE UU(node)) {
1024     paranoid_invariant(toku_ftnode_fully_in_memory(node));
1025 }
1026 
1027 uint32_t toku_ftnode_leaf_num_entries(FTNODE node) {
1028     toku_ftnode_assert_fully_in_memory(node);
1029     uint32_t num_entries = 0;
1030     for (int i = 0; i < node->n_children; i++) {
1031         num_entries += BLB_DATA(node, i)->num_klpairs();
1032     }
1033     return num_entries;
1034 }
1035 
1036 enum reactivity toku_ftnode_get_leaf_reactivity(FTNODE node, uint32_t nodesize) {
1037     enum reactivity re = RE_STABLE;
1038     toku_ftnode_assert_fully_in_memory(node);
1039     paranoid_invariant(node->height==0);
1040     unsigned int size = toku_serialize_ftnode_size(node);
1041     if (size > nodesize && toku_ftnode_leaf_num_entries(node) > 1) {
1042         re = RE_FISSIBLE;
1043     } else if ((size*4) < nodesize && !BLB_SEQINSERT(node, node->n_children-1)) {
1044         re = RE_FUSIBLE;
1045     }
1046     return re;
1047 }
1048 
1049 enum reactivity toku_ftnode_get_nonleaf_reactivity(FTNODE node, unsigned int fanout) {
1050     paranoid_invariant(node->height > 0);
1051     int n_children = node->n_children;
1052     if (n_children > (int) fanout) {
1053         return RE_FISSIBLE;
1054     }
1055     if (n_children * 4 < (int) fanout) {
1056         return RE_FUSIBLE;
1057     }
1058     return RE_STABLE;
1059 }
1060 
1061 enum reactivity toku_ftnode_get_reactivity(FT ft, FTNODE node) {
1062     toku_ftnode_assert_fully_in_memory(node);
1063     if (node->height == 0) {
1064         return toku_ftnode_get_leaf_reactivity(node, ft->h->nodesize);
1065     } else {
1066         return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout);
1067     }
1068 }
1069 
1070 unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc) {
1071     return bnc->msg_buffer.buffer_size_in_use();
1072 }
1073 
1074 // Return true if the size of the buffers plus the amount of work done is large enough.
1075 // Return false if there is nothing to be flushed (the buffers empty).
1076 bool toku_ftnode_nonleaf_is_gorged(FTNODE node, uint32_t nodesize) {
1077     uint64_t size = toku_serialize_ftnode_size(node);
1078 
1079     bool buffers_are_empty = true;
1080     toku_ftnode_assert_fully_in_memory(node);
1081     //
1082     // the nonleaf node is gorged if the following holds true:
1083     //  - the buffers are non-empty
1084     //  - the total workdone by the buffers PLUS the size of the buffers
1085     //     is greater than nodesize (which as of Maxwell should be
1086     //     4MB)
1087     //
1088     paranoid_invariant(node->height > 0);
1089     for (int child = 0; child < node->n_children; ++child) {
1090         size += BP_WORKDONE(node, child);
1091     }
1092     for (int child = 0; child < node->n_children; ++child) {
1093         if (toku_bnc_nbytesinbuf(BNC(node, child)) > 0) {
1094             buffers_are_empty = false;
1095             break;
1096         }
1097     }
1098     return ((size > nodesize)
1099             &&
1100             (!buffers_are_empty));
1101 }
1102 
1103 int toku_bnc_n_entries(NONLEAF_CHILDINFO bnc) {
1104     return bnc->msg_buffer.num_entries();
1105 }
1106 
1107 // how much memory does this child buffer consume?
1108 long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc) {
1109     return (sizeof(*bnc) +
1110             bnc->msg_buffer.memory_footprint() +
1111             bnc->fresh_message_tree.memory_size() +
1112             bnc->stale_message_tree.memory_size() +
1113             bnc->broadcast_list.memory_size());
1114 }
1115 
1116 // how much memory in this child buffer holds useful data?
1117 // originally created solely for use by test program(s).
1118 long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc) {
1119     return (sizeof(*bnc) +
1120             bnc->msg_buffer.memory_size_in_use() +
1121             bnc->fresh_message_tree.memory_size() +
1122             bnc->stale_message_tree.memory_size() +
1123             bnc->broadcast_list.memory_size());
1124 }
1125 
1126 //
1127 // Garbage collection
1128 // Message injection
1129 // Message application
1130 //
1131 
1132 // Used only by test programs: append a child node to a parent node
1133 void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) {
1134     int childnum = node->n_children;
1135     node->n_children++;
1136     REALLOC_N(node->n_children, node->bp);
1137     BP_BLOCKNUM(node,childnum) = child->blocknum;
1138     BP_STATE(node,childnum) = PT_AVAIL;
1139     BP_WORKDONE(node, childnum)   = 0;
1140     set_BNC(node, childnum, toku_create_empty_nl());
1141     if (pivotkey) {
1142         invariant(childnum > 0);
1143         node->pivotkeys.insert_at(pivotkey, childnum - 1);
1144     }
1145     node->set_dirty();
1146 }
1147 
1148 void
1149 toku_ft_bn_apply_msg_once (
1150     BASEMENTNODE bn,
1151     const ft_msg &msg,
1152     uint32_t idx,
1153     uint32_t le_keylen,
1154     LEAFENTRY le,
1155     txn_gc_info *gc_info,
1156     uint64_t *workdone,
1157     STAT64INFO stats_to_update,
1158     int64_t *logical_rows_delta
1159     )
1160 // Effect: Apply msg to leafentry (msn is ignored)
1161 //         Calculate work done by message on leafentry and add it to caller's workdone counter.
1162 //   idx is the location where it goes
1163 //   le is old leafentry
1164 {
1165     size_t newsize=0, oldsize=0, workdone_this_le=0;
1166     LEAFENTRY new_le=0;
1167     // how many bytes of user data (not including overhead) were added or
1168     // deleted from this row
1169     int64_t numbytes_delta = 0;
1170     // will be +1 or -1 or 0 (if row was added or deleted or not)
1171     int64_t numrows_delta = 0;
1172     // will be +1, -1 or 0 if a message that was accounted for logically has
1173     // changed in meaning such as an insert changed to an update or a delete
1174     // changed to a noop
1175     int64_t logical_rows_delta_le = 0;
1176     uint32_t key_storage_size = msg.kdbt()->size + sizeof(uint32_t);
1177     if (le) {
1178         oldsize = leafentry_memsize(le) + key_storage_size;
1179     }
1180 
1181     // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt()
1182     // to allocate more space. That means le is guaranteed to not cause a
1183     // sigsegv but it may point to a mempool that is no longer in use.
1184     // We'll have to release the old mempool later.
1185     logical_rows_delta_le = toku_le_apply_msg(
1186         msg,
1187         le,
1188         &bn->data_buffer,
1189         idx,
1190         le_keylen,
1191         gc_info,
1192         &new_le,
1193         &numbytes_delta);
1194 
1195     // at this point, we cannot trust cmd->u.id.key to be valid.
1196     // The dmt may have realloced its mempool and freed the one containing key.
1197 
1198     newsize = new_le ? (leafentry_memsize(new_le) +  + key_storage_size) : 0;
1199     if (le && new_le) {
1200         workdone_this_le = (oldsize > newsize ? oldsize : newsize);  // work done is max of le size before and after message application
1201 
1202     } else {           // we did not just replace a row, so ...
1203         if (le) {
1204             //            ... we just deleted a row ...
1205             workdone_this_le = oldsize;
1206             numrows_delta = -1;
1207         }
1208         if (new_le) {
1209             //            ... or we just added a row
1210             workdone_this_le = newsize;
1211             numrows_delta = 1;
1212         }
1213     }
1214     if (FT_LIKELY(workdone != NULL)) {  // test programs may call with NULL
1215         *workdone += workdone_this_le;
1216     }
1217 
1218     if (FT_LIKELY(logical_rows_delta != NULL)) {
1219         *logical_rows_delta += logical_rows_delta_le;
1220     }
1221     // now update stat64 statistics
1222     bn->stat64_delta.numrows  += numrows_delta;
1223     bn->stat64_delta.numbytes += numbytes_delta;
1224     // the only reason stats_to_update may be null is for tests
1225     if (FT_LIKELY(stats_to_update != NULL)) {
1226         stats_to_update->numrows += numrows_delta;
1227         stats_to_update->numbytes += numbytes_delta;
1228     }
1229 }
1230 
1231 static const uint32_t setval_tag = 0xee0ccb99; // this was gotten by doing "cat /dev/random|head -c4|od -x" to get a random number.  We want to make sure that the user actually passes us the setval_extra_s that we passed in.
1232 struct setval_extra_s {
1233     uint32_t  tag;
1234     bool did_set_val;
1235     // any error code that setval_fun wants to return goes here.
1236     int setval_r;
1237     // need arguments for toku_ft_bn_apply_msg_once
1238     BASEMENTNODE bn;
1239     // captured from original message, not currently used
1240     MSN msn;
1241     XIDS xids;
1242     const DBT* key;
1243     uint32_t idx;
1244     uint32_t le_keylen;
1245     LEAFENTRY le;
1246     txn_gc_info* gc_info;
1247     uint64_t* workdone;  // set by toku_ft_bn_apply_msg_once()
1248     STAT64INFO stats_to_update;
1249     int64_t* logical_rows_delta;
1250 };
1251 
1252 /*
1253  * If new_val == NULL, we send a delete message instead of an insert.
1254  * This happens here instead of in do_delete() for consistency.
1255  * setval_fun() is called from handlerton, passing in svextra_v
1256  * from setval_extra_s input arg to ft->update_fun().
1257  */
1258 static void setval_fun (const DBT *new_val, void *svextra_v) {
1259     struct setval_extra_s *CAST_FROM_VOIDP(svextra, svextra_v);
1260     paranoid_invariant(svextra->tag==setval_tag);
1261     paranoid_invariant(!svextra->did_set_val);
1262     svextra->did_set_val = true;
1263 
1264     {
1265         // can't leave scope until toku_ft_bn_apply_msg_once if
1266         // this is a delete
1267         DBT val;
1268         ft_msg msg(
1269             svextra->key,
1270             new_val ? new_val : toku_init_dbt(&val),
1271             new_val ? FT_INSERT : FT_DELETE_ANY,
1272             svextra->msn,
1273             svextra->xids);
1274         toku_ft_bn_apply_msg_once(
1275             svextra->bn,
1276             msg,
1277             svextra->idx,
1278             svextra->le_keylen,
1279             svextra->le,
1280             svextra->gc_info,
1281             svextra->workdone,
1282             svextra->stats_to_update,
1283             svextra->logical_rows_delta);
1284         svextra->setval_r = 0;
1285     }
1286 }
1287 
1288 // We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls
1289 // do_update()), so capturing the msn in the setval_extra_s is not strictly
1290 // required. The alternative would be to put a dummy msn in the messages
1291 // created by setval_fun(), but preserving the original msn seems cleaner and
1292 // it preserves accountability at a lower layer.
1293 static int do_update(
1294     ft_update_func update_fun,
1295     const DESCRIPTOR_S* desc,
1296     BASEMENTNODE bn,
1297     const ft_msg &msg,
1298     uint32_t idx,
1299     LEAFENTRY le,
1300     void* keydata,
1301     uint32_t keylen,
1302     txn_gc_info* gc_info,
1303     uint64_t* workdone,
1304     STAT64INFO stats_to_update,
1305     int64_t* logical_rows_delta) {
1306 
1307     LEAFENTRY le_for_update;
1308     DBT key;
1309     const DBT *keyp;
1310     const DBT *update_function_extra;
1311     DBT vdbt;
1312     const DBT *vdbtp;
1313 
1314     // the location of data depends whether this is a regular or
1315     // broadcast update
1316     if (msg.type() == FT_UPDATE) {
1317         // key is passed in with command (should be same as from le)
1318         // update function extra is passed in with command
1319         keyp = msg.kdbt();
1320         update_function_extra = msg.vdbt();
1321     } else {
1322         invariant(msg.type() == FT_UPDATE_BROADCAST_ALL);
1323         // key is not passed in with broadcast, it comes from le
1324         // update function extra is passed in with command
1325         paranoid_invariant(le);  // for broadcast updates, we just hit all leafentries
1326                      // so this cannot be null
1327         paranoid_invariant(keydata);
1328         paranoid_invariant(keylen);
1329         paranoid_invariant(msg.kdbt()->size == 0);
1330         keyp = toku_fill_dbt(&key, keydata, keylen);
1331         update_function_extra = msg.vdbt();
1332     }
1333     toku_ft_status_note_update(msg.type() == FT_UPDATE_BROADCAST_ALL);
1334 
1335     if (le && !le_latest_is_del(le)) {
1336         // if the latest val exists, use it, and we'll use the leafentry later
1337         uint32_t vallen;
1338         void *valp = le_latest_val_and_len(le, &vallen);
1339         vdbtp = toku_fill_dbt(&vdbt, valp, vallen);
1340     } else {
1341         // otherwise, the val and leafentry are both going to be null
1342         vdbtp = NULL;
1343     }
1344     le_for_update = le;
1345 
1346     struct setval_extra_s setval_extra = {
1347         setval_tag,
1348         false,
1349         0,
1350         bn,
1351         msg.msn(),
1352         msg.xids(),
1353         keyp,
1354         idx,
1355         keylen,
1356         le_for_update,
1357         gc_info,
1358         workdone,
1359         stats_to_update,
1360         logical_rows_delta
1361     };
1362     // call handlerton's ft->update_fun(), which passes setval_extra
1363     // to setval_fun()
1364     FAKE_DB(db, desc);
1365     int r = update_fun(
1366         &db,
1367         keyp,
1368         vdbtp,
1369         update_function_extra,
1370         setval_fun,
1371         &setval_extra);
1372 
1373     if (r == 0) { r = setval_extra.setval_r; }
1374     return r;
1375 }
1376 
1377 // Should be renamed as something like "apply_msg_to_basement()."
1378 void toku_ft_bn_apply_msg(
1379     const toku::comparator& cmp,
1380     ft_update_func update_fun,
1381     BASEMENTNODE bn,
1382     const ft_msg& msg,
1383     txn_gc_info* gc_info,
1384     uint64_t* workdone,
1385     STAT64INFO stats_to_update,
1386     int64_t* logical_rows_delta) {
1387 // Effect:
1388 //   Put a msg into a leaf.
1389 //   Calculate work done by message on leafnode and add it to caller's
1390 //   workdone counter.
1391 // The leaf could end up "too big" or "too small".  The caller must fix that up.
1392     LEAFENTRY storeddata;
1393     void* key = NULL;
1394     uint32_t keylen = 0;
1395 
1396     uint32_t num_klpairs;
1397     int r;
1398     struct toku_msg_leafval_heaviside_extra be(cmp, msg.kdbt());
1399 
1400     unsigned int doing_seqinsert = bn->seqinsert;
1401     bn->seqinsert = 0;
1402 
1403     switch (msg.type()) {
1404     case FT_INSERT_NO_OVERWRITE:
1405     case FT_INSERT: {
1406         uint32_t idx;
1407         if (doing_seqinsert) {
1408             idx = bn->data_buffer.num_klpairs();
1409             DBT kdbt;
1410             r = bn->data_buffer.fetch_key_and_len(idx-1, &kdbt.size, &kdbt.data);
1411             if (r != 0) goto fz;
1412             int c = toku_msg_leafval_heaviside(kdbt, be);
1413             if (c >= 0) goto fz;
1414             r = DB_NOTFOUND;
1415         } else {
1416         fz:
1417             r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1418                 be,
1419                 &storeddata,
1420                 &key,
1421                 &keylen,
1422                 &idx
1423                 );
1424         }
1425         if (r==DB_NOTFOUND) {
1426             storeddata = 0;
1427         } else {
1428             assert_zero(r);
1429         }
1430         toku_ft_bn_apply_msg_once(
1431             bn,
1432             msg,
1433             idx,
1434             keylen,
1435             storeddata,
1436             gc_info,
1437             workdone,
1438             stats_to_update,
1439             logical_rows_delta);
1440 
1441         // if the insertion point is within a window of the right edge of
1442         // the leaf then it is sequential
1443         // window = min(32, number of leaf entries/16)
1444         {
1445             uint32_t s = bn->data_buffer.num_klpairs();
1446             uint32_t w = s / 16;
1447             if (w == 0) w = 1;
1448             if (w > 32) w = 32;
1449 
1450             // within the window?
1451             if (s - idx <= w)
1452                 bn->seqinsert = doing_seqinsert + 1;
1453         }
1454         break;
1455     }
1456     case FT_DELETE_ANY:
1457     case FT_ABORT_ANY:
1458     case FT_COMMIT_ANY: {
1459         uint32_t idx;
1460         // Apply to all the matches
1461 
1462         r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1463             be,
1464             &storeddata,
1465             &key,
1466             &keylen,
1467             &idx);
1468         if (r == DB_NOTFOUND) break;
1469         assert_zero(r);
1470         toku_ft_bn_apply_msg_once(
1471             bn,
1472             msg,
1473             idx,
1474             keylen,
1475             storeddata,
1476             gc_info,
1477             workdone,
1478             stats_to_update,
1479             logical_rows_delta);
1480         break;
1481     }
1482     case FT_OPTIMIZE_FOR_UPGRADE:
1483         // fall through so that optimize_for_upgrade performs rest of the optimize logic
1484     case FT_COMMIT_BROADCAST_ALL:
1485     case FT_OPTIMIZE:
1486         // Apply to all leafentries
1487         num_klpairs = bn->data_buffer.num_klpairs();
1488         for (uint32_t idx = 0; idx < num_klpairs; ) {
1489             void* curr_keyp = NULL;
1490             uint32_t curr_keylen = 0;
1491             r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
1492             assert_zero(r);
1493             int deleted = 0;
1494             if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
1495                 // message application code needs a key in order to determine
1496                 // how much work was done by this message. since this is a
1497                 // broadcast message, we have to create a new message whose
1498                 // key is the current le's key.
1499                 DBT curr_keydbt;
1500                 ft_msg curr_msg(
1501                     toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
1502                     msg.vdbt(),
1503                     msg.type(),
1504                     msg.msn(),
1505                     msg.xids());
1506                 toku_ft_bn_apply_msg_once(
1507                     bn,
1508                     curr_msg,
1509                     idx,
1510                     curr_keylen,
1511                     storeddata,
1512                     gc_info,
1513                     workdone,
1514                     stats_to_update,
1515                     logical_rows_delta);
1516                 // at this point, we cannot trust msg.kdbt to be valid.
1517                 uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
1518                 if (new_dmt_size != num_klpairs) {
1519                     paranoid_invariant(new_dmt_size + 1 == num_klpairs);
1520                     //Item was deleted.
1521                     deleted = 1;
1522                 }
1523             }
1524             if (deleted)
1525                 num_klpairs--;
1526             else
1527                 idx++;
1528         }
1529         paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);
1530 
1531         break;
1532     case FT_COMMIT_BROADCAST_TXN:
1533     case FT_ABORT_BROADCAST_TXN:
1534         // Apply to all leafentries if txn is represented
1535         num_klpairs = bn->data_buffer.num_klpairs();
1536         for (uint32_t idx = 0; idx < num_klpairs; ) {
1537             void* curr_keyp = NULL;
1538             uint32_t curr_keylen = 0;
1539             r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
1540             assert_zero(r);
1541             int deleted = 0;
1542             if (le_has_xids(storeddata, msg.xids())) {
1543                 // message application code needs a key in order to determine
1544                 // how much work was done by this message. since this is a
1545                 // broadcast message, we have to create a new message whose key
1546                 // is the current le's key.
1547                 DBT curr_keydbt;
1548                 ft_msg curr_msg(
1549                     toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
1550                     msg.vdbt(),
1551                     msg.type(),
1552                     msg.msn(),
1553                     msg.xids());
1554                 toku_ft_bn_apply_msg_once(
1555                     bn,
1556                     curr_msg,
1557                     idx,
1558                     curr_keylen,
1559                     storeddata,
1560                     gc_info,
1561                     workdone,
1562                     stats_to_update,
1563                     logical_rows_delta);
1564                 uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
1565                 if (new_dmt_size != num_klpairs) {
1566                     paranoid_invariant(new_dmt_size + 1 == num_klpairs);
1567                     //Item was deleted.
1568                     deleted = 1;
1569                 }
1570             }
1571             if (deleted)
1572                 num_klpairs--;
1573             else
1574                 idx++;
1575         }
1576         paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);
1577 
1578         break;
1579     case FT_UPDATE: {
1580         uint32_t idx;
1581         r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1582             be,
1583             &storeddata,
1584             &key,
1585             &keylen,
1586             &idx
1587             );
1588         if (r==DB_NOTFOUND) {
1589             {
1590                 //Point to msg's copy of the key so we don't worry about le being freed
1591                 //TODO: 46 MAYBE Get rid of this when le_apply message memory is better handled
1592                 key = msg.kdbt()->data;
1593                 keylen = msg.kdbt()->size;
1594             }
1595             r = do_update(
1596                 update_fun,
1597                 cmp.get_descriptor(),
1598                 bn,
1599                 msg,
1600                 idx,
1601                 NULL,
1602                 NULL,
1603                 0,
1604                 gc_info,
1605                 workdone,
1606                 stats_to_update,
1607                 logical_rows_delta);
1608         } else if (r==0) {
1609             r = do_update(
1610                 update_fun,
1611                 cmp.get_descriptor(),
1612                 bn,
1613                 msg,
1614                 idx,
1615                 storeddata,
1616                 key,
1617                 keylen,
1618                 gc_info,
1619                 workdone,
1620                 stats_to_update,
1621                 logical_rows_delta);
1622         } // otherwise, a worse error, just return it
1623         break;
1624     }
1625     case FT_UPDATE_BROADCAST_ALL: {
1626         // apply to all leafentries.
1627         uint32_t idx = 0;
1628         uint32_t num_leafentries_before;
1629         // This is used to avoid having the logical row count changed on apply
1630         // of this message since it will return a negative number of the number
1631         // of leaf entries visited and cause the ft header value to go to 0;
1632         // This message will not change the number of rows, so just use the
1633         // bogus value.
1634         int64_t temp_logical_rows_delta = 0;
1635         while (idx < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
1636             void* curr_key = nullptr;
1637             uint32_t curr_keylen = 0;
1638             r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_key);
1639             assert_zero(r);
1640 
1641             //TODO: 46 replace this with something better than cloning key
1642             // TODO: (Zardosht) This may be unnecessary now, due to how the key
1643             // is handled in the bndata. Investigate and determine
1644             char clone_mem[curr_keylen];  // only lasts one loop, alloca would overflow (end of function)
1645             memcpy((void*)clone_mem, curr_key, curr_keylen);
1646             curr_key = (void*)clone_mem;
1647 
1648             // This is broken below. Have a compilation error checked
1649             // in as a reminder
1650             r = do_update(
1651                 update_fun,
1652                 cmp.get_descriptor(),
1653                 bn,
1654                 msg,
1655                 idx,
1656                 storeddata,
1657                 curr_key,
1658                 curr_keylen,
1659                 gc_info,
1660                 workdone,
1661                 stats_to_update,
1662                 &temp_logical_rows_delta);
1663             assert_zero(r);
1664 
1665             if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
1666                 // we didn't delete something, so increment the index.
1667                 idx++;
1668             }
1669         }
1670         break;
1671     }
1672     case FT_NONE: break; // don't do anything
1673     }
1674 
1675     return;
1676 }
1677 
1678 static inline int
1679 key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, const toku::comparator &cmp) {
1680     int r = cmp(a, b);
1681     if (r == 0) {
1682         if (amsn.msn > bmsn.msn) {
1683             r = +1;
1684         } else if (amsn.msn < bmsn.msn) {
1685             r = -1;
1686         } else {
1687             r = 0;
1688         }
1689     }
1690     return r;
1691 }
1692 
1693 int toku_msg_buffer_key_msn_heaviside(const int32_t &offset, const struct toku_msg_buffer_key_msn_heaviside_extra &extra) {
1694     MSN query_msn;
1695     DBT query_key;
1696     extra.msg_buffer->get_message_key_msn(offset, &query_key, &query_msn);
1697     return key_msn_cmp(&query_key, extra.key, query_msn, extra.msn, extra.cmp);
1698 }
1699 
1700 int toku_msg_buffer_key_msn_cmp(const struct toku_msg_buffer_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo) {
1701     MSN amsn, bmsn;
1702     DBT akey, bkey;
1703     extra.msg_buffer->get_message_key_msn(ao, &akey, &amsn);
1704     extra.msg_buffer->get_message_key_msn(bo, &bkey, &bmsn);
1705     return key_msn_cmp(&akey, &bkey, amsn, bmsn, extra.cmp);
1706 }
1707 
1708 // Effect: Enqueue the message represented by the parameters into the
1709 //   bnc's buffer, and put it in either the fresh or stale message tree,
1710 //   or the broadcast list.
1711 static void bnc_insert_msg(NONLEAF_CHILDINFO bnc, const ft_msg &msg, bool is_fresh, const toku::comparator &cmp) {
1712     int r = 0;
1713     int32_t offset;
1714     bnc->msg_buffer.enqueue(msg, is_fresh, &offset);
1715     enum ft_msg_type type = msg.type();
1716     if (ft_msg_type_applies_once(type)) {
1717         DBT key;
1718         toku_fill_dbt(&key, msg.kdbt()->data, msg.kdbt()->size);
1719         struct toku_msg_buffer_key_msn_heaviside_extra extra(cmp, &bnc->msg_buffer, &key, msg.msn());
1720         if (is_fresh) {
1721             r = bnc->fresh_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
1722             assert_zero(r);
1723         } else {
1724             r = bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
1725             assert_zero(r);
1726         }
1727     } else {
1728         invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type));
1729         const uint32_t idx = bnc->broadcast_list.size();
1730         r = bnc->broadcast_list.insert_at(offset, idx);
1731         assert_zero(r);
1732     }
1733 }
1734 
1735 // This is only exported for tests.
1736 void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, uint32_t keylen, const void *data, uint32_t datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const toku::comparator &cmp)
1737 {
1738     DBT k, v;
1739     ft_msg msg(toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), type, msn, xids);
1740     bnc_insert_msg(bnc, msg, is_fresh, cmp);
1741 }
1742 
1743 // append a msg to a nonleaf node's child buffer
1744 static void ft_append_msg_to_child_buffer(const toku::comparator &cmp, FTNODE node,
1745                                           int childnum, const ft_msg &msg, bool is_fresh) {
1746     paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL);
1747     bnc_insert_msg(BNC(node, childnum), msg, is_fresh, cmp);
1748     node->set_dirty();
1749 }
1750 
1751 // This is only exported for tests.
1752 void toku_ft_append_to_child_buffer(const toku::comparator &cmp, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) {
1753     ft_msg msg(key, val, type, msn, xids);
1754     ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
1755 }
1756 
1757 static void ft_nonleaf_msg_once_to_child(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
1758 // Previously we had passive aggressive promotion, but that causes a lot of I/O a the checkpoint.  So now we are just putting it in the buffer here.
1759 // Also we don't worry about the node getting overfull here.  It's the caller's problem.
1760 {
1761     unsigned int childnum = (target_childnum >= 0
1762                              ? target_childnum
1763                              : toku_ftnode_which_child(node, msg.kdbt(), cmp));
1764     ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
1765     NONLEAF_CHILDINFO bnc = BNC(node, childnum);
1766     bnc->flow[0] += flow_deltas[0];
1767     bnc->flow[1] += flow_deltas[1];
1768 }
1769 
1770 // TODO: Remove me, I'm boring.
1771 static int ft_compare_pivot(const toku::comparator &cmp, const DBT *key, const DBT *pivot) {
1772     return cmp(key, pivot);
1773 }
1774 
1775 /* Find the leftmost child that may contain the key.
1776  * If the key exists it will be in the child whose number
1777  * is the return value of this function.
1778  */
1779 int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
1780     // a funny case of no pivots
1781     if (node->n_children <= 1) return 0;
1782 
1783     DBT pivot;
1784 
1785     // check the last key to optimize seq insertions
1786     int n = node->n_children-1;
1787     int c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(n - 1, &pivot));
1788     if (c > 0) return n;
1789 
1790     // binary search the pivots
1791     int lo = 0;
1792     int hi = n-1; // skip the last one, we checked it above
1793     int mi;
1794     while (lo < hi) {
1795         mi = (lo + hi) / 2;
1796         c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
1797         if (c > 0) {
1798             lo = mi+1;
1799             continue;
1800         }
1801         if (c < 0) {
1802             hi = mi;
1803             continue;
1804         }
1805         return mi;
1806     }
1807     return lo;
1808 }
1809 
1810 // Used for HOT.
1811 int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
1812     DBT pivot;
1813     int low = 0;
1814     int hi = node->n_children - 1;
1815     int mi;
1816     while (low < hi) {
1817         mi = (low + hi) / 2;
1818         int r = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
1819         if (r > 0) {
1820             low = mi + 1;
1821         } else if (r < 0) {
1822             hi = mi;
1823         } else {
1824             // if they were exactly equal, then we want the sub-tree under
1825             // the next pivot.
1826             return mi + 1;
1827         }
1828     }
1829     invariant(low == hi);
1830     return low;
1831 }
1832 
1833 void toku_ftnode_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
1834     FTNODE CAST_FROM_VOIDP(node, value_data);
1835     node->ct_pair = p;
1836 }
1837 
1838 static void
1839 ft_nonleaf_msg_all(const toku::comparator &cmp, FTNODE node, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
1840 // Effect: Put the message into a nonleaf node.  We put it into all children, possibly causing the children to become reactive.
1841 //  We don't do the splitting and merging.  That's up to the caller after doing all the puts it wants to do.
1842 //  The re_array[i] gets set to the reactivity of any modified child i.         (And there may be several such children.)
1843 {
1844     for (int i = 0; i < node->n_children; i++) {
1845         ft_nonleaf_msg_once_to_child(cmp, node, i, msg, is_fresh, flow_deltas);
1846     }
1847 }
1848 
1849 static void
1850 ft_nonleaf_put_msg(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
1851 // Effect: Put the message into a nonleaf node.  We may put it into a child, possibly causing the child to become reactive.
1852 //  We don't do the splitting and merging.  That's up to the caller after doing all the puts it wants to do.
1853 //  The re_array[i] gets set to the reactivity of any modified child i.         (And there may be several such children.)
1854 //
1855 {
1856 
1857     //
1858     // see comments in toku_ft_leaf_apply_msg
1859     // to understand why we handle setting
1860     // node->max_msn_applied_to_node_on_disk here,
1861     // and don't do it in toku_ftnode_put_msg
1862     //
1863     MSN msg_msn = msg.msn();
1864     invariant(msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
1865     node->max_msn_applied_to_node_on_disk = msg_msn;
1866 
1867     if (ft_msg_type_applies_once(msg.type())) {
1868         ft_nonleaf_msg_once_to_child(cmp, node, target_childnum, msg, is_fresh, flow_deltas);
1869     } else if (ft_msg_type_applies_all(msg.type())) {
1870         ft_nonleaf_msg_all(cmp, node, msg, is_fresh, flow_deltas);
1871     } else {
1872         paranoid_invariant(ft_msg_type_does_nothing(msg.type()));
1873     }
1874 }
1875 
1876 // Garbage collect one leaf entry.
1877 static void
1878 ft_basement_node_gc_once(BASEMENTNODE bn,
1879                           uint32_t index,
1880                           void* keyp,
1881                           uint32_t keylen,
1882                           LEAFENTRY leaf_entry,
1883                           txn_gc_info *gc_info,
1884                           STAT64INFO_S * delta)
1885 {
1886     paranoid_invariant(leaf_entry);
1887 
1888     // Don't run garbage collection on non-mvcc leaf entries.
1889     if (leaf_entry->type != LE_MVCC) {
1890         goto exit;
1891     }
1892 
1893     // Don't run garbage collection if this leafentry decides it's not worth it.
1894     if (!toku_le_worth_running_garbage_collection(leaf_entry, gc_info)) {
1895         goto exit;
1896     }
1897 
1898     LEAFENTRY new_leaf_entry;
1899     new_leaf_entry = NULL;
1900 
1901     // The mempool doesn't free itself.  When it allocates new memory,
1902     // this pointer will be set to the older memory that must now be
1903     // freed.
1904     void * maybe_free;
1905     maybe_free = NULL;
1906 
1907     // These will represent the number of bytes and rows changed as
1908     // part of the garbage collection.
1909     int64_t numbytes_delta;
1910     int64_t numrows_delta;
1911     toku_le_garbage_collect(leaf_entry,
1912                             &bn->data_buffer,
1913                             index,
1914                             keyp,
1915                             keylen,
1916                             gc_info,
1917                             &new_leaf_entry,
1918                             &numbytes_delta);
1919 
1920     numrows_delta = 0;
1921     if (new_leaf_entry) {
1922         numrows_delta = 0;
1923     } else {
1924         numrows_delta = -1;
1925     }
1926 
1927     // If we created a new mempool buffer we must free the
1928     // old/original buffer.
1929     if (maybe_free) {
1930         toku_free(maybe_free);
1931     }
1932 
1933     // Update stats.
1934     bn->stat64_delta.numrows += numrows_delta;
1935     bn->stat64_delta.numbytes += numbytes_delta;
1936     delta->numrows += numrows_delta;
1937     delta->numbytes += numbytes_delta;
1938 
1939 exit:
1940     return;
1941 }
1942 
1943 // Garbage collect all leaf entries for a given basement node.
1944 static void
1945 basement_node_gc_all_les(BASEMENTNODE bn,
1946                          txn_gc_info *gc_info,
1947                          STAT64INFO_S * delta)
1948 {
1949     int r = 0;
1950     uint32_t index = 0;
1951     uint32_t num_leafentries_before;
1952     while (index < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
1953         void* keyp = NULL;
1954         uint32_t keylen = 0;
1955         LEAFENTRY leaf_entry;
1956         r = bn->data_buffer.fetch_klpair(index, &leaf_entry, &keylen, &keyp);
1957         assert_zero(r);
1958         ft_basement_node_gc_once(
1959             bn,
1960             index,
1961             keyp,
1962             keylen,
1963             leaf_entry,
1964             gc_info,
1965             delta
1966             );
1967         // Check if the leaf entry was deleted or not.
1968         if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
1969             ++index;
1970         }
1971     }
1972 }
1973 
1974 // Garbage collect all leaf entires in all basement nodes.
1975 static void
1976 ft_leaf_gc_all_les(FT ft, FTNODE node, txn_gc_info *gc_info)
1977 {
1978     toku_ftnode_assert_fully_in_memory(node);
1979     paranoid_invariant_zero(node->height);
1980     // Loop through each leaf entry, garbage collecting as we go.
1981     for (int i = 0; i < node->n_children; ++i) {
1982         // Perform the garbage collection.
1983         BASEMENTNODE bn = BLB(node, i);
1984         STAT64INFO_S delta;
1985         delta.numrows = 0;
1986         delta.numbytes = 0;
1987         basement_node_gc_all_les(bn, gc_info, &delta);
1988         toku_ft_update_stats(&ft->in_memory_stats, delta);
1989     }
1990 }
1991 
1992 void toku_ftnode_leaf_run_gc(FT ft, FTNODE node) {
1993     TOKULOGGER logger = toku_cachefile_logger(ft->cf);
1994     if (logger) {
1995         TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger);
1996         txn_manager_state txn_state_for_gc(txn_manager);
1997         txn_state_for_gc.init();
1998         TXNID oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
1999 
2000         // Perform full garbage collection.
2001         //
2002         // - txn_state_for_gc
2003         //     a fresh snapshot of the transaction system.
2004         // - oldest_referenced_xid_for_simple_gc
2005         //     the oldest xid in any live list as of right now - suitible for simple gc
2006         // - node->oldest_referenced_xid_known
2007         //     the last known oldest referenced xid for this node and any unapplied messages.
2008         //     it is a lower bound on the actual oldest referenced xid - but becasue there
2009         //     may be abort messages above us, we need to be careful to only use this value
2010         //     for implicit promotion (as opposed to the oldest referenced xid for simple gc)
2011         //
2012         // The node has its own oldest referenced xid because it must be careful not to implicitly promote
2013         // provisional entries for transactions that are no longer live, but may have abort messages
2014         // somewhere above us in the tree.
2015         txn_gc_info gc_info(&txn_state_for_gc,
2016                             oldest_referenced_xid_for_simple_gc,
2017                             node->oldest_referenced_xid_known,
2018                             true);
2019         ft_leaf_gc_all_les(ft, node, &gc_info);
2020     }
2021 }
2022 
2023 void toku_ftnode_put_msg(
2024     const toku::comparator &cmp,
2025     ft_update_func update_fun,
2026     FTNODE node,
2027     int target_childnum,
2028     const ft_msg &msg,
2029     bool is_fresh,
2030     txn_gc_info* gc_info,
2031     size_t flow_deltas[],
2032     STAT64INFO stats_to_update,
2033     int64_t* logical_rows_delta) {
2034 // Effect: Push message into the subtree rooted at NODE.
2035 //   If NODE is a leaf, then
2036 //   put message into leaf, applying it to the leafentries
2037 //   If NODE is a nonleaf, then push the message into the message buffer(s) of the relevent child(ren).
2038 //   The node may become overfull.  That's not our problem.
2039     toku_ftnode_assert_fully_in_memory(node);
2040     //
2041     // see comments in toku_ft_leaf_apply_msg
2042     // to understand why we don't handle setting
2043     // node->max_msn_applied_to_node_on_disk here,
2044     // and instead defer to these functions
2045     //
2046     if (node->height==0) {
2047         toku_ft_leaf_apply_msg(
2048             cmp,
2049             update_fun,
2050             node,
2051             target_childnum, msg,
2052             gc_info,
2053             nullptr,
2054             stats_to_update,
2055             logical_rows_delta);
2056     } else {
2057         ft_nonleaf_put_msg(
2058             cmp,
2059             node,
2060             target_childnum,
2061             msg,
2062             is_fresh,
2063             flow_deltas);
2064     }
2065 }
2066 
2067 // Effect: applies the message to the leaf if the appropriate basement node is
2068 //           in memory. This function is called during message injection and/or
2069 //           flushing, so the entire node MUST be in memory.
2070 void toku_ft_leaf_apply_msg(
2071     const toku::comparator& cmp,
2072     ft_update_func update_fun,
2073     FTNODE node,
2074     int target_childnum,  // which child to inject to, or -1 if unknown
2075     const ft_msg& msg,
2076     txn_gc_info* gc_info,
2077     uint64_t* workdone,
2078     STAT64INFO stats_to_update,
2079     int64_t* logical_rows_delta) {
2080 
2081     VERIFY_NODE(t, node);
2082     toku_ftnode_assert_fully_in_memory(node);
2083 
2084     //
2085     // Because toku_ft_leaf_apply_msg is called with the intent of permanently
2086     // applying a message to a leaf node (meaning the message is permanently applied
2087     // and will be purged from the system after this call, as opposed to
2088     // toku_apply_ancestors_messages_to_node, which applies a message
2089     // for a query, but the message may still reside in the system and
2090     // be reapplied later), we mark the node as dirty and
2091     // take the opportunity to update node->max_msn_applied_to_node_on_disk.
2092     //
2093     node->set_dirty();
2094 
2095     //
2096     // we cannot blindly update node->max_msn_applied_to_node_on_disk,
2097     // we must check to see if the msn is greater that the one already stored,
2098     // because the message may have already been applied earlier (via
2099     // toku_apply_ancestors_messages_to_node) to answer a query
2100     //
2101     // This is why we handle node->max_msn_applied_to_node_on_disk both here
2102     // and in ft_nonleaf_put_msg, as opposed to in one location, toku_ftnode_put_msg.
2103     //
2104     MSN msg_msn = msg.msn();
2105     if (msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn) {
2106         node->max_msn_applied_to_node_on_disk = msg_msn;
2107     }
2108 
2109     if (ft_msg_type_applies_once(msg.type())) {
2110         unsigned int childnum = (target_childnum >= 0
2111                                  ? target_childnum
2112                                  : toku_ftnode_which_child(node, msg.kdbt(), cmp));
2113         BASEMENTNODE bn = BLB(node, childnum);
2114         if (msg.msn().msn > bn->max_msn_applied.msn) {
2115             bn->max_msn_applied = msg.msn();
2116             toku_ft_bn_apply_msg(
2117                 cmp,
2118                 update_fun,
2119                 bn,
2120                 msg,
2121                 gc_info,
2122                 workdone,
2123                 stats_to_update,
2124                 logical_rows_delta);
2125         } else {
2126             toku_ft_status_note_msn_discard();
2127         }
2128     } else if (ft_msg_type_applies_all(msg.type())) {
2129         for (int childnum=0; childnum<node->n_children; childnum++) {
2130             if (msg.msn().msn > BLB(node, childnum)->max_msn_applied.msn) {
2131                 BLB(node, childnum)->max_msn_applied = msg.msn();
2132                 toku_ft_bn_apply_msg(
2133                     cmp,
2134                     update_fun,
2135                     BLB(node, childnum),
2136                     msg,
2137                     gc_info,
2138                     workdone,
2139                     stats_to_update,
2140                     logical_rows_delta);
2141             } else {
2142                 toku_ft_status_note_msn_discard();
2143             }
2144         }
2145     } else if (!ft_msg_type_does_nothing(msg.type())) {
2146         invariant(ft_msg_type_does_nothing(msg.type()));
2147     }
2148     VERIFY_NODE(t, node);
2149 }
2150 
2151