/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "ft/ft.h"
#include "ft/ft-internal.h"
#include "ft/serialize/ft_node-serialize.h"
#include "ft/node.h"
#include "ft/serialize/rbuf.h"
#include "ft/serialize/wbuf.h"
#include "util/scoped_malloc.h"
#include "util/sort.h"

// Effect: Fill in N as an empty ftnode.
// TODO: Rename toku_ftnode_create
void toku_initialize_empty_ftnode(FTNODE n, BLOCKNUM blocknum, int height, int num_children, int layout_version, unsigned int flags) {
    paranoid_invariant(layout_version != 0);
    paranoid_invariant(height >= 0);

    n->max_msn_applied_to_node_on_disk = ZERO_MSN;    // correct value for root node, harmless for others
    n->flags = flags;
    n->blocknum = blocknum;
    n->layout_version               = layout_version;
    n->layout_version_original = layout_version;
    n->layout_version_read_from_disk = layout_version;
    n->height = height;
    n->pivotkeys.create_empty();
    n->bp = 0;
    n->n_children = num_children;
    n->oldest_referenced_xid_known = TXNID_NONE;

    if (num_children > 0) {
        XMALLOC_N(num_children, n->bp);
        for (int i = 0; i < num_children; i++) {
            BP_BLOCKNUM(n,i).b=0;
            BP_STATE(n,i) = PT_INVALID;
            BP_WORKDONE(n,i) = 0;
            BP_INIT_TOUCHED_CLOCK(n, i);
            set_BNULL(n,i);
            if (height > 0) {
                set_BNC(n, i, toku_create_empty_nl());
            } else {
                set_BLB(n, i, toku_create_empty_bn());
            }
        }
    }
    n->set_dirty();  // special case exception, it's okay to mark as dirty because the basements are empty

    toku_ft_status_note_ftnode(height, true);
}
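
// Illustrative sketch (not part of the build): a typical create/destroy
// cycle for an empty leaf node.  The blocknum value and the use of
// FT_LAYOUT_VERSION here are assumptions for this example, not taken from
// any caller in this file.
#if 0
static void example_create_and_free_leaf(void) {
    FTNODE XMALLOC(node);
    // height 0 with two children yields a leaf with two empty basement nodes
    toku_initialize_empty_ftnode(node, make_blocknum(17), 0 /* height */,
                                 2 /* num_children */, FT_LAYOUT_VERSION, 0 /* flags */);
    // ... use the node ...
    toku_ftnode_free(&node);  // frees the basements too and nulls out the pointer
}
#endif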

// destroys the internals of the ftnode, but it does not free the values
// that are stored
// this is common functionality for toku_ftnode_free and rebalance_ftnode_leaf
// MUST NOT do anything besides free the structures that have been allocated
void toku_destroy_ftnode_internals(FTNODE node) {
    node->pivotkeys.destroy();
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) == PT_AVAIL) {
            if (node->height > 0) {
                destroy_nonleaf_childinfo(BNC(node,i));
            } else {
                paranoid_invariant(BLB_LRD(node, i) == 0);
                destroy_basement_node(BLB(node, i));
            }
        } else if (BP_STATE(node,i) == PT_COMPRESSED) {
            SUB_BLOCK sb = BSB(node,i);
            toku_free(sb->compressed_ptr);
            toku_free(sb);
        } else {
            paranoid_invariant(is_BNULL(node, i));
        }
        set_BNULL(node, i);
    }
    toku_free(node->bp);
    node->bp = NULL;
}

/* Frees a node, including all the stuff in the hash table. */
void toku_ftnode_free(FTNODE *nodep) {
    FTNODE node = *nodep;
    toku_ft_status_note_ftnode(node->height, false);
    toku_destroy_ftnode_internals(node);
    toku_free(node);
    *nodep = nullptr;
}

void toku_ftnode_update_disk_stats(FTNODE ftnode, FT ft, bool for_checkpoint) {
    STAT64INFO_S deltas = ZEROSTATS;
    // capture deltas before rebalancing basements for serialization
    deltas = toku_get_and_clear_basement_stats(ftnode);
    // locking not necessary here with respect to checkpointing
    // in Clayface (because of the pending lock and cachetable lock
    // in toku_cachetable_begin_checkpoint)
    // essentially, if we are dealing with a for_checkpoint
    // parameter in a function that is called by the flush_callback,
    // then the cachetable needs to ensure that this is called in a safe
    // manner that does not interfere with the beginning
    // of a checkpoint, which it does with the cachetable lock
    // and pending lock
    toku_ft_update_stats(&ft->h->on_disk_stats, deltas);
    if (for_checkpoint) {
        toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas);
    }
}

void toku_ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) {
    for (int i = 0; i < node->n_children; i++) {
        BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i);
        paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
        BP_STATE(cloned_node,i) = PT_AVAIL;
        BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i);
        if (node->height == 0) {
            set_BLB(cloned_node, i, toku_clone_bn(BLB(node,i)));
        } else {
            set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i)));
        }
    }
}

void toku_evict_bn_from_memory(FTNODE node, int childnum, FT ft) {
    // free the basement node
    assert(!node->dirty());
    BASEMENTNODE bn = BLB(node, childnum);
    toku_ft_decrease_stats(&ft->in_memory_stats, bn->stat64_delta);
    toku_ft_adjust_logical_row_count(ft, -BLB_LRD(node, childnum));
    BLB_LRD(node, childnum) = 0;
    destroy_basement_node(bn);
    set_BNULL(node, childnum);
    BP_STATE(node, childnum) = PT_ON_DISK;
}

BASEMENTNODE toku_detach_bn(FTNODE node, int childnum) {
    assert(BP_STATE(node, childnum) == PT_AVAIL);
    BASEMENTNODE bn = BLB(node, childnum);
    set_BNULL(node, childnum);
    BP_STATE(node, childnum) = PT_ON_DISK;
    return bn;
}

//
// Orthopush
//

struct store_msg_buffer_offset_extra {
    int32_t *offsets;
    int i;
};

int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra) __attribute__((nonnull(3)));
int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra)
{
    extra->offsets[extra->i] = offset;
    extra->i++;
    return 0;
}

/**
 * Given pointers to offsets within a message buffer where we can find messages,
 * figure out the MSN of each message, and compare those MSNs.  Returns 1,
 * 0, or -1 if a is larger than, equal to, or smaller than b.
 */
int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo);
int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo)
{
    MSN amsn, bmsn;
    msg_buffer.get_message_key_msn(ao, nullptr, &amsn);
    msg_buffer.get_message_key_msn(bo, nullptr, &bmsn);
    if (amsn.msn > bmsn.msn) {
        return +1;
    }
    if (amsn.msn < bmsn.msn) {
        return -1;
    }
    return 0;
}

/**
 * Given a message buffer and an offset, apply the message with
 * toku_ft_bn_apply_msg, or discard it,
 * based on its MSN and the MSN of the basement node.
 */
static void do_bn_apply_msg(
    FT_HANDLE ft_handle,
    BASEMENTNODE bn,
    message_buffer* msg_buffer,
    int32_t offset,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    DBT k, v;
    ft_msg msg = msg_buffer->get_message(offset, &k, &v);

    // The messages are being iterated over in (key,msn) order or just in
    // msn order, so all the messages for one key, from one buffer, are in
    // ascending msn order.  So it's ok that we don't update the basement
    // node's msn until the end.
    if (msg.msn().msn > bn->max_msn_applied.msn) {
        toku_ft_bn_apply_msg(
            ft_handle->ft->cmp,
            ft_handle->ft->update_fun,
            bn,
            msg,
            gc_info,
            workdone,
            stats_to_update,
            logical_rows_delta);
    } else {
        toku_ft_status_note_msn_discard();
    }

    // We must always mark the message as stale since it has been marked
    // (using omt::iterate_and_mark_range).
    // It is possible to call do_bn_apply_msg even when it won't apply the
    // message because the node containing it could have been evicted and
    // brought back in.
    msg_buffer->set_freshness(offset, false);
}
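
// Worked example of the msn filter above (illustrative numbers): if
// bn->max_msn_applied.msn is 100, a message with msn 101 from an ancestor's
// buffer is applied, while a message with msn 99 is discarded, since it must
// already have been applied before the basement node was last written out
// with max_msn_applied == 100.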


struct iterate_do_bn_apply_msg_extra {
    FT_HANDLE t;
    BASEMENTNODE bn;
    NONLEAF_CHILDINFO bnc;
    txn_gc_info *gc_info;
    uint64_t *workdone;
    STAT64INFO stats_to_update;
    int64_t *logical_rows_delta;
};

int iterate_do_bn_apply_msg(
    const int32_t &offset,
    const uint32_t UU(idx),
    struct iterate_do_bn_apply_msg_extra* const e)
    __attribute__((nonnull(3)));

int iterate_do_bn_apply_msg(
    const int32_t &offset,
    const uint32_t UU(idx),
    struct iterate_do_bn_apply_msg_extra* const e)
{
    do_bn_apply_msg(
        e->t,
        e->bn,
        &e->bnc->msg_buffer,
        offset,
        e->gc_info,
        e->workdone,
        e->stats_to_update,
        e->logical_rows_delta);
    return 0;
}

/**
 * Given the bounds of the basement node to which we will apply messages,
 * find the indexes within message_tree which contain the range of
 * relevant messages.
 *
 * The message tree contains offsets into the buffer, where messages are
 * found.  The pivot_bounds are the lower bound exclusive and upper bound
 * inclusive, because they come from pivot keys in the tree.  We want OMT
 * indices, which must have the lower bound be inclusive and the upper
 * bound exclusive.  We will get these by telling omt::find to look
 * for something strictly bigger than each of our pivot bounds.
 *
 * Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper
 * bound exclusive).
 */
template<typename find_bounds_omt_t>
static void
find_bounds_within_message_tree(
    const toku::comparator &cmp,
    const find_bounds_omt_t &message_tree,      /// tree holding message buffer offsets, in which we want to look for indices
    message_buffer *msg_buffer,           /// message buffer in which messages are found
    const pivot_bounds &bounds,  /// key bounds within the basement node we're applying messages to
    uint32_t *lbi,        /// (output) "lower bound inclusive" (index into message_tree)
    uint32_t *ube         /// (output) "upper bound exclusive" (index into message_tree)
    )
{
    int r = 0;

    if (!toku_dbt_is_empty(bounds.lbe())) {
        // By setting msn to MAX_MSN and by using direction of +1, we will
        // get the first message greater than (in (key, msn) order) any
        // message (with any msn) with the key lower_bound_exclusive.
        // This will be a message we want to try applying, so it is the
        // "lower bound inclusive" within the message_tree.
        struct toku_msg_buffer_key_msn_heaviside_extra lbi_extra(cmp, msg_buffer, bounds.lbe(), MAX_MSN);
        int32_t found_lb;
        r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
        if (r == DB_NOTFOUND) {
            // There is no relevant data (the lower bound is bigger than
            // any message in this tree), so we have no range and we're
            // done.
            *lbi = 0;
            *ube = 0;
            return;
        }
        if (!toku_dbt_is_empty(bounds.ubi())) {
            // Check if what we found for lbi is greater than the upper
            // bound inclusive that we have.  If so, there are no relevant
            // messages between these bounds.
            const DBT *ubi = bounds.ubi();
            const int32_t offset = found_lb;
            DBT found_lbidbt;
            msg_buffer->get_message_key_msn(offset, &found_lbidbt, nullptr);
            int c = cmp(&found_lbidbt, ubi);
            // These DBTs really are both inclusive bounds, so we need
            // strict inequality in order to determine that there's
            // nothing between them.  If they're equal, then we actually
            // need to apply the message pointed to by lbi, and also
            // anything with the same key but a bigger msn.
            if (c > 0) {
                *lbi = 0;
                *ube = 0;
                return;
            }
        }
    } else {
        // No lower bound given, it's negative infinity, so we start at
        // the first message in the OMT.
        *lbi = 0;
    }
    if (!toku_dbt_is_empty(bounds.ubi())) {
        // Again, we use an msn of MAX_MSN and a direction of +1 to get
        // the first thing bigger than the upper_bound_inclusive key.
        // This is therefore the smallest thing we don't want to apply,
        // and omt::iterate_on_range will not examine it.
        struct toku_msg_buffer_key_msn_heaviside_extra ube_extra(cmp, msg_buffer, bounds.ubi(), MAX_MSN);
        r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
        if (r == DB_NOTFOUND) {
            // Couldn't find anything in the buffer bigger than our key,
            // so we need to look at everything up to the end of
            // message_tree.
            *ube = message_tree.size();
        }
    } else {
        // No upper bound given, it's positive infinity, so we need to go
        // through the end of the OMT.
        *ube = message_tree.size();
    }
}
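
// Worked example (illustrative): suppose the message tree holds offsets whose
// keys, in sorted order, are [10, 20, 20, 30, 40] and the basement node's
// pivot bounds are lbe = 10 (exclusive) and ubi = 30 (inclusive).  Searching
// for the first entry strictly greater than (10, MAX_MSN) yields *lbi = 1
// (the first key-20 message), and searching for the first entry strictly
// greater than (30, MAX_MSN) yields *ube = 4, so omt indices [1, 4) cover
// exactly the messages with keys in (10, 30].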

// For each message in the ancestor's buffer (determined by childnum) that
// is key-wise between lower_bound_exclusive and upper_bound_inclusive,
// apply the message to the basement node.  We treat the bounds as minus
// or plus infinity respectively if they are NULL.  Do not mark the node
// as dirty (preserve previous state of 'dirty' bit).
static void bnc_apply_messages_to_basement_node(
    FT_HANDLE t,      // used for comparison function
    BASEMENTNODE bn,  // where to apply messages
    FTNODE ancestor,  // the ancestor node where we can find messages to apply
    int childnum,  // which child buffer of ancestor contains messages we want
    const pivot_bounds &
        bounds,  // contains pivot key bounds of this basement node
    txn_gc_info *gc_info,
    bool *msgs_applied) {
    int r;
    NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);

    // Determine the offsets in the message trees between which we need to
    // apply messages from this buffer
    STAT64INFO_S stats_delta = {0, 0};
    uint64_t workdone_this_ancestor = 0;
    int64_t logical_rows_delta = 0;

    uint32_t stale_lbi, stale_ube;
    if (!bn->stale_ancestor_messages_applied) {
        find_bounds_within_message_tree(t->ft->cmp,
                                        bnc->stale_message_tree,
                                        &bnc->msg_buffer,
                                        bounds,
                                        &stale_lbi,
                                        &stale_ube);
    } else {
        stale_lbi = 0;
        stale_ube = 0;
    }
    uint32_t fresh_lbi, fresh_ube;
    find_bounds_within_message_tree(t->ft->cmp,
                                    bnc->fresh_message_tree,
                                    &bnc->msg_buffer,
                                    bounds,
                                    &fresh_lbi,
                                    &fresh_ube);

    // We now know where all the messages we must apply are, so one of the
    // following three cases will do the application, depending on which of
    // the lists contains relevant messages:
    //
    // 1. broadcast messages and anything else, or a mix of fresh and stale
    // 2. only fresh messages
    // 3. only stale messages
    if (bnc->broadcast_list.size() > 0 ||
        (stale_lbi != stale_ube && fresh_lbi != fresh_ube)) {
        // We have messages in multiple trees, so we grab all
        // the relevant messages' offsets and sort them by MSN, then apply
        // them in MSN order.
        const int buffer_size =
            ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) +
             bnc->broadcast_list.size());
        toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t));
        int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get());
        struct store_msg_buffer_offset_extra sfo_extra = {.offsets = offsets,
                                                          .i = 0};

        // Populate offsets array with offsets to stale messages
        r = bnc->stale_message_tree
                .iterate_on_range<struct store_msg_buffer_offset_extra,
                                  store_msg_buffer_offset>(
                    stale_lbi, stale_ube, &sfo_extra);
        assert_zero(r);

        // Then store fresh offsets, and mark them to be moved to stale later.
        r = bnc->fresh_message_tree
                .iterate_and_mark_range<struct store_msg_buffer_offset_extra,
                                        store_msg_buffer_offset>(
                    fresh_lbi, fresh_ube, &sfo_extra);
        assert_zero(r);

        // Store offsets of all broadcast messages.
        r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra,
                                        store_msg_buffer_offset>(&sfo_extra);
        assert_zero(r);
        invariant(sfo_extra.i == buffer_size);

        // Sort by MSN.
        toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>::
            mergesort_r(offsets, buffer_size, bnc->msg_buffer);

        // Apply the messages in MSN order.
        for (int i = 0; i < buffer_size; ++i) {
            *msgs_applied = true;
            do_bn_apply_msg(t,
                            bn,
                            &bnc->msg_buffer,
                            offsets[i],
                            gc_info,
                            &workdone_this_ancestor,
                            &stats_delta,
                            &logical_rows_delta);
        }
    } else if (stale_lbi == stale_ube) {
        // No stale messages to apply, we just apply fresh messages, and mark
        // them to be moved to stale later.
        struct iterate_do_bn_apply_msg_extra iter_extra = {
            .t = t,
            .bn = bn,
            .bnc = bnc,
            .gc_info = gc_info,
            .workdone = &workdone_this_ancestor,
            .stats_to_update = &stats_delta,
            .logical_rows_delta = &logical_rows_delta};
        if (fresh_ube - fresh_lbi > 0)
            *msgs_applied = true;
        r = bnc->fresh_message_tree
                .iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra,
                                        iterate_do_bn_apply_msg>(
                    fresh_lbi, fresh_ube, &iter_extra);
        assert_zero(r);
    } else {
        invariant(fresh_lbi == fresh_ube);
        // No fresh messages to apply, we just apply stale messages.

        if (stale_ube - stale_lbi > 0)
            *msgs_applied = true;
        struct iterate_do_bn_apply_msg_extra iter_extra = {
            .t = t,
            .bn = bn,
            .bnc = bnc,
            .gc_info = gc_info,
            .workdone = &workdone_this_ancestor,
            .stats_to_update = &stats_delta,
            .logical_rows_delta = &logical_rows_delta};

        r = bnc->stale_message_tree
                .iterate_on_range<struct iterate_do_bn_apply_msg_extra,
                                  iterate_do_bn_apply_msg>(
                    stale_lbi, stale_ube, &iter_extra);
        assert_zero(r);
    }
    //
    // update stats
    //
    if (workdone_this_ancestor > 0) {
        (void)toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum),
                                      workdone_this_ancestor);
    }
    if (stats_delta.numbytes || stats_delta.numrows) {
        toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
    }
    toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta);
    bn->logical_rows_delta += logical_rows_delta;
}
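
// Worked example of the dispatch above (illustrative numbers): with stale
// range [2, 5), fresh range [7, 9), and an empty broadcast list, both trees
// have relevant messages, so case 1 collects (5-2) + (9-7) = 5 offsets, sorts
// them by MSN, and applies them in MSN order.  If the stale range were empty
// ([5, 5)), case 2 would apply the two fresh messages directly with
// iterate_and_mark_range, skipping the sort.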

static void
apply_ancestors_messages_to_bn(
    FT_HANDLE t,
    FTNODE node,
    int childnum,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    txn_gc_info *gc_info,
    bool* msgs_applied
    )
{
    BASEMENTNODE curr_bn = BLB(node, childnum);
    const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
            bnc_apply_messages_to_basement_node(
                t,
                curr_bn,
                curr_ancestors->node,
                curr_ancestors->childnum,
                curr_bounds,
                gc_info,
                msgs_applied
                );
            // We don't want to check this ancestor node again if the
            // next time we query it, the msn hasn't changed.
            curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
        }
    }
    // At this point, we know all the stale messages above this
    // basement node have been applied, and any new messages will be
    // fresh, so we don't need to look at stale messages for this
    // basement node, unless it gets evicted (and this field becomes
    // false when it's read in again).
    curr_bn->stale_ancestor_messages_applied = true;
}

void
toku_apply_ancestors_messages_to_node (
    FT_HANDLE t,
    FTNODE node,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    bool* msgs_applied,
    int child_to_read
    )
// Effect:
//   Bring a leaf node up-to-date according to all the messages in the ancestors.
//   If the leaf node is already up-to-date then do nothing.
//   If the leaf node is not already up-to-date, then record the work done
//   for that leaf in each ancestor.
// Requires:
//   This is being called when pinning a leaf node for the query path.
//   The entire root-to-leaf path is pinned and appears in the ancestors list.
{
    VERIFY_NODE(t, node);
    paranoid_invariant(node->height == 0);

    TXN_MANAGER txn_manager = toku_ft_get_txn_manager(t);
    txn_manager_state txn_state_for_gc(txn_manager);

    TXNID oldest_referenced_xid_for_simple_gc = toku_ft_get_oldest_referenced_xid_estimate(t);
    txn_gc_info gc_info(&txn_state_for_gc,
                        oldest_referenced_xid_for_simple_gc,
                        node->oldest_referenced_xid_known,
                        true);
    if (!node->dirty() && child_to_read >= 0) {
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        apply_ancestors_messages_to_bn(
            t,
            node,
            child_to_read,
            ancestors,
            bounds,
            &gc_info,
            msgs_applied
            );
    }
    else {
        // know we are a leaf node
        // An important invariant:
        // We MUST bring every available basement node for a dirty node up to date.
        // flushing on the cleaner thread depends on this. This invariant
        // allows the cleaner thread to just pick an internal node and flush it
        // as opposed to being forced to start from the root.
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            apply_ancestors_messages_to_bn(
                t,
                node,
                i,
                ancestors,
                bounds,
                &gc_info,
                msgs_applied
                );
        }
    }
    VERIFY_NODE(t, node);
}

static bool bn_needs_ancestors_messages(
    FT ft,
    FTNODE node,
    int childnum,
    const pivot_bounds &bounds,
    ANCESTORS ancestors,
    MSN* max_msn_applied
    )
{
    BASEMENTNODE bn = BLB(node, childnum);
    const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
    bool needs_ancestors_messages = false;
    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
            NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
            if (bnc->broadcast_list.size() > 0) {
                needs_ancestors_messages = true;
                goto cleanup;
            }
            if (!bn->stale_ancestor_messages_applied) {
                uint32_t stale_lbi, stale_ube;
                find_bounds_within_message_tree(ft->cmp,
                                                bnc->stale_message_tree,
                                                &bnc->msg_buffer,
                                                curr_bounds,
                                                &stale_lbi,
                                                &stale_ube);
                if (stale_lbi < stale_ube) {
                    needs_ancestors_messages = true;
                    goto cleanup;
                }
            }
            uint32_t fresh_lbi, fresh_ube;
            find_bounds_within_message_tree(ft->cmp,
                                            bnc->fresh_message_tree,
                                            &bnc->msg_buffer,
                                            curr_bounds,
                                            &fresh_lbi,
                                            &fresh_ube);
            if (fresh_lbi < fresh_ube) {
                needs_ancestors_messages = true;
                goto cleanup;
            }
            if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied->msn) {
                max_msn_applied->msn = curr_ancestors->node->max_msn_applied_to_node_on_disk.msn;
            }
        }
    }
cleanup:
    return needs_ancestors_messages;
}

bool toku_ft_leaf_needs_ancestors_messages(
    FT ft,
    FTNODE node,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    MSN *const max_msn_in_path,
    int child_to_read
    )
// Effect: Determine whether there are messages in a node's ancestors
//  which must be applied to it.  These messages are in the correct
//  keyrange for any available basement nodes, and are in nodes with the
//  correct max_msn_applied_to_node_on_disk.
// Notes:
//  This is an approximate query.
// Output:
//  max_msn_in_path: max of "max_msn_applied_to_node_on_disk" over
//    ancestors.  This is used later to update basement nodes'
//    max_msn_applied values in case we don't do the full algorithm.
// Returns:
//  true if there may be some such messages
//  false only if there are definitely no such messages
// Rationale:
//  When we pin a node with a read lock, we want to quickly determine if
//  we should exchange it for a write lock in preparation for applying
//  messages.  If there are no messages, we don't need the write lock.
{
    paranoid_invariant(node->height == 0);
    bool needs_ancestors_messages = false;
    // child_to_read may be -1 in test cases
    if (!node->dirty() && child_to_read >= 0) {
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        needs_ancestors_messages = bn_needs_ancestors_messages(
            ft,
            node,
            child_to_read,
            bounds,
            ancestors,
            max_msn_in_path
            );
    }
    else {
        for (int i = 0; i < node->n_children; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            needs_ancestors_messages = bn_needs_ancestors_messages(
                ft,
                node,
                i,
                bounds,
                ancestors,
                max_msn_in_path
                );
            if (needs_ancestors_messages) {
                goto cleanup;
            }
        }
    }
cleanup:
    return needs_ancestors_messages;
}
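
// Illustrative caller sketch (not part of the build): how a query path might
// combine this predicate with toku_ft_bn_update_max_msn.  The lock-upgrade
// step is a hypothetical stand-in for what the cachetable layer does.
#if 0
static void example_query_path(FT ft, FTNODE leaf, ANCESTORS ancestors,
                               const pivot_bounds &bounds, int child_to_read) {
    MSN max_msn_in_path = ZERO_MSN;
    if (toku_ft_leaf_needs_ancestors_messages(ft, leaf, ancestors, bounds,
                                              &max_msn_in_path, child_to_read)) {
        // upgrade_to_write_lock_and_apply_messages(...);  // hypothetical
    } else {
        // Nothing to apply: just record the msn we checked against so these
        // ancestors are not re-examined on the next query.
        toku_ft_bn_update_max_msn(leaf, max_msn_in_path, child_to_read);
    }
}
#endif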

void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read) {
    invariant(node->height == 0);
    if (!node->dirty() && child_to_read >= 0) {
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        BASEMENTNODE bn = BLB(node, child_to_read);
        if (max_msn_applied.msn > bn->max_msn_applied.msn) {
            // see comment below
            (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
        }
    }
    else {
        for (int i = 0; i < node->n_children; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            BASEMENTNODE bn = BLB(node, i);
            if (max_msn_applied.msn > bn->max_msn_applied.msn) {
                // This function runs in a shared access context, so to silence tools
                // like DRD, we use a CAS and ignore the result.
                // Any threads trying to update these basement nodes should be
                // updating them to the same thing (since they all have a read lock on
                // the same root-to-leaf path) so this is safe.
                (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
            }
        }
    }
}

struct copy_to_stale_extra {
    FT ft;
    NONLEAF_CHILDINFO bnc;
};

int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra) __attribute__((nonnull(3)));
int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
{
    MSN msn;
    DBT key;
    extra->bnc->msg_buffer.get_message_key_msn(offset, &key, &msn);
    struct toku_msg_buffer_key_msn_heaviside_extra heaviside_extra(extra->ft->cmp, &extra->bnc->msg_buffer, &key, msn);
    int r = extra->bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, heaviside_extra, nullptr);
    invariant_zero(r);
    return 0;
}

void toku_ft_bnc_move_messages_to_stale(FT ft, NONLEAF_CHILDINFO bnc) {
    struct copy_to_stale_extra cts_extra = { .ft = ft, .bnc = bnc };
    int r = bnc->fresh_message_tree.iterate_over_marked<struct copy_to_stale_extra, copy_to_stale>(&cts_extra);
    invariant_zero(r);
    bnc->fresh_message_tree.delete_all_marked();
}

void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) {
    invariant(node->height > 0);
    for (int i = 0; i < node->n_children; ++i) {
        if (BP_STATE(node, i) != PT_AVAIL) {
            continue;
        }
        NONLEAF_CHILDINFO bnc = BNC(node, i);
        // We can't delete things out of the fresh tree inside the above
        // procedures because we're still looking at the fresh tree.  Instead
        // we have to move messages after we're done looking at it.
        toku_ft_bnc_move_messages_to_stale(ft, bnc);
    }
}

//
// Balance // Availability // Size

struct rebalance_array_info {
    uint32_t offset;
    LEAFENTRY *le_array;
    uint32_t *key_sizes_array;
    const void **key_ptr_array;
    static int fn(const void* key, const uint32_t keylen, const LEAFENTRY &le,
           const uint32_t idx, struct rebalance_array_info *const ai) {
        ai->le_array[idx+ai->offset] = le;
        ai->key_sizes_array[idx+ai->offset] = keylen;
        ai->key_ptr_array[idx+ai->offset] = key;
        return 0;
    }
};

// There must still be at least one child
// Requires that all messages in buffers above have been applied.
// Because all messages above have been applied, setting msn of all new basements
// to max msn of existing basements is correct.  (There cannot be any messages in
// buffers above that still need to be applied.)
void toku_ftnode_leaf_rebalance(FTNODE node, unsigned int basementnodesize) {

    assert(node->height == 0);
    assert(node->dirty());

    uint32_t num_orig_basements = node->n_children;
    // Count number of leaf entries in this leaf (num_le).
    uint32_t num_le = 0;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        num_le += BLB_DATA(node, i)->num_klpairs();
    }

    uint32_t num_alloc = num_le ? num_le : 1;  // simplify logic below by always having at least one entry per array

    // Create an array of OMTVALUE's that store all the pointers to all the data.
    // Each element in leafpointers is a pointer to a leaf.
    toku::scoped_malloc leafpointers_buf(sizeof(LEAFENTRY) * num_alloc);
    LEAFENTRY *leafpointers = reinterpret_cast<LEAFENTRY *>(leafpointers_buf.get());
    leafpointers[0] = NULL;

    toku::scoped_malloc key_pointers_buf(sizeof(void *) * num_alloc);
    const void **key_pointers = reinterpret_cast<const void **>(key_pointers_buf.get());
    key_pointers[0] = NULL;

    toku::scoped_malloc key_sizes_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *key_sizes = reinterpret_cast<uint32_t *>(key_sizes_buf.get());

    // Capture pointers to old mempools' buffers (so they can be destroyed)
    toku::scoped_malloc old_bns_buf(sizeof(BASEMENTNODE) * num_orig_basements);
    BASEMENTNODE *old_bns = reinterpret_cast<BASEMENTNODE *>(old_bns_buf.get());
    old_bns[0] = NULL;

    uint32_t curr_le = 0;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        bn_data* bd = BLB_DATA(node, i);
        struct rebalance_array_info ai {.offset = curr_le, .le_array = leafpointers, .key_sizes_array = key_sizes, .key_ptr_array = key_pointers };
        bd->iterate<rebalance_array_info, rebalance_array_info::fn>(&ai);
        curr_le += bd->num_klpairs();
    }

    // Create an array that will store indexes of new pivots.
    // Each element in new_pivots is the index of a pivot key.
    // (Allocating num_le of them is overkill, but num_le is an upper bound.)
    toku::scoped_malloc new_pivots_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *new_pivots = reinterpret_cast<uint32_t *>(new_pivots_buf.get());
    new_pivots[0] = 0;

    // Each element in le_sizes is the size of the leafentry pointed to by leafpointers.
    toku::scoped_malloc le_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *le_sizes = reinterpret_cast<size_t *>(le_sizes_buf.get());
    le_sizes[0] = 0;

    // Create an array that will store the size of each basement.
    // This is the sum of the leaf sizes of all the leaves in that basement.
    // We don't know how many basements there will be, so we use num_le as the upper bound.

    // Sum of all le sizes in a single basement
    toku::scoped_calloc bn_le_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *bn_le_sizes = reinterpret_cast<size_t *>(bn_le_sizes_buf.get());

    // Sum of all key sizes in a single basement
    toku::scoped_calloc bn_key_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *bn_key_sizes = reinterpret_cast<size_t *>(bn_key_sizes_buf.get());

    // TODO 4050: All these arrays should be combined into a single array of some bn_info struct (pivot, msize, num_les).
    // Each entry is the number of leafentries in this basement.  (Again, num_le is an overkill upper bound.)
    toku::scoped_malloc num_les_this_bn_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *num_les_this_bn = reinterpret_cast<uint32_t *>(num_les_this_bn_buf.get());
    num_les_this_bn[0] = 0;

    // Figure out the new pivots.
    // We need the index of each pivot, and for each basement we need
    // the number of leaves and the sum of the sizes of the leaves (memory requirement for basement).
    uint32_t curr_pivot = 0;
    uint32_t num_le_in_curr_bn = 0;
    uint32_t bn_size_so_far = 0;
    for (uint32_t i = 0; i < num_le; i++) {
        uint32_t curr_le_size = leafentry_disksize((LEAFENTRY) leafpointers[i]);
        le_sizes[i] = curr_le_size;
        if ((bn_size_so_far + curr_le_size + sizeof(uint32_t) + key_sizes[i] > basementnodesize) && (num_le_in_curr_bn != 0)) {
            // cap off the current basement node to end with the element before i
            new_pivots[curr_pivot] = i-1;
            curr_pivot++;
            num_le_in_curr_bn = 0;
            bn_size_so_far = 0;
        }
        num_le_in_curr_bn++;
        num_les_this_bn[curr_pivot] = num_le_in_curr_bn;
        bn_le_sizes[curr_pivot] += curr_le_size;
        bn_key_sizes[curr_pivot] += sizeof(uint32_t) + key_sizes[i];  // uint32_t le_offset
        bn_size_so_far += curr_le_size + sizeof(uint32_t) + key_sizes[i];
    }
    // curr_pivot is now the total number of pivot keys in the leaf node
    int num_pivots   = curr_pivot;
    int num_children = num_pivots + 1;

    // now we need to fill in the new basement nodes and pivots

    // TODO: (Zardosht) this is an ugly thing right now
    // Need to figure out how to properly deal with seqinsert.
    // I am not happy with how this is being
    // handled with basement nodes
    uint32_t tmp_seqinsert = BLB_SEQINSERT(node, num_orig_basements - 1);

    // choose the max msn applied to any basement as the max msn applied to all new basements
    MSN max_msn = ZERO_MSN;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        MSN curr_msn = BLB_MAX_MSN_APPLIED(node,i);
        max_msn = (curr_msn.msn > max_msn.msn) ? curr_msn : max_msn;
    }
    // remove the basement node in the node, we've saved a copy
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        // save a reference to the old basement nodes
        // we will need them to ensure that the memory
        // stays intact
        old_bns[i] = toku_detach_bn(node, i);
    }
    // Now destroy the old basements, but do not destroy leaves
    toku_destroy_ftnode_internals(node);

    // now reallocate pieces and start filling them in
    invariant(num_children > 0);

    node->n_children = num_children;
    XCALLOC_N(num_children, node->bp);             // allocate pointers to basements (bp)
    for (int i = 0; i < num_children; i++) {
        set_BLB(node, i, toku_create_empty_bn());  // allocate empty basements and set bp pointers
    }

    // now we start to fill in the data

    // first the pivots
    toku::scoped_malloc pivotkeys_buf(num_pivots * sizeof(DBT));
    DBT *pivotkeys = reinterpret_cast<DBT *>(pivotkeys_buf.get());
    for (int i = 0; i < num_pivots; i++) {
        uint32_t size = key_sizes[new_pivots[i]];
        const void *key = key_pointers[new_pivots[i]];
        toku_fill_dbt(&pivotkeys[i], key, size);
    }
    node->pivotkeys.create_from_dbts(pivotkeys, num_pivots);

    uint32_t baseindex_this_bn = 0;
    // now the basement nodes
    for (int i = 0; i < num_children; i++) {
        // put back seqinsert
        BLB_SEQINSERT(node, i) = tmp_seqinsert;

        // create start (inclusive) and end (exclusive) boundaries for data of basement node
        uint32_t curr_start = (i==0) ? 0 : new_pivots[i-1]+1;               // index of first leaf in basement
        uint32_t curr_end = (i==num_pivots) ? num_le : new_pivots[i]+1;     // index of first leaf in next basement
        uint32_t num_in_bn = curr_end - curr_start;                         // number of leaves in this basement

        // create indexes for new basement
        invariant(baseindex_this_bn == curr_start);
        uint32_t num_les_to_copy = num_les_this_bn[i];
        invariant(num_les_to_copy == num_in_bn);

        bn_data* bd = BLB_DATA(node, i);
        bd->set_contents_as_clone_of_sorted_array(
            num_les_to_copy,
            &key_pointers[baseindex_this_bn],
            &key_sizes[baseindex_this_bn],
            &leafpointers[baseindex_this_bn],
            &le_sizes[baseindex_this_bn],
            bn_key_sizes[i],  // Total key sizes
            bn_le_sizes[i]  // total le sizes
            );

        BP_STATE(node,i) = PT_AVAIL;
        BP_TOUCH_CLOCK(node,i);
        BLB_MAX_MSN_APPLIED(node,i) = max_msn;
        baseindex_this_bn += num_les_to_copy;  // set to index of next bn
    }
    node->max_msn_applied_to_node_on_disk = max_msn;

    // destroy buffers of old mempools
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        destroy_basement_node(old_bns[i]);
    }
}
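
// Worked example of the packing loop above (illustrative numbers): with
// basementnodesize = 128 * 1024, key_sizes[i] = 12, and 100-byte leafentries,
// each entry costs 100 + sizeof(uint32_t) + 12 = 116 bytes, so a basement
// fills up after floor(131072 / 116) = 1129 entries; the 1130th entry would
// push bn_size_so_far past the limit, so a pivot is cut at index i - 1 and a
// new basement is started.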

bool toku_ftnode_fully_in_memory(FTNODE node) {
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) != PT_AVAIL) {
            return false;
        }
    }
    return true;
}

void toku_ftnode_assert_fully_in_memory(FTNODE UU(node)) {
    paranoid_invariant(toku_ftnode_fully_in_memory(node));
}

uint32_t toku_ftnode_leaf_num_entries(FTNODE node) {
    toku_ftnode_assert_fully_in_memory(node);
    uint32_t num_entries = 0;
    for (int i = 0; i < node->n_children; i++) {
        num_entries += BLB_DATA(node, i)->num_klpairs();
    }
    return num_entries;
}

enum reactivity toku_ftnode_get_leaf_reactivity(FTNODE node, uint32_t nodesize) {
    enum reactivity re = RE_STABLE;
    toku_ftnode_assert_fully_in_memory(node);
    paranoid_invariant(node->height==0);
    unsigned int size = toku_serialize_ftnode_size(node);
    if (size > nodesize && toku_ftnode_leaf_num_entries(node) > 1) {
        re = RE_FISSIBLE;
    } else if ((size*4) < nodesize && !BLB_SEQINSERT(node, node->n_children-1)) {
        re = RE_FUSIBLE;
    }
    return re;
}

enum reactivity toku_ftnode_get_nonleaf_reactivity(FTNODE node, unsigned int fanout) {
    paranoid_invariant(node->height > 0);
    int n_children = node->n_children;
    if (n_children > (int) fanout) {
        return RE_FISSIBLE;
    }
    if (n_children * 4 < (int) fanout) {
        return RE_FUSIBLE;
    }
    return RE_STABLE;
}
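
// Worked example (illustrative): with fanout = 16, a nonleaf node with 17 or
// more children is RE_FISSIBLE (split it), one with 3 or fewer children is
// RE_FUSIBLE (3 * 4 = 12 < 16, so merge it), and anything from 4 to 16
// children is RE_STABLE.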

enum reactivity toku_ftnode_get_reactivity(FT ft, FTNODE node) {
    toku_ftnode_assert_fully_in_memory(node);
    if (node->height == 0) {
        return toku_ftnode_get_leaf_reactivity(node, ft->h->nodesize);
    } else {
        return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout);
    }
}

unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc) {
    return bnc->msg_buffer.buffer_size_in_use();
}

// Return true if the size of the buffers plus the amount of work done is large enough.
// Return false if there is nothing to be flushed (the buffers are empty).
bool toku_ftnode_nonleaf_is_gorged(FTNODE node, uint32_t nodesize) {
    uint64_t size = toku_serialize_ftnode_size(node);

    bool buffers_are_empty = true;
    toku_ftnode_assert_fully_in_memory(node);
    //
    // the nonleaf node is gorged if the following holds true:
    //  - the buffers are non-empty
    //  - the total workdone by the buffers PLUS the size of the buffers
    //     is greater than nodesize (which as of Maxwell should be
    //     4MB)
    //
    paranoid_invariant(node->height > 0);
    for (int child = 0; child < node->n_children; ++child) {
        size += BP_WORKDONE(node, child);
    }
    for (int child = 0; child < node->n_children; ++child) {
        if (toku_bnc_nbytesinbuf(BNC(node, child)) > 0) {
            buffers_are_empty = false;
            break;
        }
    }
    return ((size > nodesize)
            &&
            (!buffers_are_empty));
}
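
// Worked example (illustrative): with nodesize = 4 * 1024 * 1024, a node
// whose serialized size is 3MB and whose children have accumulated 1.5MB of
// BP_WORKDONE totals 4.5MB > 4MB, so it is gorged as long as at least one
// child buffer is non-empty; if every buffer is empty there is nothing to
// flush and the function returns false regardless of size.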

int toku_bnc_n_entries(NONLEAF_CHILDINFO bnc) {
    return bnc->msg_buffer.num_entries();
}

// how much memory does this child buffer consume?
long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc) {
    return (sizeof(*bnc) +
            bnc->msg_buffer.memory_footprint() +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}

// how much memory in this child buffer holds useful data?
// originally created solely for use by test program(s).
long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc) {
    return (sizeof(*bnc) +
            bnc->msg_buffer.memory_size_in_use() +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}

//
// Garbage collection
// Message injection
// Message application
//

// Used only by test programs: append a child node to a parent node
void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) {
    int childnum = node->n_children;
    node->n_children++;
    REALLOC_N(node->n_children, node->bp);
    BP_BLOCKNUM(node,childnum) = child->blocknum;
    BP_STATE(node,childnum) = PT_AVAIL;
    BP_WORKDONE(node, childnum)   = 0;
    set_BNC(node, childnum, toku_create_empty_nl());
    if (pivotkey) {
        invariant(childnum > 0);
        node->pivotkeys.insert_at(pivotkey, childnum - 1);
    }
    node->set_dirty();
}

void
toku_ft_bn_apply_msg_once (
    BASEMENTNODE bn,
    const ft_msg &msg,
    uint32_t idx,
    uint32_t le_keylen,
    LEAFENTRY le,
    txn_gc_info *gc_info,
    uint64_t *workdone,
    STAT64INFO stats_to_update,
    int64_t *logical_rows_delta
    )
// Effect: Apply msg to leafentry (msn is ignored)
//         Calculate work done by message on leafentry and add it to caller's workdone counter.
//   idx is the location where it goes
//   le is old leafentry
{
    size_t newsize=0, oldsize=0, workdone_this_le=0;
    LEAFENTRY new_le=0;
    // how many bytes of user data (not including overhead) were added or
    // deleted from this row
    int64_t numbytes_delta = 0;
    // will be +1 or -1 or 0 (if row was added or deleted or not)
    int64_t numrows_delta = 0;
    // will be +1, -1 or 0 if a message that was accounted for logically has
    // changed in meaning such as an insert changed to an update or a delete
    // changed to a noop
    int64_t logical_rows_delta_le = 0;
    uint32_t key_storage_size = msg.kdbt()->size + sizeof(uint32_t);
    if (le) {
        oldsize = leafentry_memsize(le) + key_storage_size;
    }

    // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt()
    // to allocate more space. That means le is guaranteed to not cause a
    // sigsegv but it may point to a mempool that is no longer in use.
    // We'll have to release the old mempool later.
    logical_rows_delta_le = toku_le_apply_msg(
        msg,
        le,
        &bn->data_buffer,
        idx,
        le_keylen,
        gc_info,
        &new_le,
        &numbytes_delta);

    // At this point, we cannot trust the message's key to be valid: the dmt
    // may have realloced its mempool and freed the one containing the key.

    newsize = new_le ? (leafentry_memsize(new_le) + key_storage_size) : 0;
    if (le && new_le) {
        workdone_this_le = (oldsize > newsize ? oldsize : newsize);  // work done is max of le size before and after message application

    } else {           // we did not just replace a row, so ...
        if (le) {
            //            ... we just deleted a row ...
            workdone_this_le = oldsize;
            numrows_delta = -1;
        }
        if (new_le) {
            //            ... or we just added a row
            workdone_this_le = newsize;
            numrows_delta = 1;
        }
    }
    if (FT_LIKELY(workdone != NULL)) {  // test programs may call with NULL
        *workdone += workdone_this_le;
    }

    if (FT_LIKELY(logical_rows_delta != NULL)) {
        *logical_rows_delta += logical_rows_delta_le;
    }
    // now update stat64 statistics
    bn->stat64_delta.numrows  += numrows_delta;
    bn->stat64_delta.numbytes += numbytes_delta;
    // the only reason stats_to_update may be null is for tests
    if (FT_LIKELY(stats_to_update != NULL)) {
        stats_to_update->numrows += numrows_delta;
        stats_to_update->numbytes += numbytes_delta;
    }
}
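
// Worked example of the workdone accounting above (illustrative sizes, which
// include key_storage_size): replacing a 300-byte leafentry with a 120-byte
// one counts max(300, 120) = 300 bytes of work with numrows_delta = 0; a pure
// insert of a 120-byte entry counts 120 bytes with numrows_delta = +1; a
// delete of a 300-byte entry counts 300 bytes with numrows_delta = -1.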
1229 
// This tag was generated with "cat /dev/random|head -c4|od -x" to get a
// random number.  We want to make sure that the user actually passes us the
// setval_extra_s that we passed in.
static const uint32_t setval_tag = 0xee0ccb99;
struct setval_extra_s {
    uint32_t  tag;
    bool did_set_val;
    // any error code that setval_fun wants to return goes here.
    int setval_r;
    // need arguments for toku_ft_bn_apply_msg_once
    BASEMENTNODE bn;
    // captured from original message, not currently used
    MSN msn;
    XIDS xids;
    const DBT* key;
    uint32_t idx;
    uint32_t le_keylen;
    LEAFENTRY le;
    txn_gc_info* gc_info;
    uint64_t* workdone;  // set by toku_ft_bn_apply_msg_once()
    STAT64INFO stats_to_update;
    int64_t* logical_rows_delta;
};

/*
 * If new_val == NULL, we send a delete message instead of an insert.
 * This happens here instead of in do_delete() for consistency.
 * setval_fun() is called from the handlerton, passing in svextra_v
 * from the setval_extra_s input arg to ft->update_fun().
 */
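// The update callback's shape, as implied by the call site in do_update()
// below (a hypothetical sketch, not an actual handlerton implementation):
//
//     static int my_update_fun(DB *db, const DBT *key, const DBT *old_val,
//                              const DBT *extra,
//                              void (*set_val)(const DBT *new_val, void *setval_extra),
//                              void *setval_extra) {
//         DBT new_val;  // compute the new row image from old_val and extra
//         ...
//         set_val(&new_val, setval_extra);  // or set_val(NULL, ...) to delete
//         return 0;
//     }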
static void setval_fun (const DBT *new_val, void *svextra_v) {
    struct setval_extra_s *CAST_FROM_VOIDP(svextra, svextra_v);
    paranoid_invariant(svextra->tag==setval_tag);
    paranoid_invariant(!svextra->did_set_val);
    svextra->did_set_val = true;

    {
        // can't leave scope until toku_ft_bn_apply_msg_once if
        // this is a delete
        DBT val;
        ft_msg msg(
            svextra->key,
            new_val ? new_val : toku_init_dbt(&val),
            new_val ? FT_INSERT : FT_DELETE_ANY,
            svextra->msn,
            svextra->xids);
        toku_ft_bn_apply_msg_once(
            svextra->bn,
            msg,
            svextra->idx,
            svextra->le_keylen,
            svextra->le,
            svextra->gc_info,
            svextra->workdone,
            svextra->stats_to_update,
            svextra->logical_rows_delta);
        svextra->setval_r = 0;
    }
}

// We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls
// do_update()), so capturing the msn in the setval_extra_s is not strictly
// required. The alternative would be to put a dummy msn in the messages
// created by setval_fun(), but preserving the original msn seems cleaner and
// it preserves accountability at a lower layer.
static int do_update(
    ft_update_func update_fun,
    const DESCRIPTOR_S* desc,
    BASEMENTNODE bn,
    const ft_msg &msg,
    uint32_t idx,
    LEAFENTRY le,
    void* keydata,
    uint32_t keylen,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    LEAFENTRY le_for_update;
    DBT key;
    const DBT *keyp;
    const DBT *update_function_extra;
    DBT vdbt;
    const DBT *vdbtp;

    // where the data comes from depends on whether this is a regular or
    // broadcast update
    if (msg.type() == FT_UPDATE) {
        // key is passed in with the command (should be the same as from le)
        // update function extra is passed in with the command
        keyp = msg.kdbt();
        update_function_extra = msg.vdbt();
    } else {
        invariant(msg.type() == FT_UPDATE_BROADCAST_ALL);
        // key is not passed in with a broadcast, it comes from le
        // update function extra is passed in with the command
        paranoid_invariant(le);  // for broadcast updates, we just hit all
                                 // leafentries, so this cannot be null
        paranoid_invariant(keydata);
        paranoid_invariant(keylen);
        paranoid_invariant(msg.kdbt()->size == 0);
        keyp = toku_fill_dbt(&key, keydata, keylen);
        update_function_extra = msg.vdbt();
    }
    toku_ft_status_note_update(msg.type() == FT_UPDATE_BROADCAST_ALL);

    if (le && !le_latest_is_del(le)) {
        // if the latest val exists, use it, and we'll use the leafentry later
        uint32_t vallen;
        void *valp = le_latest_val_and_len(le, &vallen);
        vdbtp = toku_fill_dbt(&vdbt, valp, vallen);
    } else {
        // otherwise, the val and leafentry are both going to be null
        vdbtp = NULL;
    }
    le_for_update = le;

    struct setval_extra_s setval_extra = {
        setval_tag,
        false,
        0,
        bn,
        msg.msn(),
        msg.xids(),
        keyp,
        idx,
        keylen,
        le_for_update,
        gc_info,
        workdone,
        stats_to_update,
        logical_rows_delta
    };
    // call handlerton's ft->update_fun(), which passes setval_extra
    // to setval_fun()
    FAKE_DB(db, desc);
    int r = update_fun(
        &db,
        keyp,
        vdbtp,
        update_function_extra,
        setval_fun,
        &setval_extra);

    if (r == 0) { r = setval_extra.setval_r; }
    return r;
}

// TODO: Rename to something like apply_msg_to_basement().
void toku_ft_bn_apply_msg(
    const toku::comparator& cmp,
    ft_update_func update_fun,
    BASEMENTNODE bn,
    const ft_msg& msg,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {
// Effect:
//   Put a msg into a leaf.
//   Calculate work done by message on leafnode and add it to caller's
//   workdone counter.
// The leaf could end up "too big" or "too small".  The caller must fix that up.
    LEAFENTRY storeddata;
    void* key = NULL;
    uint32_t keylen = 0;

    uint32_t num_klpairs;
    int r;
    struct toku_msg_leafval_heaviside_extra be(cmp, msg.kdbt());

    unsigned int doing_seqinsert = bn->seqinsert;
    bn->seqinsert = 0;

    switch (msg.type()) {
    case FT_INSERT_NO_OVERWRITE:
    case FT_INSERT: {
        uint32_t idx;
        if (doing_seqinsert) {
            idx = bn->data_buffer.num_klpairs();
            DBT kdbt;
            r = bn->data_buffer.fetch_key_and_len(idx-1, &kdbt.size, &kdbt.data);
            if (r != 0) goto fz;
            int c = toku_msg_leafval_heaviside(kdbt, be);
            if (c >= 0) goto fz;
            r = DB_NOTFOUND;
        } else {
        fz:
            r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
                be,
                &storeddata,
                &key,
                &keylen,
                &idx
                );
        }
        if (r==DB_NOTFOUND) {
            storeddata = 0;
        } else {
            assert_zero(r);
        }
        toku_ft_bn_apply_msg_once(
            bn,
            msg,
            idx,
            keylen,
            storeddata,
            gc_info,
            workdone,
            stats_to_update,
            logical_rows_delta);

        // if the insertion point is within a window of the right edge of
        // the leaf then it is sequential
        // window = min(32, number of leaf entries/16)
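        // e.g. 100 entries give a window of 100/16 = 6; 512 or more entries
        // cap the window at 32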
        {
            uint32_t s = bn->data_buffer.num_klpairs();
            uint32_t w = s / 16;
            if (w == 0) w = 1;
            if (w > 32) w = 32;

            // within the window?
            if (s - idx <= w)
                bn->seqinsert = doing_seqinsert + 1;
        }
        break;
    }
    case FT_DELETE_ANY:
    case FT_ABORT_ANY:
    case FT_COMMIT_ANY: {
        uint32_t idx;
        // Apply to all the matches

        r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
            be,
            &storeddata,
            &key,
            &keylen,
            &idx);
        if (r == DB_NOTFOUND) break;
        assert_zero(r);
        toku_ft_bn_apply_msg_once(
            bn,
            msg,
            idx,
            keylen,
            storeddata,
            gc_info,
            workdone,
            stats_to_update,
            logical_rows_delta);
        break;
    }
    case FT_OPTIMIZE_FOR_UPGRADE:
        // fall through so that optimize_for_upgrade performs the rest of the
        // optimize logic
    case FT_COMMIT_BROADCAST_ALL:
    case FT_OPTIMIZE:
        // Apply to all leafentries
        num_klpairs = bn->data_buffer.num_klpairs();
        for (uint32_t idx = 0; idx < num_klpairs; ) {
            void* curr_keyp = NULL;
            uint32_t curr_keylen = 0;
            r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
            assert_zero(r);
            int deleted = 0;
            if (!le_is_clean(storeddata)) { // if already clean, nothing to do
                // message application code needs a key in order to determine
                // how much work was done by this message. since this is a
                // broadcast message, we have to create a new message whose
                // key is the current le's key.
                DBT curr_keydbt;
                ft_msg curr_msg(
                    toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
                    msg.vdbt(),
                    msg.type(),
                    msg.msn(),
                    msg.xids());
                toku_ft_bn_apply_msg_once(
                    bn,
                    curr_msg,
                    idx,
                    curr_keylen,
                    storeddata,
                    gc_info,
                    workdone,
                    stats_to_update,
                    logical_rows_delta);
                // at this point, we cannot trust msg.kdbt to be valid.
                uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
                if (new_dmt_size != num_klpairs) {
                    paranoid_invariant(new_dmt_size + 1 == num_klpairs);
                    // item was deleted
                    deleted = 1;
                }
            }
            if (deleted)
                num_klpairs--;
            else
                idx++;
        }
        paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);

        break;
    case FT_COMMIT_BROADCAST_TXN:
    case FT_ABORT_BROADCAST_TXN:
        // Apply to all leafentries if txn is represented
        num_klpairs = bn->data_buffer.num_klpairs();
        for (uint32_t idx = 0; idx < num_klpairs; ) {
            void* curr_keyp = NULL;
            uint32_t curr_keylen = 0;
            r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
            assert_zero(r);
            int deleted = 0;
            if (le_has_xids(storeddata, msg.xids())) {
                // message application code needs a key in order to determine
                // how much work was done by this message. since this is a
                // broadcast message, we have to create a new message whose key
                // is the current le's key.
                DBT curr_keydbt;
                ft_msg curr_msg(
                    toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
                    msg.vdbt(),
                    msg.type(),
                    msg.msn(),
                    msg.xids());
                toku_ft_bn_apply_msg_once(
                    bn,
                    curr_msg,
                    idx,
                    curr_keylen,
                    storeddata,
                    gc_info,
                    workdone,
                    stats_to_update,
                    logical_rows_delta);
                uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
                if (new_dmt_size != num_klpairs) {
                    paranoid_invariant(new_dmt_size + 1 == num_klpairs);
                    // item was deleted
                    deleted = 1;
                }
            }
            if (deleted)
                num_klpairs--;
            else
                idx++;
        }
        paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);

        break;
    case FT_UPDATE: {
        uint32_t idx;
        r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
            be,
            &storeddata,
            &key,
            &keylen,
            &idx
            );
        if (r==DB_NOTFOUND) {
            {
                // Point to msg's copy of the key so we don't worry about le being freed
                // TODO: 46 MAYBE Get rid of this when le_apply message memory is better handled
                key = msg.kdbt()->data;
                keylen = msg.kdbt()->size;
            }
            r = do_update(
                update_fun,
                cmp.get_descriptor(),
                bn,
                msg,
                idx,
                NULL,
                NULL,
                0,
                gc_info,
                workdone,
                stats_to_update,
                logical_rows_delta);
        } else if (r==0) {
            r = do_update(
                update_fun,
                cmp.get_descriptor(),
                bn,
                msg,
                idx,
                storeddata,
                key,
                keylen,
                gc_info,
                workdone,
                stats_to_update,
                logical_rows_delta);
        } // otherwise a worse error occurred; it is ignored here
        break;
    }
    case FT_UPDATE_BROADCAST_ALL: {
        // apply to all leafentries.
        uint32_t idx = 0;
        uint32_t num_leafentries_before;
        // Use a dummy logical-rows delta here so the logical row count is not
        // perturbed by applying this message: the delta would come back as
        // minus the number of leaf entries visited and drive the ft header's
        // value to 0. This message does not change the number of rows, so
        // just use the bogus value.
        int64_t temp_logical_rows_delta = 0;
        while (idx < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
            void* curr_key = nullptr;
            uint32_t curr_keylen = 0;
            r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_key);
            assert_zero(r);

            // TODO: 46 replace this with something better than cloning key
            // TODO: (Zardosht) This may be unnecessary now, due to how the key
            // is handled in the bndata. Investigate and determine
            char clone_mem[curr_keylen];  // only lasts one loop, alloca would overflow (end of function)
            memcpy((void*)clone_mem, curr_key, curr_keylen);
            curr_key = (void*)clone_mem;

            // This is broken below. Have a compilation error checked
            // in as a reminder
            r = do_update(
                update_fun,
                cmp.get_descriptor(),
                bn,
                msg,
                idx,
                storeddata,
                curr_key,
                curr_keylen,
                gc_info,
                workdone,
                stats_to_update,
                &temp_logical_rows_delta);
            assert_zero(r);

            if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
                // we didn't delete something, so increment the index.
                idx++;
            }
        }
        break;
    }
    case FT_NONE: break; // don't do anything
    }

    return;
}

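// Compare two (key, msn) pairs: the key ordering from cmp dominates, and the
// msn breaks ties so that messages on the same key sort in the order they
// were enqueued.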
static inline int
key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, const toku::comparator &cmp) {
    int r = cmp(a, b);
    if (r == 0) {
        if (amsn.msn > bmsn.msn) {
            r = +1;
        } else if (amsn.msn < bmsn.msn) {
            r = -1;
        } else {
            r = 0;
        }
    }
    return r;
}

int toku_msg_buffer_key_msn_heaviside(const int32_t &offset, const struct toku_msg_buffer_key_msn_heaviside_extra &extra) {
    MSN query_msn;
    DBT query_key;
    extra.msg_buffer->get_message_key_msn(offset, &query_key, &query_msn);
    return key_msn_cmp(&query_key, extra.key, query_msn, extra.msn, extra.cmp);
}

int toku_msg_buffer_key_msn_cmp(const struct toku_msg_buffer_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo) {
    MSN amsn, bmsn;
    DBT akey, bkey;
    extra.msg_buffer->get_message_key_msn(ao, &akey, &amsn);
    extra.msg_buffer->get_message_key_msn(bo, &bkey, &bmsn);
    return key_msn_cmp(&akey, &bkey, amsn, bmsn, extra.cmp);
}

// Effect: Enqueue the message represented by the parameters into the
//   bnc's buffer, and put it in either the fresh or stale message tree,
//   or the broadcast list.
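//   Messages that apply once are indexed by (key, msn) in the fresh or stale
//   tree; broadcast-style messages just go on the broadcast list in arrival
//   order.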
static void bnc_insert_msg(NONLEAF_CHILDINFO bnc, const ft_msg &msg, bool is_fresh, const toku::comparator &cmp) {
    int r = 0;
    int32_t offset;
    bnc->msg_buffer.enqueue(msg, is_fresh, &offset);
    enum ft_msg_type type = msg.type();
    if (ft_msg_type_applies_once(type)) {
        DBT key;
        toku_fill_dbt(&key, msg.kdbt()->data, msg.kdbt()->size);
        struct toku_msg_buffer_key_msn_heaviside_extra extra(cmp, &bnc->msg_buffer, &key, msg.msn());
        if (is_fresh) {
            r = bnc->fresh_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
            assert_zero(r);
        } else {
            r = bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
            assert_zero(r);
        }
    } else {
        invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type));
        const uint32_t idx = bnc->broadcast_list.size();
        r = bnc->broadcast_list.insert_at(offset, idx);
        assert_zero(r);
    }
}

// This is only exported for tests.
void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, uint32_t keylen, const void *data, uint32_t datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const toku::comparator &cmp)
{
    DBT k, v;
    ft_msg msg(toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), type, msn, xids);
    bnc_insert_msg(bnc, msg, is_fresh, cmp);
}
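// A minimal sketch of a test-style call (hypothetical names; assumes the
// caller already has a bnc, a comparator, and a root XIDS handle):
//
//     MSN msn = ZERO_MSN;
//     msn.msn = next_msn_value;             // caller-chosen, must increase
//     toku_bnc_insert_msg(bnc, "key", 4, "val", 4, FT_INSERT, msn,
//                         toku_xids_get_root_xids(), true /*is_fresh*/, cmp);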

// append a msg to a nonleaf node's child buffer
static void ft_append_msg_to_child_buffer(const toku::comparator &cmp, FTNODE node,
                                          int childnum, const ft_msg &msg, bool is_fresh) {
    paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL);
    bnc_insert_msg(BNC(node, childnum), msg, is_fresh, cmp);
    node->set_dirty();
}

// This is only exported for tests.
void toku_ft_append_to_child_buffer(const toku::comparator &cmp, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) {
    ft_msg msg(key, val, type, msn, xids);
    ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
}

static void ft_nonleaf_msg_once_to_child(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Previously we had passive-aggressive promotion, but that caused a lot of I/O
// at checkpoint time.  So now we just put the message in the buffer here.
// Also we don't worry about the node getting overfull here.  It's the caller's problem.
{
    unsigned int childnum = (target_childnum >= 0
                             ? target_childnum
                             : toku_ftnode_which_child(node, msg.kdbt(), cmp));
    ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
    NONLEAF_CHILDINFO bnc = BNC(node, childnum);
    bnc->flow[0] += flow_deltas[0];
    bnc->flow[1] += flow_deltas[1];
}

// TODO: Remove me, I'm boring.
static int ft_compare_pivot(const toku::comparator &cmp, const DBT *key, const DBT *pivot) {
    return cmp(key, pivot);
}

/* Find the leftmost child that may contain the key.
 * If the key exists it will be in the child whose number
 * is the return value of this function.
 */
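// For example, with pivots {10, 20} (three children), keys <= 10 map to
// child 0, keys in (10, 20] map to child 1, and keys > 20 map to child 2.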
int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
    // a funny case of no pivots
    if (node->n_children <= 1) return 0;

    DBT pivot;

    // check the last key to optimize seq insertions
    int n = node->n_children-1;
    int c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(n - 1, &pivot));
    if (c > 0) return n;

    // binary search the pivots
    int lo = 0;
    int hi = n-1; // skip the last one, we checked it above
    int mi;
    while (lo < hi) {
        mi = (lo + hi) / 2;
        c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
        if (c > 0) {
            lo = mi+1;
            continue;
        }
        if (c < 0) {
            hi = mi;
            continue;
        }
        return mi;
    }
    return lo;
}

// Used for HOT.
int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
    DBT pivot;
    int low = 0;
    int hi = node->n_children - 1;
    int mi;
    while (low < hi) {
        mi = (low + hi) / 2;
        int r = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
        if (r > 0) {
            low = mi + 1;
        } else if (r < 0) {
            hi = mi;
        } else {
            // if they were exactly equal, then we want the sub-tree under
            // the next pivot.
            return mi + 1;
        }
    }
    invariant(low == hi);
    return low;
}

void toku_ftnode_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
    FTNODE CAST_FROM_VOIDP(node, value_data);
    node->ct_pair = p;
}

static void
ft_nonleaf_msg_all(const toku::comparator &cmp, FTNODE node, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Effect: Put the message into a nonleaf node.  We put it into all children, possibly causing the children to become reactive.
//  We don't do the splitting and merging.  That's up to the caller after doing all the puts it wants to do.
{
    for (int i = 0; i < node->n_children; i++) {
        ft_nonleaf_msg_once_to_child(cmp, node, i, msg, is_fresh, flow_deltas);
    }
}

static void
ft_nonleaf_put_msg(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Effect: Put the message into a nonleaf node.  We may put it into a child, possibly causing the child to become reactive.
//  We don't do the splitting and merging.  That's up to the caller after doing all the puts it wants to do.
{
    //
    // see comments in toku_ft_leaf_apply_msg
    // to understand why we handle setting
    // node->max_msn_applied_to_node_on_disk here,
    // and don't do it in toku_ftnode_put_msg
    //
    MSN msg_msn = msg.msn();
    invariant(msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
    node->max_msn_applied_to_node_on_disk = msg_msn;

    if (ft_msg_type_applies_once(msg.type())) {
        ft_nonleaf_msg_once_to_child(cmp, node, target_childnum, msg, is_fresh, flow_deltas);
    } else if (ft_msg_type_applies_all(msg.type())) {
        ft_nonleaf_msg_all(cmp, node, msg, is_fresh, flow_deltas);
    } else {
        paranoid_invariant(ft_msg_type_does_nothing(msg.type()));
    }
}

// Garbage collect one leaf entry.
static void
ft_basement_node_gc_once(BASEMENTNODE bn,
                          uint32_t index,
                          void* keyp,
                          uint32_t keylen,
                          LEAFENTRY leaf_entry,
                          txn_gc_info *gc_info,
                          STAT64INFO_S * delta)
{
    paranoid_invariant(leaf_entry);

    // Don't run garbage collection on non-mvcc leaf entries.
    if (leaf_entry->type != LE_MVCC) {
        goto exit;
    }

    // Don't run garbage collection if this leafentry decides it's not worth it.
    if (!toku_le_worth_running_garbage_collection(leaf_entry, gc_info)) {
        goto exit;
    }

    LEAFENTRY new_leaf_entry;
    new_leaf_entry = NULL;

    // The mempool doesn't free itself.  When it allocates new memory,
    // this pointer will be set to the older memory that must now be
    // freed.
    void * maybe_free;
    maybe_free = NULL;

    // These will represent the number of bytes and rows changed as
    // part of the garbage collection.
    int64_t numbytes_delta;
    int64_t numrows_delta;
    toku_le_garbage_collect(leaf_entry,
                            &bn->data_buffer,
                            index,
                            keyp,
                            keylen,
                            gc_info,
                            &new_leaf_entry,
                            &numbytes_delta);

    if (new_leaf_entry) {
        numrows_delta = 0;
    } else {
        numrows_delta = -1;
    }

    // If we created a new mempool buffer we must free the
    // old/original buffer.
    if (maybe_free) {
        toku_free(maybe_free);
    }

    // Update stats.
    bn->stat64_delta.numrows += numrows_delta;
    bn->stat64_delta.numbytes += numbytes_delta;
    delta->numrows += numrows_delta;
    delta->numbytes += numbytes_delta;

exit:
    return;
}

// Garbage collect all leaf entries for a given basement node.
static void
basement_node_gc_all_les(BASEMENTNODE bn,
                         txn_gc_info *gc_info,
                         STAT64INFO_S * delta)
{
    int r = 0;
    uint32_t index = 0;
    uint32_t num_leafentries_before;
    while (index < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
        void* keyp = NULL;
        uint32_t keylen = 0;
        LEAFENTRY leaf_entry;
        r = bn->data_buffer.fetch_klpair(index, &leaf_entry, &keylen, &keyp);
        assert_zero(r);
        ft_basement_node_gc_once(
            bn,
            index,
            keyp,
            keylen,
            leaf_entry,
            gc_info,
            delta
            );
        // Only advance the index if the leaf entry was not deleted.
        if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
            ++index;
        }
    }
}

// Garbage collect all leaf entries in all basement nodes.
static void
ft_leaf_gc_all_les(FT ft, FTNODE node, txn_gc_info *gc_info)
{
    toku_ftnode_assert_fully_in_memory(node);
    paranoid_invariant_zero(node->height);
    // Loop through each basement node, garbage collecting as we go.
    for (int i = 0; i < node->n_children; ++i) {
        // Perform the garbage collection.
        BASEMENTNODE bn = BLB(node, i);
        STAT64INFO_S delta;
        delta.numrows = 0;
        delta.numbytes = 0;
        basement_node_gc_all_les(bn, gc_info, &delta);
        toku_ft_update_stats(&ft->in_memory_stats, delta);
    }
}

void toku_ftnode_leaf_run_gc(FT ft, FTNODE node) {
    TOKULOGGER logger = toku_cachefile_logger(ft->cf);
    if (logger) {
        TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger);
        txn_manager_state txn_state_for_gc(txn_manager);
        txn_state_for_gc.init();
        TXNID oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);

        // Perform full garbage collection.
        //
        // - txn_state_for_gc
        //     a fresh snapshot of the transaction system.
        // - oldest_referenced_xid_for_simple_gc
        //     the oldest xid in any live list as of right now - suitable for simple gc
        // - node->oldest_referenced_xid_known
        //     the last known oldest referenced xid for this node and any unapplied messages.
        //     it is a lower bound on the actual oldest referenced xid - but because there
        //     may be abort messages above us, we need to be careful to only use this value
        //     for implicit promotion (as opposed to the oldest referenced xid for simple gc)
        //
        // The node has its own oldest referenced xid because it must be careful not to implicitly promote
        // provisional entries for transactions that are no longer live, but may have abort messages
        // somewhere above us in the tree.
        txn_gc_info gc_info(&txn_state_for_gc,
                            oldest_referenced_xid_for_simple_gc,
                            node->oldest_referenced_xid_known,
                            true);
        ft_leaf_gc_all_les(ft, node, &gc_info);
    }
}

void toku_ftnode_put_msg(
    const toku::comparator &cmp,
    ft_update_func update_fun,
    FTNODE node,
    int target_childnum,
    const ft_msg &msg,
    bool is_fresh,
    txn_gc_info* gc_info,
    size_t flow_deltas[],
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {
// Effect: Push message into the subtree rooted at NODE.
//   If NODE is a leaf, then
//   put message into leaf, applying it to the leafentries
//   If NODE is a nonleaf, then push the message into the message buffer(s) of the relevant child(ren).
//   The node may become overfull.  That's not our problem.
    toku_ftnode_assert_fully_in_memory(node);
    //
    // see comments in toku_ft_leaf_apply_msg
    // to understand why we don't handle setting
    // node->max_msn_applied_to_node_on_disk here,
    // and instead defer to these functions
    //
    if (node->height==0) {
        toku_ft_leaf_apply_msg(
            cmp,
            update_fun,
            node,
            target_childnum, msg,
            gc_info,
            nullptr,
            stats_to_update,
            logical_rows_delta);
    } else {
        ft_nonleaf_put_msg(
            cmp,
            node,
            target_childnum,
            msg,
            is_fresh,
            flow_deltas);
    }
}

// Effect: applies the message to the leaf if the appropriate basement node is
//           in memory. This function is called during message injection and/or
//           flushing, so the entire node MUST be in memory.
void toku_ft_leaf_apply_msg(
    const toku::comparator& cmp,
    ft_update_func update_fun,
    FTNODE node,
    int target_childnum,  // which child to inject to, or -1 if unknown
    const ft_msg& msg,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    VERIFY_NODE(t, node);
    toku_ftnode_assert_fully_in_memory(node);

    //
    // Because toku_ft_leaf_apply_msg is called with the intent of permanently
    // applying a message to a leaf node (meaning the message is permanently applied
    // and will be purged from the system after this call, as opposed to
    // toku_apply_ancestors_messages_to_node, which applies a message
    // for a query, but the message may still reside in the system and
    // be reapplied later), we mark the node as dirty and
    // take the opportunity to update node->max_msn_applied_to_node_on_disk.
    //
    node->set_dirty();

    //
    // we cannot blindly update node->max_msn_applied_to_node_on_disk,
    // we must check to see if the msn is greater than the one already stored,
    // because the message may have already been applied earlier (via
    // toku_apply_ancestors_messages_to_node) to answer a query
    //
    // This is why we handle node->max_msn_applied_to_node_on_disk both here
    // and in ft_nonleaf_put_msg, as opposed to in one location, toku_ftnode_put_msg.
    //
    MSN msg_msn = msg.msn();
    if (msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn) {
        node->max_msn_applied_to_node_on_disk = msg_msn;
    }

    if (ft_msg_type_applies_once(msg.type())) {
        unsigned int childnum = (target_childnum >= 0
                                 ? target_childnum
                                 : toku_ftnode_which_child(node, msg.kdbt(), cmp));
        BASEMENTNODE bn = BLB(node, childnum);
        if (msg.msn().msn > bn->max_msn_applied.msn) {
            bn->max_msn_applied = msg.msn();
            toku_ft_bn_apply_msg(
                cmp,
                update_fun,
                bn,
                msg,
                gc_info,
                workdone,
                stats_to_update,
                logical_rows_delta);
        } else {
            toku_ft_status_note_msn_discard();
        }
    } else if (ft_msg_type_applies_all(msg.type())) {
        for (int childnum=0; childnum<node->n_children; childnum++) {
            if (msg.msn().msn > BLB(node, childnum)->max_msn_applied.msn) {
                BLB(node, childnum)->max_msn_applied = msg.msn();
                toku_ft_bn_apply_msg(
                    cmp,
                    update_fun,
                    BLB(node, childnum),
                    msg,
                    gc_info,
                    workdone,
                    stats_to_update,
                    logical_rows_delta);
            } else {
                toku_ft_status_note_msn_discard();
            }
        }
    } else if (!ft_msg_type_does_nothing(msg.type())) {
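        // unexpected message type: this invariant always fails, turning an
        // unknown message into a hard error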
        invariant(ft_msg_type_does_nothing(msg.type()));
    }
    VERIFY_NODE(t, node);
}
