1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <my_global.h>
40 #include "ft/ft.h"
41 #include "ft/ft-cachetable-wrappers.h"
42 #include "ft/ft-internal.h"
43 #include "ft/ft-flusher.h"
44 #include "ft/ft-flusher-internal.h"
45 #include "ft/node.h"
46 #include "ft/serialize/block_table.h"
47 #include "ft/serialize/ft_node-serialize.h"
48 #include "portability/toku_assert.h"
49 #include "portability/toku_atomic.h"
50 #include "util/status.h"
51 #include "util/context.h"
52 
53 
// Effect: Copy the current flusher status counters into *status.
//  fl_status.init() is idempotent lazy initialization of the status keys
//  (legend strings etc.) before the struct is copied out.
void toku_ft_flusher_get_status(FT_FLUSHER_STATUS status) {
    fl_status.init();
    *status = fl_status;
}
58 
59 //
60 // For test purposes only.
61 // These callbacks are never used in production code, only as a way
62 //  to test the system (for example, by causing crashes at predictable times).
63 //
64 static void (*flusher_thread_callback)(int, void*) = NULL;
65 static void *flusher_thread_callback_extra = NULL;
66 
toku_flusher_thread_set_callback(void (* callback_f)(int,void *),void * extra)67 void toku_flusher_thread_set_callback(void (*callback_f)(int, void*),
68                                       void* extra) {
69     flusher_thread_callback = callback_f;
70     flusher_thread_callback_extra = extra;
71 }
72 
call_flusher_thread_callback(int flt_state)73 static void call_flusher_thread_callback(int flt_state) {
74     if (flusher_thread_callback) {
75         flusher_thread_callback(flt_state, flusher_thread_callback_extra);
76     }
77 }
78 
79 static int
find_heaviest_child(FTNODE node)80 find_heaviest_child(FTNODE node)
81 {
82     int max_child = 0;
83     uint64_t max_weight = toku_bnc_nbytesinbuf(BNC(node, 0)) + BP_WORKDONE(node, 0);
84 
85     invariant(node->n_children > 0);
86     for (int i = 1; i < node->n_children; i++) {
87         uint64_t bytes_in_buf = toku_bnc_nbytesinbuf(BNC(node, i));
88         uint64_t workdone = BP_WORKDONE(node, i);
89         if (workdone > 0) {
90             invariant(bytes_in_buf > 0);
91         }
92         uint64_t this_weight = bytes_in_buf + workdone;
93         if (max_weight < this_weight) {
94             max_child = i;
95             max_weight = this_weight;
96         }
97     }
98     return max_child;
99 }
100 
101 static void
update_flush_status(FTNODE child,int cascades)102 update_flush_status(FTNODE child, int cascades) {
103     FL_STATUS_VAL(FT_FLUSHER_FLUSH_TOTAL)++;
104     if (cascades > 0) {
105         FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES)++;
106         switch (cascades) {
107         case 1:
108             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_1)++; break;
109         case 2:
110             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_2)++; break;
111         case 3:
112             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_3)++; break;
113         case 4:
114             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_4)++; break;
115         case 5:
116             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_5)++; break;
117         default:
118             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_GT_5)++; break;
119         }
120     }
121     bool flush_needs_io = false;
122     for (int i = 0; !flush_needs_io && i < child->n_children; ++i) {
123         if (BP_STATE(child, i) == PT_ON_DISK) {
124             flush_needs_io = true;
125         }
126     }
127     if (flush_needs_io) {
128         FL_STATUS_VAL(FT_FLUSHER_FLUSH_NEEDED_IO)++;
129     } else {
130         FL_STATUS_VAL(FT_FLUSHER_FLUSH_IN_MEMORY)++;
131     }
132 }
133 
134 static void
maybe_destroy_child_blbs(FTNODE node,FTNODE child,FT ft)135 maybe_destroy_child_blbs(FTNODE node, FTNODE child, FT ft)
136 {
137     // If the node is already fully in memory, as in upgrade, we don't
138     // need to destroy the basement nodes because they are all equally
139     // up to date.
140     if (child->n_children > 1 &&
141         child->height == 0 &&
142         !child->dirty()) {
143         for (int i = 0; i < child->n_children; ++i) {
144             if (BP_STATE(child, i) == PT_AVAIL &&
145                 node->max_msn_applied_to_node_on_disk.msn < BLB_MAX_MSN_APPLIED(child, i).msn)
146             {
147                 toku_evict_bn_from_memory(child, i, ft);
148             }
149         }
150     }
151 }
152 
153 static void
154 ft_merge_child(
155     FT ft,
156     FTNODE node,
157     int childnum_to_merge,
158     bool *did_react,
159     struct flusher_advice *fa);
160 
161 static int
pick_heaviest_child(FT UU (ft),FTNODE parent,void * UU (extra))162 pick_heaviest_child(FT UU(ft),
163                     FTNODE parent,
164                     void* UU(extra))
165 {
166     int childnum = find_heaviest_child(parent);
167     paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
168     return childnum;
169 }
170 
// flusher_advice should_destroy_basement_nodes callback: never destroy
// basement nodes (used by the flusher-thread advice).
bool
dont_destroy_basement_nodes(void* UU(extra))
{
    return false;
}
176 
// flusher_advice should_destroy_basement_nodes callback: always allow
// destroying basement nodes (used by the cleaner-thread advice).
static bool
do_destroy_basement_nodes(void* UU(extra))
{
    return true;
}
182 
// flusher_advice should_recursively_flush callback: always keep flushing
// down the tree.
bool
always_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return true;
}
188 
// flusher_advice should_recursively_flush callback: stop after a single
// level of flushing.
bool
never_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return false;
}
194 
195 /**
196  * Flusher thread ("normal" flushing) implementation.
197  */
198 struct flush_status_update_extra {
199     int cascades;
200     uint32_t nodesize;
201 };
202 
203 static bool
recurse_if_child_is_gorged(FTNODE child,void * extra)204 recurse_if_child_is_gorged(FTNODE child, void* extra)
205 {
206     struct flush_status_update_extra *fste = (flush_status_update_extra *)extra;
207     return toku_ftnode_nonleaf_is_gorged(child, fste->nodesize);
208 }
209 
// flusher_advice pick_child_after_split callback: returning -1 means
// "don't continue flushing into either half after a split".
int
default_pick_child_after_split(FT UU(ft),
                               FTNODE UU(parent),
                               int UU(childnuma),
                               int UU(childnumb),
                               void* UU(extra))
{
    return -1;
}
219 
// flusher_advice maybe_merge_child callback: merge `childnum` of `parent`.
// On entry both parent and child are pinned; the child is unpinned here and
// re-pinned inside ft_merge_child, and ft_merge_child unpins the parent.
void
default_merge_child(struct flusher_advice *fa,
                    FT ft,
                    FTNODE parent,
                    int childnum,
                    FTNODE child,
                    void* UU(extra))
{
    //
    // There is probably a way to pass FTNODE child
    // into ft_merge_child, but for simplicity for now,
    // we are just going to unpin child and
    // let ft_merge_child pin it again
    //
    toku_unpin_ftnode(ft, child);
    //
    //
    // it is responsibility of ft_merge_child to unlock parent
    //
    bool did_react;
    ft_merge_child(ft, parent, childnum, &did_react, fa);
}
242 
// Effect: Fill in a flusher_advice with the given callback set and opaque
//  extra pointer.  The advice object drives policy decisions (which child
//  to flush, whether to recurse, whether to merge, status accounting)
//  during toku_ft_flush_some_child.
void
flusher_advice_init(
    struct flusher_advice *fa,
    FA_PICK_CHILD pick_child,
    FA_SHOULD_DESTROY_BN should_destroy_basement_nodes,
    FA_SHOULD_RECURSIVELY_FLUSH should_recursively_flush,
    FA_MAYBE_MERGE_CHILD maybe_merge_child,
    FA_UPDATE_STATUS update_status,
    FA_PICK_CHILD_AFTER_SPLIT pick_child_after_split,
    void* extra
    )
{
    fa->pick_child = pick_child;
    fa->should_destroy_basement_nodes = should_destroy_basement_nodes;
    fa->should_recursively_flush = should_recursively_flush;
    fa->maybe_merge_child = maybe_merge_child;
    fa->update_status = update_status;
    fa->pick_child_after_split = pick_child_after_split;
    fa->extra = extra;
}
263 
264 static void
flt_update_status(FTNODE child,int UU (dirtied),void * extra)265 flt_update_status(FTNODE child,
266                  int UU(dirtied),
267                  void* extra)
268 {
269     struct flush_status_update_extra *fste = (struct flush_status_update_extra *) extra;
270     update_flush_status(child, fste->cascades);
271     // If `toku_ft_flush_some_child` decides to recurse after this, we'll need
272     // cascades to increase.  If not it doesn't matter.
273     fste->cascades++;
274 }
275 
// Effect: Initialize flusher-thread advice: flush the heaviest child, keep
//  basement nodes, recurse only while children are gorged, and use the
//  default merge/split policies.  `fste` is reset and used as the extra.
static void
flt_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra *fste, uint32_t nodesize)
{
    fste->cascades = 0;
    fste->nodesize = nodesize;
    flusher_advice_init(fa,
                        pick_heaviest_child,
                        dont_destroy_basement_nodes,
                        recurse_if_child_is_gorged,
                        default_merge_child,
                        flt_update_status,
                        default_pick_child_after_split,
                        fste);
}
290 
// Extra state for the cleaner-thread leaf-merge ("ctm") advice: remembers
// which leaf to re-find on the way back down from the root.
struct ctm_extra {
    bool is_last_child;  // merged leaf was its parent's last child
    DBT target_key;      // saved pivot key identifying the leaf to merge
};
295 
296 static int
ctm_pick_child(FT ft,FTNODE parent,void * extra)297 ctm_pick_child(FT ft,
298                FTNODE parent,
299                void* extra)
300 {
301     struct ctm_extra* ctme = (struct ctm_extra *) extra;
302     int childnum;
303     if (parent->height == 1 && ctme->is_last_child) {
304         childnum = parent->n_children - 1;
305     } else {
306         childnum = toku_ftnode_which_child(parent, &ctme->target_key, ft->cmp);
307     }
308     return childnum;
309 }
310 
// flusher_advice update_status callback for leaf merges: account nodes
// dirtied on the way down to the leaf being merged.
static void
ctm_update_status(
    FTNODE UU(child),
    int dirtied,
    void* UU(extra)
    )
{
    FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_DIRTIED_FOR_LEAF_MERGE) += dirtied;
}
320 
// flusher_advice maybe_merge_child callback for leaf merges: count a
// completed leaf merge (atomically, since other threads may read status),
// then perform the default merge.
static void
ctm_maybe_merge_child(struct flusher_advice *fa,
                      FT ft,
                      FTNODE parent,
                      int childnum,
                      FTNODE child,
                      void *extra)
{
    if (child->height == 0) {
        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1);
    }
    default_merge_child(fa, ft, parent, childnum, child, extra);
}
334 
// flusher_advice maybe_merge_child callback for the cleaner thread.
// Nonleaf children are merged directly.  For a leaf child, the merge must
// restart from the root: we save a pivot key identifying the leaf, unpin
// everything, re-pin the root, and run a fresh flush descent with the
// "ctm" advice which navigates back to that leaf and merges it.
static void
ct_maybe_merge_child(struct flusher_advice *fa,
                     FT ft,
                     FTNODE parent,
                     int childnum,
                     FTNODE child,
                     void* extra)
{
    if (child->height > 0) {
        default_merge_child(fa, ft, parent, childnum, child, extra);
    }
    else {
        struct ctm_extra ctme;
        paranoid_invariant(parent->n_children > 1);
        int pivot_to_save;
        //
        // we have two cases, one where the childnum
        // is the last child, and therefore the pivot we
        // save is not of the pivot which we wish to descend
        // and another where it is not the last child,
        // so the pivot is sufficient for identifying the leaf
        // to be merged
        //
        if (childnum == (parent->n_children - 1)) {
            ctme.is_last_child = true;
            pivot_to_save = childnum - 1;
        }
        else {
            ctme.is_last_child = false;
            pivot_to_save = childnum;
        }
        // Clone the pivot: parent will be unpinned below, so we cannot keep
        // a pointer into its pivotkeys.
        toku_clone_dbt(&ctme.target_key, parent->pivotkeys.get_pivot(pivot_to_save));

        // at this point, ctme is properly setup, now we can do the merge
        struct flusher_advice new_fa;
        flusher_advice_init(
            &new_fa,
            ctm_pick_child,
            dont_destroy_basement_nodes,
            always_recursively_flush,
            ctm_maybe_merge_child,
            ctm_update_status,
            default_pick_child_after_split,
            &ctme);

        // Release both nodes before re-pinning from the root to avoid
        // deadlocking against other descents.
        toku_unpin_ftnode(ft, parent);
        toku_unpin_ftnode(ft, child);

        FTNODE root_node = NULL;
        {
            uint32_t fullhash;
            CACHEKEY root;
            toku_calculate_root_offset_pointer(ft, &root, &fullhash);
            ftnode_fetch_extra bfe;
            bfe.create_for_full_read(ft);
            toku_pin_ftnode(ft, root, fullhash, &bfe, PL_WRITE_EXPENSIVE, &root_node, true);
            toku_ftnode_assert_fully_in_memory(root_node);
        }

        // Status counters are updated atomically; other threads may be
        // running merges concurrently.
        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        toku_ft_flush_some_child(ft, root_node, &new_fa);

        (void) toku_sync_fetch_and_sub(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        // Free the cloned pivot key.
        toku_destroy_dbt(&ctme.target_key);
    }
}
404 
405 static void
ct_update_status(FTNODE child,int dirtied,void * extra)406 ct_update_status(FTNODE child,
407                  int dirtied,
408                  void* extra)
409 {
410     struct flush_status_update_extra* fste = (struct flush_status_update_extra *) extra;
411     update_flush_status(child, fste->cascades);
412     FL_STATUS_VAL(FT_FLUSHER_CLEANER_NODES_DIRTIED) += dirtied;
413     // Incrementing this in case `toku_ft_flush_some_child` decides to recurse.
414     fste->cascades++;
415 }
416 
// Effect: Initialize cleaner-thread advice: flush the heaviest child,
//  allow destroying basement nodes, recurse only while children are
//  gorged, and restart leaf merges from the root via ct_maybe_merge_child.
static void
ct_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra* fste, uint32_t nodesize)
{
    fste->cascades = 0;
    fste->nodesize = nodesize;
    flusher_advice_init(fa,
                        pick_heaviest_child,
                        do_destroy_basement_nodes,
                        recurse_if_child_is_gorged,
                        ct_maybe_merge_child,
                        ct_update_status,
                        default_pick_child_after_split,
                        fste);
}
431 
432 //
433 // This returns true if the node MAY be reactive,
434 // false is we are absolutely sure that it is NOT reactive.
435 // The reason for inaccuracy is that the node may be
436 // a leaf node that is not entirely in memory. If so, then
437 // we cannot be sure if the node is reactive.
438 //
ft_ftnode_may_be_reactive(FT ft,FTNODE node)439 static bool ft_ftnode_may_be_reactive(FT ft, FTNODE node)
440 {
441     if (node->height == 0) {
442         return true;
443     } else {
444         return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout) != RE_STABLE;
445     }
446 }
447 
/* NODE is a node with a child.
 * childnum was split into two nodes childa, and childb.  childa is the same as the original child.  childb is a new child.
 * We must slide things around, & move things from the old table to the new tables.
 * Requires: the CHILDNUMth buffer of node is empty.
 * We don't push anything down to children.  We split the node, and things land wherever they land.
 * We must delete the old buffer (but the old child is already deleted.)
 * On return, the new children and node STAY PINNED.
 */
static void
handle_split_of_child(
    FT ft,
    FTNODE node,
    int childnum,
    FTNODE childa,
    FTNODE childb,
    DBT *splitk /* the data in the childsplitk is alloc'd and is consumed by this call. */
    )
{
    paranoid_invariant(node->height>0);
    paranoid_invariant(0 <= childnum);
    paranoid_invariant(childnum < node->n_children);
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(childa);
    toku_ftnode_assert_fully_in_memory(childb);
    NONLEAF_CHILDINFO old_bnc = BNC(node, childnum);
    // required precondition: the buffer for the split child must be empty
    paranoid_invariant(toku_bnc_nbytesinbuf(old_bnc)==0);
    WHEN_NOT_GCOV(
        if (toku_ft_debug_mode) {
            printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
            printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
            for(int i = 0; i < node->n_children - 1; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
            printf("\n");
        }
    )

    node->set_dirty();

    // Grow the bp array by one slot for the new child.
    XREALLOC_N(node->n_children+1, node->bp);
    // Slide the children over.
    // suppose n_children is 10 and childnum is 5, meaning node->childnum[5] just got split
    // this moves node->bp[6] through node->bp[9] over to
    // node->bp[7] through node->bp[10]
    for (int cnum=node->n_children; cnum>childnum+1; cnum--) {
        node->bp[cnum] = node->bp[cnum-1];
    }
    // Zero the freed slot (childnum+1) before installing the new child.
    memset(&node->bp[childnum+1],0,sizeof(node->bp[0]));
    node->n_children++;

    paranoid_invariant(BP_BLOCKNUM(node, childnum).b==childa->blocknum.b); // use the same child

    // We never set the rightmost blocknum to be the root.
    // Instead, we wait for the root to split and let promotion initialize the rightmost
    // blocknum to be the first non-root leaf node on the right extreme to receive an insert.
    BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
    invariant(ft->h->root_blocknum.b != rightmost_blocknum.b);
    if (childa->blocknum.b == rightmost_blocknum.b) {
        // The rightmost leaf (a) split into (a) and (b). We want (b) to swap pair values
        // with (a), now that it is the new rightmost leaf. This keeps the rightmost blocknum
        // constant, the same the way we keep the root blocknum constant.
        toku_ftnode_swap_pair_values(childa, childb);
        BP_BLOCKNUM(node, childnum) = childa->blocknum;
    }

    // Install the new (right) child in the freed slot.
    BP_BLOCKNUM(node, childnum+1) = childb->blocknum;
    BP_WORKDONE(node, childnum+1) = 0;
    BP_STATE(node,childnum+1) = PT_AVAIL;

    NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
    for (unsigned int i = 0; i < (sizeof new_bnc->flow) / (sizeof new_bnc->flow[0]); ++i) {
        // just split the flows in half for now, can't guess much better
        // at the moment
        new_bnc->flow[i] = old_bnc->flow[i] / 2;
        old_bnc->flow[i] = (old_bnc->flow[i] + 1) / 2;
    }
    set_BNC(node, childnum+1, new_bnc);

    // Insert the new split key , sliding the other keys over
    node->pivotkeys.insert_at(splitk, childnum);

    WHEN_NOT_GCOV(
        if (toku_ft_debug_mode) {
            printf("%s:%d splitkeys:", __FILE__, __LINE__);
            for (int i = 0; i < node->n_children - 2; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
            printf("\n");
        }
    )

    /* Keep pushing to the children, but not if the children would require a pushdown */
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(childa);
    toku_ftnode_assert_fully_in_memory(childb);

    // NOTE(review): VERIFY_NODE references `t`, which is not a parameter of
    // this function — presumably it expands to a no-op in this build;
    // confirm against the macro definition.
    VERIFY_NODE(t, node);
    VERIFY_NODE(t, childa);
    VERIFY_NODE(t, childb);
}
544 
// Effect: Paranoid-build-only check that every basement node of a leaf is
//  in memory (PT_AVAIL) and that each basement's mempool is consistent.
//  Compiles to a no-op unless TOKU_DEBUG_PARANOID is defined.
static void
verify_all_in_mempool(FTNODE UU() node)
{
#ifdef TOKU_DEBUG_PARANOID
    if (node->height==0) {
        for (int i = 0; i < node->n_children; i++) {
            invariant(BP_STATE(node,i) == PT_AVAIL);
            BLB_DATA(node, i)->verify_mempool();
        }
    }
#endif
}
557 
558 static uint64_t
ftleaf_disk_size(FTNODE node)559 ftleaf_disk_size(FTNODE node)
560 // Effect: get the disk size of a leafentry
561 {
562     paranoid_invariant(node->height == 0);
563     toku_ftnode_assert_fully_in_memory(node);
564     uint64_t retval = 0;
565     for (int i = 0; i < node->n_children; i++) {
566         retval += BLB_DATA(node, i)->get_disk_size();
567     }
568     return retval;
569 }
570 
static void
ftleaf_get_split_loc(
    FTNODE node,
    enum split_mode split_mode,
    int *num_left_bns,   // which basement within leaf
    int *num_left_les    // which key within basement
    )
// Effect: Find the location within a leaf node where we want to perform a split
// num_left_bns is how many basement nodes (which OMT) should be split to the left.
// num_left_les is how many leafentries in OMT of the last bn should be on the left side of the split.
{
    switch (split_mode) {
    case SPLIT_LEFT_HEAVY: {
        // Keep everything on the left; if the last basement is empty, back
        // up one basement so the right side is not created empty.
        *num_left_bns = node->n_children;
        *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
        if (*num_left_les == 0) {
            *num_left_bns = node->n_children - 1;
            *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
        }
        goto exit;
    }
    case SPLIT_RIGHT_HEAVY: {
        // Keep at most one leafentry (from the first basement) on the left.
        *num_left_bns = 1;
        *num_left_les = BLB_DATA(node, 0)->num_klpairs() ? 1 : 0;
        goto exit;
    }
    case SPLIT_EVENLY: {
        paranoid_invariant(node->height == 0);
        // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice
        // First pass computed total disk size; walk again until we cross
        // the halfway point.
        uint64_t sumlesizes = ftleaf_disk_size(node);
        uint32_t size_so_far = 0;
        for (int i = 0; i < node->n_children; i++) {
            bn_data* bd = BLB_DATA(node, i);
            uint32_t n_leafentries = bd->num_klpairs();
            for (uint32_t j=0; j < n_leafentries; j++) {
                size_t size_this_le;
                int rr = bd->fetch_klpair_disksize(j, &size_this_le);
                invariant_zero(rr);
                size_so_far += size_this_le;
                if (size_so_far >= sumlesizes/2) {
                    *num_left_bns = i + 1;
                    *num_left_les = j + 1;
                    if (*num_left_bns == node->n_children &&
                        (unsigned int) *num_left_les == n_leafentries) {
                        // need to correct for when we're splitting after the
                        // last element, that makes no sense
                        if (*num_left_les > 1) {
                            (*num_left_les)--;
                        } else if (*num_left_bns > 1) {
                            (*num_left_bns)--;
                            *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
                        } else {
                            // we are trying to split a leaf with only one
                            // leafentry in it
                            abort();
                        }
                    }
                    goto exit;
                }
            }
        }
    }
    }
    // Unreachable: every split_mode case jumps to exit (or aborts).
    abort();
exit:
    return;
}
638 
static void
move_leafentries(
    BASEMENTNODE dest_bn,
    BASEMENTNODE src_bn,
    uint32_t lbi, //lower bound inclusive
    uint32_t ube //upper bound exclusive
    )
//Effect: move leafentries in the range [lbi, upe) from src_omt to newly created dest_omt
{
    // The current implementation only supports splitting off the TAIL of
    // src_bn, hence the requirement that ube is the pair count.
    invariant(ube == src_bn->data_buffer.num_klpairs());
    src_bn->data_buffer.split_klpairs(&dest_bn->data_buffer, lbi);
}
651 
ftnode_finalize_split(FTNODE node,FTNODE B,MSN max_msn_applied_to_node)652 static void ftnode_finalize_split(FTNODE node, FTNODE B, MSN max_msn_applied_to_node) {
653 // Effect: Finalizes a split by updating some bits and dirtying both nodes
654     toku_ftnode_assert_fully_in_memory(node);
655     toku_ftnode_assert_fully_in_memory(B);
656     verify_all_in_mempool(node);
657     verify_all_in_mempool(B);
658 
659     node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
660     B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
661 
662     // The new node in the split inherits the oldest known reference xid
663     B->oldest_referenced_xid_known = node->oldest_referenced_xid_known;
664 
665     node->set_dirty();
666     B->set_dirty();
667 }
668 
669 void
ftleaf_split(FT ft,FTNODE node,FTNODE * nodea,FTNODE * nodeb,DBT * splitk,bool create_new_node,enum split_mode split_mode,uint32_t num_dependent_nodes,FTNODE * dependent_nodes)670 ftleaf_split(
671     FT ft,
672     FTNODE node,
673     FTNODE *nodea,
674     FTNODE *nodeb,
675     DBT *splitk,
676     bool create_new_node,
677     enum split_mode split_mode,
678     uint32_t num_dependent_nodes,
679     FTNODE* dependent_nodes)
680 // Effect: Split a leaf node.
681 // Argument "node" is node to be split.
682 // Upon return:
683 //   nodea and nodeb point to new nodes that result from split of "node"
684 //   nodea is the left node that results from the split
685 //   splitk is the right-most key of nodea
686 {
687 
688     paranoid_invariant(node->height == 0);
689     FL_STATUS_VAL(FT_FLUSHER_SPLIT_LEAF)++;
690     if (node->n_children) {
691         // First move all the accumulated stat64info deltas into the first basement.
692         // After the split, either both nodes or neither node will be included in the next checkpoint.
693         // The accumulated stats in the dictionary will be correct in either case.
694         // By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain
695         // correct information for a basement that is divided between two leafnodes (i.e. when split is
696         // not on a basement boundary).
697         STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node);
698         BASEMENTNODE bn = BLB(node,0);
699         bn->stat64_delta = delta_for_leafnode;
700     }
701 
702 
703     FTNODE B = nullptr;
704     uint32_t fullhash;
705     BLOCKNUM name;
706 
707     if (create_new_node) {
708         // put value in cachetable and do checkpointing
709         // of dependent nodes
710         //
711         // We do this here, before evaluating the last_bn_on_left
712         // and last_le_on_left_within_bn because this operation
713         // may write to disk the dependent nodes.
714         // While doing so, we may rebalance the leaf node
715         // we are splitting, thereby invalidating the
716         // values of last_bn_on_left and last_le_on_left_within_bn.
717         // So, we must call this before evaluating
718         // those two values
719         cachetable_put_empty_node_with_dep_nodes(
720             ft,
721             num_dependent_nodes,
722             dependent_nodes,
723             &name,
724             &fullhash,
725             &B
726             );
727         // GCC 4.8 seems to get confused and think B is maybe uninitialized at link time.
728         // TODO(leif): figure out why it thinks this and actually fix it.
729         invariant_notnull(B);
730     }
731 
732 
733     paranoid_invariant(node->height==0);
734     toku_ftnode_assert_fully_in_memory(node);
735     verify_all_in_mempool(node);
736     MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
737 
738     // variables that say where we will do the split.
739     // After the split, there will be num_left_bns basement nodes in the left node,
740     // and the last basement node in the left node will have num_left_les leafentries.
741     int num_left_bns;
742     int num_left_les;
743     ftleaf_get_split_loc(node, split_mode, &num_left_bns, &num_left_les);
744     {
745         // did we split right on the boundary between basement nodes?
746         const bool split_on_boundary = (num_left_les == 0) || (num_left_les == (int) BLB_DATA(node, num_left_bns - 1)->num_klpairs());
747         // Now we know where we are going to break it
748         // the two nodes will have a total of n_children+1 basement nodes
749         // and n_children-1 pivots
750         // the left node, node, will have last_bn_on_left+1 basement nodes
751         // the right node, B, will have n_children-last_bn_on_left basement nodes
752         // the pivots of node will be the first last_bn_on_left pivots that originally exist
753         // the pivots of B will be the last (n_children - 1 - last_bn_on_left) pivots that originally exist
754 
755         // Note: The basements will not be rebalanced.  Only the mempool of the basement that is split
756         //       (if split_on_boundary is false) will be affected.  All other mempools will remain intact. ???
757 
758         //set up the basement nodes in the new node
759         int num_children_in_node = num_left_bns;
760         // In the SPLIT_RIGHT_HEAVY case, we need to add 1 back because
761         // while it's not on the boundary, we do need node->n_children
762         // children in B.
763         int num_children_in_b = node->n_children - num_left_bns + (!split_on_boundary ? 1 : 0);
764         if (num_children_in_b == 0) {
765             // for uneven split, make sure we have at least 1 bn
766             paranoid_invariant(split_mode == SPLIT_LEFT_HEAVY);
767             num_children_in_b = 1;
768         }
769         paranoid_invariant(num_children_in_node > 0);
770         if (create_new_node) {
771             toku_initialize_empty_ftnode(
772                 B,
773                 name,
774                 0,
775                 num_children_in_b,
776                 ft->h->layout_version,
777                 ft->h->flags);
778             B->fullhash = fullhash;
779         }
780         else {
781             B = *nodeb;
782             REALLOC_N(num_children_in_b,   B->bp);
783             B->n_children = num_children_in_b;
784             for (int i = 0; i < num_children_in_b; i++) {
785                 BP_BLOCKNUM(B,i).b = 0;
786                 BP_STATE(B,i) = PT_AVAIL;
787                 BP_WORKDONE(B,i) = 0;
788                 set_BLB(B, i, toku_create_empty_bn());
789             }
790         }
791 
792         // now move all the data
793 
794         int curr_src_bn_index = num_left_bns - 1;
795         int curr_dest_bn_index = 0;
796 
797         // handle the move of a subset of data in last_bn_on_left from node to B
798         if (!split_on_boundary) {
799             BP_STATE(B,curr_dest_bn_index) = PT_AVAIL;
800             destroy_basement_node(BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array
801             set_BNULL(B, curr_dest_bn_index);
802             set_BLB(B, curr_dest_bn_index, toku_create_empty_bn_no_buffer());
803             move_leafentries(BLB(B, curr_dest_bn_index),
804                              BLB(node, curr_src_bn_index),
805                              num_left_les,         // first row to be moved to B
806                              BLB_DATA(node, curr_src_bn_index)->num_klpairs()  // number of rows in basement to be split
807                              );
808             BLB_MAX_MSN_APPLIED(B, curr_dest_bn_index) = BLB_MAX_MSN_APPLIED(node, curr_src_bn_index);
809             curr_dest_bn_index++;
810         }
811         curr_src_bn_index++;
812 
813         paranoid_invariant(B->n_children >= curr_dest_bn_index);
814         paranoid_invariant(node->n_children >= curr_src_bn_index);
815 
816         // move the rest of the basement nodes
817         for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) {
818             destroy_basement_node(BLB(B, curr_dest_bn_index));
819             set_BNULL(B, curr_dest_bn_index);
820             B->bp[curr_dest_bn_index] = node->bp[curr_src_bn_index];
821         }
822         if (curr_dest_bn_index < B->n_children) {
823             // B already has an empty basement node here.
824             BP_STATE(B, curr_dest_bn_index) = PT_AVAIL;
825         }
826 
827         //
828         // now handle the pivots
829         //
830 
831         // the child index in the original node that corresponds to the
832         // first node in the right node of the split
833         int split_idx = num_left_bns - (split_on_boundary ? 0 : 1);
834         node->pivotkeys.split_at(split_idx, &B->pivotkeys);
835         if (split_on_boundary && num_left_bns < node->n_children && splitk) {
836             toku_copyref_dbt(splitk, node->pivotkeys.get_pivot(num_left_bns - 1));
837         } else if (splitk) {
838             bn_data* bd = BLB_DATA(node, num_left_bns - 1);
839             uint32_t keylen;
840             void *key;
841             int rr = bd->fetch_key_and_len(bd->num_klpairs() - 1, &keylen, &key);
842             invariant_zero(rr);
843             toku_memdup_dbt(splitk, key, keylen);
844         }
845 
846         node->n_children = num_children_in_node;
847         REALLOC_N(num_children_in_node, node->bp);
848     }
849 
850     ftnode_finalize_split(node, B, max_msn_applied_to_node);
851     *nodea = node;
852     *nodeb = B;
853 }    // end of ftleaf_split()
854 
void
ft_nonleaf_split(
    FT ft,
    FTNODE node,
    FTNODE *nodea,
    FTNODE *nodeb,
    DBT *splitk,
    uint32_t num_dependent_nodes,
    FTNODE* dependent_nodes)
// Effect: Split the nonleaf 'node' into two nodes of roughly equal child
//   count.  The low-order children stay in 'node' (returned via *nodea);
//   the high-order children move to a freshly created node (returned via
//   *nodeb).  *splitk receives a clone of the pivot key separating the two
//   halves, for later insertion into the parent.
// Requires: node is pinned, fully in memory, height > 0, n_children >= 2.
{
    //VERIFY_NODE(t,node);
    FL_STATUS_VAL(FT_FLUSHER_SPLIT_NONLEAF)++;
    toku_ftnode_assert_fully_in_memory(node);
    int old_n_children = node->n_children;
    int n_children_in_a = old_n_children/2;
    int n_children_in_b = old_n_children-n_children_in_a;
    // Capture the max MSN before the split; both halves carry it afterwards.
    MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
    FTNODE B;
    paranoid_invariant(node->height>0);
    paranoid_invariant(node->n_children>=2); // Otherwise, how do we split?  We need at least two children to split.
    // B is created with the parent and its siblings as dependent nodes so
    // checkpoint ordering stays consistent.
    create_new_ftnode_with_dep_nodes(ft, &B, node->height, n_children_in_b, num_dependent_nodes, dependent_nodes);
    {
        /* The first n_children_in_a go into node a.
         * That means that the first n_children_in_a-1 keys go into node a.
         * The splitter key is key number n_children_in_a */
        for (int i = n_children_in_a; i<old_n_children; i++) {
            int targchild = i-n_children_in_a;
            // TODO: Figure out better way to handle this
            // the problem is that create_new_ftnode_with_dep_nodes for B creates
            // all the data structures, whereas we really don't want it to fill
            // in anything for the bp's.
            // Now we have to go free what it just created so we can
            // slide the bp over
            destroy_nonleaf_childinfo(BNC(B, targchild));
            // now move the bp over
            B->bp[targchild] = node->bp[i];
            // Zero the vacated slot so 'node' no longer owns the moved child.
            memset(&node->bp[i], 0, sizeof(node->bp[0]));
        }

        // the split key for our parent is the rightmost pivot key in node
        node->pivotkeys.split_at(n_children_in_a, &B->pivotkeys);
        toku_clone_dbt(splitk, node->pivotkeys.get_pivot(n_children_in_a - 1));
        // The split key is handed to the parent, so it stops being a pivot of 'node'.
        node->pivotkeys.delete_at(n_children_in_a - 1);

        node->n_children = n_children_in_a;
        // Shrink node's bp array to its reduced child count.
        REALLOC_N(node->n_children, node->bp);
    }

    // Finalize the split state of both nodes (MSN bookkeeping etc.).
    ftnode_finalize_split(node, B, max_msn_applied_to_node);
    *nodea = node;
    *nodeb = B;
}
907 
908 //
909 // responsibility of ft_split_child is to take locked FTNODEs node and child
910 // and do the following:
911 //  - split child,
912 //  - fix node,
913 //  - release lock on node
914 //  - possibly flush either new children created from split, otherwise unlock children
915 //
static void
ft_split_child(
    FT ft,
    FTNODE node,
    int childnum,
    FTNODE child,
    enum split_mode split_mode,
    struct flusher_advice *fa)
// Effect: Split 'child' (the childnum-th child of 'node'), fix up 'node'
//   with the new sibling and pivot, unlock 'node', and then either keep
//   flushing into one of the two halves (per the flusher advice) or unlock
//   both halves.
// Requires: node and child are pinned; node->height > 0; child's buffer in
//   node is empty (it must have been flushed before splitting).
{
    paranoid_invariant(node->height>0);
    paranoid_invariant(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
    FTNODE nodea, nodeb;
    DBT splitk;

    // for test
    call_flusher_thread_callback(flt_flush_before_split);

    // Both the parent and the child are dependencies of any node the split
    // creates, for checkpoint-consistent write ordering.
    FTNODE dep_nodes[2];
    dep_nodes[0] = node;
    dep_nodes[1] = child;
    if (child->height==0) {
        ftleaf_split(ft, child, &nodea, &nodeb, &splitk, true, split_mode, 2, dep_nodes);
    } else {
        ft_nonleaf_split(ft, child, &nodea, &nodeb, &splitk, 2, dep_nodes);
    }
    // printf("%s:%d child did split\n", __FILE__, __LINE__);
    // Install the new sibling and split key into the parent.
    handle_split_of_child (ft, node, childnum, nodea, nodeb, &splitk);

    // for test
    call_flusher_thread_callback(flt_flush_during_split);

    // at this point, the split is complete
    // now we need to unlock node,
    // and possibly continue
    // flushing one of the children
    int picked_child = fa->pick_child_after_split(ft, node, childnum, childnum + 1, fa->extra);
    toku_unpin_ftnode(ft, node);
    if (picked_child == childnum ||
        (picked_child < 0 && nodea->height > 0 && fa->should_recursively_flush(nodea, fa->extra))) {
        // Continue flushing into the left half; release the right half.
        toku_unpin_ftnode(ft, nodeb);
        toku_ft_flush_some_child(ft, nodea, fa);
    }
    else if (picked_child == childnum + 1 ||
             (picked_child < 0 && nodeb->height > 0 && fa->should_recursively_flush(nodeb, fa->extra))) {
        // Continue flushing into the right half; release the left half.
        toku_unpin_ftnode(ft, nodea);
        toku_ft_flush_some_child(ft, nodeb, fa);
    }
    else {
        // No recursive flush: release both halves.
        toku_unpin_ftnode(ft, nodea);
        toku_unpin_ftnode(ft, nodeb);
    }

    toku_destroy_dbt(&splitk);
}
970 
bring_node_fully_into_memory(FTNODE node,FT ft)971 static void bring_node_fully_into_memory(FTNODE node, FT ft) {
972     if (!toku_ftnode_fully_in_memory(node)) {
973         ftnode_fetch_extra bfe;
974         bfe.create_for_full_read(ft);
975         toku_cachetable_pf_pinned_pair(
976             node,
977             toku_ftnode_pf_callback,
978             &bfe,
979             ft->cf,
980             node->blocknum,
981             toku_cachetable_hash(ft->cf, node->blocknum)
982             );
983     }
984 }
985 
static void
flush_this_child(
    FT ft,
    FTNODE node,
    FTNODE child,
    int childnum,
    struct flusher_advice *fa)
// Effect: Push everything in the CHILDNUMth buffer of node down into the child.
// Requires: node is pinned and fully in memory; child is pinned.
{
    update_flush_status(child, 0);
    toku_ftnode_assert_fully_in_memory(node);
    if (fa->should_destroy_basement_nodes(fa)) {
        maybe_destroy_child_blbs(node, child, ft);
    }
    // The flush applies messages to the child, so the entire child must be
    // resident first.
    bring_node_fully_into_memory(child, ft);
    toku_ftnode_assert_fully_in_memory(child);
    paranoid_invariant(node->height>0);
    paranoid_invariant(child->blocknum.b!=0);
    // VERIFY_NODE does not work off client thread as of now
    //VERIFY_NODE(t, child);
    node->set_dirty();
    child->set_dirty();

    BP_WORKDONE(node, childnum) = 0;  // this buffer is drained, no work has been done by its contents
    // Detach the buffer from the parent and install a fresh empty one, so
    // the parent stays consistent while the detached buffer is flushed.
    NONLEAF_CHILDINFO bnc = BNC(node, childnum);
    set_BNC(node, childnum, toku_create_empty_nl());

    // now we have a bnc to flush to the child. pass down the parent's
    // oldest known referenced xid as we flush down to the child.
    toku_bnc_flush_to_child(ft, bnc, child, node->oldest_referenced_xid_known);
    destroy_nonleaf_childinfo(bnc);
}
1018 
static void
merge_leaf_nodes(FTNODE a, FTNODE b)
// Effect: Move all of b's basement nodes and pivots into a, leaving b with
//   zero children.  If a's last basement node is empty it is dropped,
//   because there is no element from which to derive a pivot key for it.
// Requires: a and b are pinned, fully in memory, height 0, and each has at
//   least one basement node.
{
    FL_STATUS_VAL(FT_FLUSHER_MERGE_LEAF)++;
    toku_ftnode_assert_fully_in_memory(a);
    toku_ftnode_assert_fully_in_memory(b);
    paranoid_invariant(a->height == 0);
    paranoid_invariant(b->height == 0);
    paranoid_invariant(a->n_children > 0);
    paranoid_invariant(b->n_children > 0);

    // Mark nodes as dirty before moving basements from b to a.
    // This way, whatever deltas are accumulated in the basements are
    // applied to the in_memory_stats in the header if they have not already
    // been (if nodes are clean).
    // TODO(leif): this is no longer the way in_memory_stats is
    // maintained. verify that it's ok to move this just before the unpin
    // and then do that.
    a->set_dirty();
    b->set_dirty();

    bn_data* a_last_bd = BLB_DATA(a, a->n_children-1);
    // this bool states if the last basement node in a has any items or not
    // If it does, then it stays in the merge. If it does not, the last basement node
    // of a gets eliminated because we do not have a pivot to store for it (because it has no elements)
    const bool a_has_tail = a_last_bd->num_klpairs() > 0;

    int num_children = a->n_children + b->n_children;
    if (!a_has_tail) {
        int lastchild = a->n_children - 1;
        BASEMENTNODE bn = BLB(a, lastchild);

        // verify that last basement in a is empty, then destroy mempool
        size_t used_space = a_last_bd->get_disk_size();
        invariant_zero(used_space);
        destroy_basement_node(bn);
        set_BNULL(a, lastchild);
        num_children--;
        // Drop the pivot for the eliminated basement node, if it had one.
        if (lastchild < a->pivotkeys.num_pivots()) {
            a->pivotkeys.delete_at(lastchild);
        }
    } else {
        // fill in pivot for what used to be max of node 'a', if it is needed
        // (use a's largest key as the pivot separating a's tail from b's data)
        uint32_t keylen;
        void *key;
        int r = a_last_bd->fetch_key_and_len(a_last_bd->num_klpairs() - 1, &keylen, &key);
        invariant_zero(r);
        DBT pivotkey;
        toku_fill_dbt(&pivotkey, key, keylen);
        a->pivotkeys.replace_at(&pivotkey, a->n_children - 1);
    }

    // realloc basement nodes in `a'
    REALLOC_N(num_children, a->bp);

    // move each basement node from b to a
    // (offset is the slot in a that receives b's first basement node)
    uint32_t offset = a_has_tail ? a->n_children : a->n_children - 1;
    for (int i = 0; i < b->n_children; i++) {
        a->bp[i + offset] = b->bp[i];
        // Zero b's slot so ownership transfers cleanly to a.
        memset(&b->bp[i], 0, sizeof(b->bp[0]));
    }

    // append b's pivots to a's pivots
    a->pivotkeys.append(b->pivotkeys);

    // now that all the data has been moved from b to a, we can destroy the data in b
    a->n_children = num_children;
    b->pivotkeys.destroy();
    b->n_children = 0;
}
1089 
static void balance_leaf_nodes(
    FTNODE a,
    FTNODE b,
    DBT *splitk)
// Effect:
//  If b is bigger then move stuff from b to a until b is the smaller.
//  If a is bigger then move stuff from a to b until a is the smaller.
// Implementation: merge everything into a, then split a evenly back into a
//  and b.  *splitk receives the new pivot key between the two nodes.
{
    FL_STATUS_VAL(FT_FLUSHER_BALANCE_LEAF)++;
    // first merge all the data into a
    merge_leaf_nodes(a,b);
    // now split them
    // because we are not creating a new node, we can pass in no dependent nodes
    ftleaf_split(NULL, a, &a, &b, splitk, false, SPLIT_EVENLY, 0, NULL);
}
1105 
1106 static void
maybe_merge_pinned_leaf_nodes(FTNODE a,FTNODE b,const DBT * parent_splitk,bool * did_merge,bool * did_rebalance,DBT * splitk,uint32_t nodesize)1107 maybe_merge_pinned_leaf_nodes(
1108     FTNODE a,
1109     FTNODE b,
1110     const DBT *parent_splitk,
1111     bool *did_merge,
1112     bool *did_rebalance,
1113     DBT *splitk,
1114     uint32_t nodesize
1115     )
1116 // Effect: Either merge a and b into one one node (merge them into a) and set *did_merge = true.
1117 //	   (We do this if the resulting node is not fissible)
1118 //	   or distribute the leafentries evenly between a and b, and set *did_rebalance = true.
1119 //	   (If a and be are already evenly distributed, we may do nothing.)
1120 {
1121     unsigned int sizea = toku_serialize_ftnode_size(a);
1122     unsigned int sizeb = toku_serialize_ftnode_size(b);
1123     uint32_t num_leafentries = toku_ftnode_leaf_num_entries(a) + toku_ftnode_leaf_num_entries(b);
1124     if (num_leafentries > 1 && (sizea + sizeb)*4 > (nodesize*3)) {
1125         // the combined size is more than 3/4 of a node, so don't merge them.
1126         *did_merge = false;
1127         if (sizea*4 > nodesize && sizeb*4 > nodesize) {
1128             // no need to do anything if both are more than 1/4 of a node.
1129             *did_rebalance = false;
1130             toku_clone_dbt(splitk, *parent_splitk);
1131             return;
1132         }
1133         // one is less than 1/4 of a node, and together they are more than 3/4 of a node.
1134         *did_rebalance = true;
1135         balance_leaf_nodes(a, b, splitk);
1136     } else {
1137         // we are merging them.
1138         *did_merge = true;
1139         *did_rebalance = false;
1140         toku_init_dbt(splitk);
1141         merge_leaf_nodes(a, b);
1142     }
1143 }
1144 
static void
maybe_merge_pinned_nonleaf_nodes(
    const DBT *parent_splitk,
    FTNODE a,
    FTNODE b,
    bool *did_merge,
    bool *did_rebalance,
    DBT *splitk)
// Effect: Always merge nonleaf node b into a (nonleaf nodes are never
//   rebalanced here).  parent_splitk becomes the pivot separating a's old
//   children from b's.  Sets *did_merge = true, *did_rebalance = false,
//   and leaves *splitk empty.
// Requires: a and b are pinned and fully in memory; parent_splitk is a
//   real key (non-NULL data).
{
    toku_ftnode_assert_fully_in_memory(a);
    toku_ftnode_assert_fully_in_memory(b);
    invariant_notnull(parent_splitk->data);

    int old_n_children = a->n_children;
    int new_n_children = old_n_children + b->n_children;

    // Move all of b's child pointers onto the end of a's bp array; zero
    // b's slots so ownership transfers rather than being shared.
    XREALLOC_N(new_n_children, a->bp);
    memcpy(a->bp + old_n_children, b->bp, b->n_children * sizeof(b->bp[0]));
    memset(b->bp, 0, b->n_children * sizeof(b->bp[0]));

    // The parent's split key becomes the pivot between a's original
    // children and b's children, followed by all of b's pivots.
    a->pivotkeys.insert_at(parent_splitk, old_n_children - 1);
    a->pivotkeys.append(b->pivotkeys);
    a->n_children = new_n_children;
    b->n_children = 0;

    a->set_dirty();
    b->set_dirty();

    *did_merge = true;
    *did_rebalance = false;
    toku_init_dbt(splitk);

    FL_STATUS_VAL(FT_FLUSHER_MERGE_NONLEAF)++;
}
1179 
static void
maybe_merge_pinned_nodes(
    FTNODE parent,
    const DBT *parent_splitk,
    FTNODE a,
    FTNODE b,
    bool *did_merge,
    bool *did_rebalance,
    DBT *splitk,
    uint32_t nodesize
    )
// Effect: either merge a and b into one node (merge them into a) and set *did_merge = true.
//	   (We do this if the resulting node is not fissible)
//	   or distribute a and b evenly and set *did_merge = false and *did_rebalance = true
//	   (If a and b are already evenly distributed, we may do nothing.)
//  If we distribute:
//    For leaf nodes, we distribute the leafentries evenly.
//    For nonleaf nodes, we distribute the children evenly.  That may leave one or both of the nodes overfull, but that's OK.
//  If we distribute, we set *splitk to a malloced pivot key.
// Parameters:
//  parent		The parent of the two nodes to be merged/rebalanced.
//  parent_splitk	The pivot key between a and b.	 This is either free()'d or returned in *splitk.
//  a			The first node to merge.
//  b			The second node to merge.
//  did_merge		(OUT):	Did the two nodes actually get merged?
//  did_rebalance	(OUT):	Were the two nodes rebalanced instead?
//  splitk		(OUT):	If the two nodes did not get merged, the new pivot key between the two nodes.
//  nodesize		Target node size, used to decide merge vs. rebalance for leaves.
{
    MSN msn_max;
    paranoid_invariant(a->height == b->height);
    toku_ftnode_assert_fully_in_memory(parent);
    toku_ftnode_assert_fully_in_memory(a);
    toku_ftnode_assert_fully_in_memory(b);
    parent->set_dirty();   // just to make sure
    {
        // Take the larger of the two nodes' on-disk MSNs; both survivors
        // are stamped with it below if anything changed.
        MSN msna = a->max_msn_applied_to_node_on_disk;
        MSN msnb = b->max_msn_applied_to_node_on_disk;
        msn_max = (msna.msn > msnb.msn) ? msna : msnb;
    }
    if (a->height == 0) {
        maybe_merge_pinned_leaf_nodes(a, b, parent_splitk, did_merge, did_rebalance, splitk, nodesize);
    } else {
        maybe_merge_pinned_nonleaf_nodes(parent_splitk, a, b, did_merge, did_rebalance, splitk);
    }
    if (*did_merge || *did_rebalance) {
        // accurate for leaf nodes because all msgs above have been
        // applied, accurate for non-leaf nodes because buffer immediately
        // above each node has been flushed
        a->max_msn_applied_to_node_on_disk = msn_max;
        b->max_msn_applied_to_node_on_disk = msn_max;
    }
}
1233 
merge_remove_key_callback(BLOCKNUM * bp,bool for_checkpoint,void * extra)1234 static void merge_remove_key_callback(BLOCKNUM *bp, bool for_checkpoint, void *extra) {
1235     FT ft = (FT) extra;
1236     ft->blocktable.free_blocknum(bp, ft, for_checkpoint);
1237 }
1238 
1239 //
1240 // Takes as input a locked node and a childnum_to_merge
1241 // As output, two of node's children are merged or rebalanced, and node is unlocked
1242 //
1243 static void
ft_merge_child(FT ft,FTNODE node,int childnum_to_merge,bool * did_react,struct flusher_advice * fa)1244 ft_merge_child(
1245     FT ft,
1246     FTNODE node,
1247     int childnum_to_merge,
1248     bool *did_react,
1249     struct flusher_advice *fa)
1250 {
1251     // this function should not be called
1252     // if the child is not mergable
1253     paranoid_invariant(node->n_children > 1);
1254     toku_ftnode_assert_fully_in_memory(node);
1255 
1256     int childnuma,childnumb;
1257     if (childnum_to_merge > 0) {
1258         childnuma = childnum_to_merge-1;
1259         childnumb = childnum_to_merge;
1260     } else {
1261         childnuma = childnum_to_merge;
1262         childnumb = childnum_to_merge+1;
1263     }
1264     paranoid_invariant(0 <= childnuma);
1265     paranoid_invariant(childnuma+1 == childnumb);
1266     paranoid_invariant(childnumb < node->n_children);
1267 
1268     paranoid_invariant(node->height>0);
1269 
1270     // We suspect that at least one of the children is fusible, but they might not be.
1271     // for test
1272     call_flusher_thread_callback(flt_flush_before_merge);
1273 
1274     FTNODE childa, childb;
1275     {
1276         uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnuma);
1277         ftnode_fetch_extra bfe;
1278         bfe.create_for_full_read(ft);
1279         toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnuma), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &node, &childa, true);
1280     }
1281     // for test
1282     call_flusher_thread_callback(flt_flush_before_pin_second_node_for_merge);
1283     {
1284         FTNODE dep_nodes[2];
1285         dep_nodes[0] = node;
1286         dep_nodes[1] = childa;
1287         uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnumb);
1288         ftnode_fetch_extra bfe;
1289         bfe.create_for_full_read(ft);
1290         toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnumb), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 2, dep_nodes, &childb, true);
1291     }
1292 
1293     if (toku_bnc_n_entries(BNC(node,childnuma))>0) {
1294         flush_this_child(ft, node, childa, childnuma, fa);
1295     }
1296     if (toku_bnc_n_entries(BNC(node,childnumb))>0) {
1297         flush_this_child(ft, node, childb, childnumb, fa);
1298     }
1299 
1300     // now we have both children pinned in main memory, and cachetable locked,
1301     // so no checkpoints will occur.
1302 
1303     bool did_merge, did_rebalance;
1304     {
1305         DBT splitk;
1306         toku_init_dbt(&splitk);
1307         const DBT old_split_key = node->pivotkeys.get_pivot(childnuma);
1308         maybe_merge_pinned_nodes(node, &old_split_key, childa, childb, &did_merge, &did_rebalance, &splitk, ft->h->nodesize);
1309         //toku_verify_estimates(t,childa);
1310         // the tree did react if a merge (did_merge) or rebalance (new spkit key) occurred
1311         *did_react = (bool)(did_merge || did_rebalance);
1312 
1313         if (did_merge) {
1314             invariant_null(splitk.data);
1315             NONLEAF_CHILDINFO remaining_bnc = BNC(node, childnuma);
1316             NONLEAF_CHILDINFO merged_bnc = BNC(node, childnumb);
1317             for (unsigned int i = 0; i < (sizeof remaining_bnc->flow) / (sizeof remaining_bnc->flow[0]); ++i) {
1318                 remaining_bnc->flow[i] += merged_bnc->flow[i];
1319             }
1320             destroy_nonleaf_childinfo(merged_bnc);
1321             set_BNULL(node, childnumb);
1322             node->n_children--;
1323             memmove(&node->bp[childnumb],
1324                     &node->bp[childnumb+1],
1325                     (node->n_children-childnumb)*sizeof(node->bp[0]));
1326             REALLOC_N(node->n_children, node->bp);
1327             node->pivotkeys.delete_at(childnuma);
1328 
1329             // Handle a merge of the rightmost leaf node.
1330             BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
1331             if (did_merge && childb->blocknum.b == rightmost_blocknum.b) {
1332                 invariant(childb->blocknum.b != ft->h->root_blocknum.b);
1333                 toku_ftnode_swap_pair_values(childa, childb);
1334                 BP_BLOCKNUM(node, childnuma) = childa->blocknum;
1335             }
1336 
1337             paranoid_invariant(BP_BLOCKNUM(node, childnuma).b == childa->blocknum.b);
1338             childa->set_dirty();  // just to make sure
1339             childb->set_dirty();  // just to make sure
1340         } else {
1341             // flow will be inaccurate for a while, oh well.  the children
1342             // are leaves in this case so it's not a huge deal (we're
1343             // pretty far down the tree)
1344 
1345             // If we didn't merge the nodes, then we need the correct pivot.
1346             invariant_notnull(splitk.data);
1347             node->pivotkeys.replace_at(&splitk, childnuma);
1348             node->set_dirty();
1349         }
1350         toku_destroy_dbt(&splitk);
1351     }
1352     //
1353     // now we possibly flush the children
1354     //
1355     if (did_merge) {
1356         // for test
1357         call_flusher_thread_callback(flt_flush_before_unpin_remove);
1358 
1359         // merge_remove_key_callback will free the blocknum
1360         int rrb = toku_cachetable_unpin_and_remove(
1361             ft->cf,
1362             childb->ct_pair,
1363             merge_remove_key_callback,
1364             ft
1365             );
1366         assert_zero(rrb);
1367 
1368         // for test
1369         call_flusher_thread_callback(ft_flush_aflter_merge);
1370 
1371         // unlock the parent
1372         paranoid_invariant(node->dirty());
1373         toku_unpin_ftnode(ft, node);
1374     }
1375     else {
1376         // for test
1377         call_flusher_thread_callback(ft_flush_aflter_rebalance);
1378 
1379         // unlock the parent
1380         paranoid_invariant(node->dirty());
1381         toku_unpin_ftnode(ft, node);
1382         toku_unpin_ftnode(ft, childb);
1383     }
1384     if (childa->height > 0 && fa->should_recursively_flush(childa, fa->extra)) {
1385         toku_ft_flush_some_child(ft, childa, fa);
1386     }
1387     else {
1388         toku_unpin_ftnode(ft, childa);
1389     }
1390 }
1391 
void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa)
// Effect: This function does the following:
//   - Pick a child of parent (the heaviest child),
//   - flush from parent to child,
//   - possibly split/merge child.
//   - if child is gorged, recursively proceed with child
//  Note that parent is already locked
//  Upon exit of this function, parent is unlocked and no new
//  new nodes (such as a child) remain locked
{
    int dirtied = 0;
    NONLEAF_CHILDINFO bnc = NULL;
    paranoid_invariant(parent->height>0);
    toku_ftnode_assert_fully_in_memory(parent);
    // Capture this now: the parent may be unpinned before the flush below
    // needs it.
    TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;

    // pick the child we want to flush to
    int childnum = fa->pick_child(ft, parent, fa->extra);

    // for test
    call_flusher_thread_callback(flt_flush_before_child_pin);

    // get the child into memory
    BLOCKNUM targetchild = BP_BLOCKNUM(parent, childnum);
    ft->blocktable.verify_blocknum_allocated(targetchild);
    uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
    FTNODE child;
    ftnode_fetch_extra bfe;
    // Note that we don't read the entire node into memory yet.
    // The idea is let's try to do the minimum work before releasing the parent lock
    bfe.create_for_min_read(ft);
    toku_pin_ftnode_with_dep_nodes(ft, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child, true);

    // for test
    call_flusher_thread_callback(ft_flush_aflter_child_pin);

    if (fa->should_destroy_basement_nodes(fa)) {
        maybe_destroy_child_blbs(parent, child, ft);
    }

    //Note that at this point, we don't have the entire child in.
    // Let's do a quick check to see if the child may be reactive
    // If the child cannot be reactive, then we can safely unlock
    // the parent before finishing reading in the entire child node.
    bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);

    paranoid_invariant(child->blocknum.b!=0);

    // only do the following work if there is a flush to perform
    if (toku_bnc_n_entries(BNC(parent, childnum)) > 0 || parent->height == 1) {
        if (!parent->dirty()) {
            dirtied++;
            parent->set_dirty();
        }
        // detach buffer
        BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
        bnc = BNC(parent, childnum);
        NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
        // Preserve the flow statistics on the replacement buffer.
        memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
        set_BNC(parent, childnum, new_bnc);
    }

    //
    // at this point, the buffer has been detached from the parent
    // and a new empty buffer has been placed in its stead
    // so, if we are absolutely sure that the child is not
    // reactive, we can unpin the parent
    //
    if (!may_child_be_reactive) {
        toku_unpin_ftnode(ft, parent);
        parent = NULL;
    }

    //
    // now, if necessary, read/decompress the rest of child into memory,
    // so that we can proceed and apply the flush
    //
    bring_node_fully_into_memory(child, ft);

    // It is possible after reading in the entire child,
    // that we now know that the child is not reactive
    // if so, we can unpin parent right now
    // we won't be splitting/merging child
    // and we have already replaced the bnc
    // for the root with a fresh one
    enum reactivity child_re = toku_ftnode_get_reactivity(ft, child);
    if (parent && child_re == RE_STABLE) {
        toku_unpin_ftnode(ft, parent);
        parent = NULL;
    }

    // from above, we know at this point that either the bnc
    // is detached from the parent (which may be unpinned),
    // and we have to apply the flush, or there was no data
    // in the buffer to flush, and as a result, flushing is not necessary
    // and bnc is NULL
    if (bnc != NULL) {
        if (!child->dirty()) {
            dirtied++;
            child->set_dirty();
        }
        // do the actual flush
        toku_bnc_flush_to_child(
            ft,
            bnc,
            child,
            parent_oldest_referenced_xid_known
            );
        destroy_nonleaf_childinfo(bnc);
    }

    fa->update_status(child, dirtied, fa->extra);
    // let's get the reactivity of the child again,
    // it is possible that the flush got rid of some values
    // and now the parent is no longer reactive
    child_re = toku_ftnode_get_reactivity(ft, child);
    // if the parent has been unpinned above, then
    // this is our only option, even if the child is not stable
    // if the child is not stable, we'll handle it the next
    // time we need to flush to the child
    if (!parent ||
        child_re == RE_STABLE ||
        (child_re == RE_FUSIBLE && parent->n_children == 1)
        )
    {
        if (parent) {
            toku_unpin_ftnode(ft, parent);
            parent = NULL;
        }
        //
        // it is the responsibility of toku_ft_flush_some_child to unpin child
        //
        if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
            toku_ft_flush_some_child(ft, child, fa);
        }
        else {
            toku_unpin_ftnode(ft, child);
        }
    }
    else if (child_re == RE_FISSIBLE) {
        //
        // it is responsibility of `ft_split_child` to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
        ft_split_child(ft, parent, childnum, child, SPLIT_EVENLY, fa);
    }
    else if (child_re == RE_FUSIBLE) {
        //
        // it is responsibility of `maybe_merge_child to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
        fa->maybe_merge_child(fa, ft, parent, childnum, child, fa->extra);
    }
    else {
        // reactivity must be one of the three handled cases
        abort();
    }
}
1551 
// Apply every message stored in 'bnc' (a buffer that has been detached from a
// parent node) to 'child', in buffer order, then fold the resulting stat and
// logical-row deltas into the tree-wide counters.  'bnc' is not freed here;
// callers destroy it afterwards (see destroy_nonleaf_childinfo at call sites).
// 'parent_oldest_referenced_xid_known' is stamped onto the child when done.
void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID parent_oldest_referenced_xid_known) {
    paranoid_invariant(bnc);

    TOKULOGGER logger = toku_cachefile_logger(ft->cf);
    TXN_MANAGER txn_manager = logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
    TXNID oldest_referenced_xid_for_simple_gc = TXNID_NONE;

    // Garbage collection only happens when flushing into a leaf (height == 0)
    // and a txn manager exists; otherwise gc state stays uninitialized and the
    // xid estimate stays TXNID_NONE.
    txn_manager_state txn_state_for_gc(txn_manager);
    bool do_garbage_collection = child->height == 0 && txn_manager != nullptr;
    if (do_garbage_collection) {
        txn_state_for_gc.init();
        oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
    }
    txn_gc_info gc_info(&txn_state_for_gc,
                        oldest_referenced_xid_for_simple_gc,
                        child->oldest_referenced_xid_known,
                        true);
    // Functor invoked once per message by msg_buffer.iterate().  It counts
    // down 'remaining_memsize' so each message can be attributed either to the
    // current checkpoint's tail of the buffer (flow[0]) or the previous
    // checkpoint's (flow[1]); the attribution depends on iteration order.
    struct flush_msg_fn {
        FT ft;
        FTNODE child;
        NONLEAF_CHILDINFO bnc;
        txn_gc_info *gc_info;

        STAT64INFO_S stats_delta;        // accumulated numbytes/numrows deltas from applied messages
        int64_t logical_rows_delta = 0;  // accumulated logical row-count delta
        size_t remaining_memsize = bnc->msg_buffer.buffer_size_in_use();

        flush_msg_fn(FT t, FTNODE n, NONLEAF_CHILDINFO nl, txn_gc_info *g) :
            ft(t), child(n), bnc(nl), gc_info(g), remaining_memsize(bnc->msg_buffer.buffer_size_in_use()) {
            stats_delta = { 0, 0 };
        }
        int operator()(const ft_msg &msg, bool is_fresh) {
            size_t flow_deltas[] = { 0, 0 };
            size_t memsize_in_buffer = message_buffer::msg_memsize_in_buffer(msg);
            if (remaining_memsize <= bnc->flow[0]) {
                // this message is in the current checkpoint's worth of
                // the end of the message buffer
                flow_deltas[0] = memsize_in_buffer;
            } else if (remaining_memsize <= bnc->flow[0] + bnc->flow[1]) {
                // this message is in the last checkpoint's worth of the
                // end of the message buffer
                flow_deltas[1] = memsize_in_buffer;
            }
            toku_ftnode_put_msg(
                ft->cmp,
                ft->update_fun,
                child,
                -1,  // target childnum; -1 presumably means "locate the child by key" — TODO confirm against toku_ftnode_put_msg
                msg,
                is_fresh,
                gc_info,
                flow_deltas,
                &stats_delta,
                &logical_rows_delta);
            remaining_memsize -= memsize_in_buffer;
            return 0;  // 0 keeps the iteration going
        }
    } flush_fn(ft, child, bnc, &gc_info);
    bnc->msg_buffer.iterate(flush_fn);

    child->oldest_referenced_xid_known = parent_oldest_referenced_xid_known;

    // Every byte of the buffer must have been accounted for by the functor.
    invariant(flush_fn.remaining_memsize == 0);
    if (flush_fn.stats_delta.numbytes || flush_fn.stats_delta.numrows) {
        toku_ft_update_stats(&ft->in_memory_stats, flush_fn.stats_delta);
    }
    toku_ft_adjust_logical_row_count(ft, flush_fn.logical_rows_delta);
    if (do_garbage_collection) {
        size_t buffsize = bnc->msg_buffer.buffer_size_in_use();
        // may be misleading if there's a broadcast message in there
        toku_ft_status_note_msg_bytes_out(buffsize);
    }
}
1625 
1626 static void
update_cleaner_status(FTNODE node,int childnum)1627 update_cleaner_status(
1628     FTNODE node,
1629     int childnum)
1630 {
1631     FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_NODES)++;
1632     if (node->height == 1) {
1633         FL_STATUS_VAL(FT_FLUSHER_CLEANER_H1_NODES)++;
1634     } else {
1635         FL_STATUS_VAL(FT_FLUSHER_CLEANER_HGT1_NODES)++;
1636     }
1637 
1638     unsigned int nbytesinbuf = toku_bnc_nbytesinbuf(BNC(node, childnum));
1639     if (nbytesinbuf == 0) {
1640         FL_STATUS_VAL(FT_FLUSHER_CLEANER_EMPTY_NODES)++;
1641     } else {
1642         if (nbytesinbuf > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE)) {
1643             FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE) = nbytesinbuf;
1644         }
1645         if (nbytesinbuf < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE)) {
1646             FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE) = nbytesinbuf;
1647         }
1648         FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_SIZE) += nbytesinbuf;
1649 
1650         uint64_t workdone = BP_WORKDONE(node, childnum);
1651         if (workdone > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE)) {
1652             FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE) = workdone;
1653         }
1654         if (workdone < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE)) {
1655             FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE) = workdone;
1656         }
1657         FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_WORKDONE) += workdone;
1658     }
1659 }
1660 
1661 static void
dummy_update_status(FTNODE UU (child),int UU (dirtied),void * UU (extra))1662 dummy_update_status(
1663     FTNODE UU(child),
1664     int UU(dirtied),
1665     void* UU(extra)
1666     )
1667 {
1668 }
1669 
1670 static int
dummy_pick_heaviest_child(FT UU (h),FTNODE UU (parent),void * UU (extra))1671 dummy_pick_heaviest_child(FT UU(h),
1672                     FTNODE UU(parent),
1673                     void* UU(extra))
1674 {
1675     abort();
1676     return -1;
1677 }
1678 
toku_ft_split_child(FT ft,FTNODE node,int childnum,FTNODE child,enum split_mode split_mode)1679 void toku_ft_split_child(
1680     FT ft,
1681     FTNODE node,
1682     int childnum,
1683     FTNODE child,
1684     enum split_mode split_mode
1685     )
1686 {
1687     struct flusher_advice fa;
1688     flusher_advice_init(
1689         &fa,
1690         dummy_pick_heaviest_child,
1691         dont_destroy_basement_nodes,
1692         never_recursively_flush,
1693         default_merge_child,
1694         dummy_update_status,
1695         default_pick_child_after_split,
1696         NULL
1697         );
1698     ft_split_child(
1699         ft,
1700         node,
1701         childnum, // childnum to split
1702         child,
1703         split_mode,
1704         &fa
1705         );
1706 }
1707 
toku_ft_merge_child(FT ft,FTNODE node,int childnum)1708 void toku_ft_merge_child(
1709     FT ft,
1710     FTNODE node,
1711     int childnum
1712     )
1713 {
1714     struct flusher_advice fa;
1715     flusher_advice_init(
1716         &fa,
1717         dummy_pick_heaviest_child,
1718         dont_destroy_basement_nodes,
1719         never_recursively_flush,
1720         default_merge_child,
1721         dummy_update_status,
1722         default_pick_child_after_split,
1723         NULL
1724         );
1725     bool did_react;
1726     ft_merge_child(
1727         ft,
1728         node,
1729         childnum, // childnum to merge
1730         &did_react,
1731         &fa
1732         );
1733 }
1734 
1735 int
toku_ftnode_cleaner_callback(void * ftnode_pv,BLOCKNUM blocknum,uint32_t fullhash,void * extraargs)1736 toku_ftnode_cleaner_callback(
1737     void *ftnode_pv,
1738     BLOCKNUM blocknum,
1739     uint32_t fullhash,
1740     void *extraargs)
1741 {
1742     FTNODE node = (FTNODE) ftnode_pv;
1743     invariant(node->blocknum.b == blocknum.b);
1744     invariant(node->fullhash == fullhash);
1745     invariant(node->height > 0);   // we should never pick a leaf node (for now at least)
1746     FT ft = (FT) extraargs;
1747     bring_node_fully_into_memory(node, ft);
1748     int childnum = find_heaviest_child(node);
1749     update_cleaner_status(node, childnum);
1750 
1751     // Either toku_ft_flush_some_child will unlock the node, or we do it here.
1752     if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) {
1753         struct flusher_advice fa;
1754         struct flush_status_update_extra fste;
1755         ct_flusher_advice_init(&fa, &fste, ft->h->nodesize);
1756         toku_ft_flush_some_child(ft, node, &fa);
1757     } else {
1758         toku_unpin_ftnode(ft, node);
1759     }
1760     return 0;
1761 }
1762 
// Argument bundle handed to flush_node_fun on a background (kibbutz) thread.
struct flusher_extra {
    FT ft;                 // tree the flush operates on
    FTNODE node;           // pinned node to work on; the background job unpins it
    NONLEAF_CHILDINFO bnc; // detached buffer to flush into 'node', or NULL when the
                           // job should instead flush one of node's own buffers
    TXNID parent_oldest_referenced_xid_known;  // propagated to the child by the flush
};
1769 
1770 //
1771 // This is the function that gets called by a
1772 // background thread. Its purpose is to complete
1773 // a flush, and possibly do a split/merge.
1774 //
flush_node_fun(void * fe_v)1775 static void flush_node_fun(void *fe_v)
1776 {
1777     toku::context flush_ctx(CTX_FLUSH);
1778     struct flusher_extra* fe = (struct flusher_extra *) fe_v;
1779     // The node that has been placed on the background
1780     // thread may not be fully in memory. Some message
1781     // buffers may be compressed. Before performing
1782     // any operations, we must first make sure
1783     // the node is fully in memory
1784     //
1785     // If we have a bnc, that means fe->node is a child, and we've already
1786     // destroyed its basement nodes if necessary, so we now need to either
1787     // read them back in, or just do the regular partial fetch.  If we
1788     // don't, that means fe->node is a parent, so we need to do this anyway.
1789     bring_node_fully_into_memory(fe->node,fe->ft);
1790     fe->node->set_dirty();
1791 
1792     struct flusher_advice fa;
1793     struct flush_status_update_extra fste;
1794     flt_flusher_advice_init(&fa, &fste, fe->ft->h->nodesize);
1795 
1796     if (fe->bnc) {
1797         // In this case, we have a bnc to flush to a node
1798 
1799         // for test purposes
1800         call_flusher_thread_callback(flt_flush_before_applying_inbox);
1801 
1802         toku_bnc_flush_to_child(
1803             fe->ft,
1804             fe->bnc,
1805             fe->node,
1806             fe->parent_oldest_referenced_xid_known
1807             );
1808         destroy_nonleaf_childinfo(fe->bnc);
1809 
1810         // after the flush has completed, now check to see if the node needs flushing
1811         // If so, call toku_ft_flush_some_child on the node (because this flush intends to
1812         // pass a meaningful oldest referenced xid for simple garbage collection), and it is the
1813         // responsibility of the flush to unlock the node. otherwise, we unlock it here.
1814         if (fe->node->height > 0 && toku_ftnode_nonleaf_is_gorged(fe->node, fe->ft->h->nodesize)) {
1815             toku_ft_flush_some_child(fe->ft, fe->node, &fa);
1816         }
1817         else {
1818             toku_unpin_ftnode(fe->ft,fe->node);
1819         }
1820     }
1821     else {
1822         // In this case, we were just passed a node with no
1823         // bnc, which means we are tasked with flushing some
1824         // buffer in the node.
1825         // It is the responsibility of flush some child to unlock the node
1826         toku_ft_flush_some_child(fe->ft, fe->node, &fa);
1827     }
1828     remove_background_job_from_cf(fe->ft->cf);
1829     toku_free(fe);
1830 }
1831 
1832 static void
place_node_and_bnc_on_background_thread(FT ft,FTNODE node,NONLEAF_CHILDINFO bnc,TXNID parent_oldest_referenced_xid_known)1833 place_node_and_bnc_on_background_thread(
1834     FT ft,
1835     FTNODE node,
1836     NONLEAF_CHILDINFO bnc,
1837     TXNID parent_oldest_referenced_xid_known)
1838 {
1839     struct flusher_extra *XMALLOC(fe);
1840     fe->ft = ft;
1841     fe->node = node;
1842     fe->bnc = bnc;
1843     fe->parent_oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
1844     cachefile_kibbutz_enq(ft->cf, flush_node_fun, fe);
1845 }
1846 
1847 //
1848 // This takes as input a gorged, locked,  non-leaf node named parent
1849 // and sets up a flush to be done in the background.
1850 // The flush is setup like this:
1851 //  - We call maybe_get_and_pin_clean on the child we want to flush to in order to try to lock the child
1852 //  - if we successfully pin the child, and the child does not need to be split or merged
1853 //     then we detach the buffer, place the child and buffer onto a background thread, and
1854 //     have the flush complete in the background, and unlock the parent. The child will be
1855 //     unlocked on the background thread
1856 //  - if any of the above does not happen (child cannot be locked,
1857 //     child needs to be split/merged), then we place the parent on the background thread.
1858 //     The parent will be unlocked on the background thread
1859 //
toku_ft_flush_node_on_background_thread(FT ft,FTNODE parent)1860 void toku_ft_flush_node_on_background_thread(FT ft, FTNODE parent)
1861 {
1862     toku::context flush_ctx(CTX_FLUSH);
1863     TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;
1864     //
1865     // first let's see if we can detach buffer on client thread
1866     // and pick the child we want to flush to
1867     //
1868     int childnum = find_heaviest_child(parent);
1869     paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
1870     //
1871     // see if we can pin the child
1872     //
1873     FTNODE child;
1874     uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
1875     int r = toku_maybe_pin_ftnode_clean(ft, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child);
1876     if (r != 0) {
1877         // In this case, we could not lock the child, so just place the parent on the background thread
1878         // In the callback, we will use toku_ft_flush_some_child, which checks to
1879         // see if we should blow away the old basement nodes.
1880         place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
1881     }
1882     else {
1883         //
1884         // successfully locked child
1885         //
1886         bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);
1887         if (!may_child_be_reactive) {
1888             // We're going to unpin the parent, so before we do, we must
1889             // check to see if we need to blow away the basement nodes to
1890             // keep the MSN invariants intact.
1891             maybe_destroy_child_blbs(parent, child, ft);
1892 
1893             //
1894             // can detach buffer and unpin root here
1895             //
1896             parent->set_dirty();
1897             BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
1898             NONLEAF_CHILDINFO bnc = BNC(parent, childnum);
1899             NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
1900             memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
1901             set_BNC(parent, childnum, new_bnc);
1902 
1903             //
1904             // at this point, the buffer has been detached from the parent
1905             // and a new empty buffer has been placed in its stead
1906             // so, because we know for sure the child is not
1907             // reactive, we can unpin the parent
1908             //
1909             place_node_and_bnc_on_background_thread(ft, child, bnc, parent_oldest_referenced_xid_known);
1910             toku_unpin_ftnode(ft, parent);
1911         }
1912         else {
1913             // because the child may be reactive, we need to
1914             // put parent on background thread.
1915             // As a result, we unlock the child here.
1916             toku_unpin_ftnode(ft, child);
1917             // Again, we'll have the parent on the background thread, so
1918             // we don't need to destroy the basement nodes yet.
1919             place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
1920         }
1921     }
1922 }
1923 
1924 #include <toku_race_tools.h>
// Runs automatically before main() (GCC/Clang constructor attribute): tells
// Helgrind/DRD not to report races on the fl_status counter block, whose
// updates are intentionally unsynchronized.
void __attribute__((__constructor__)) toku_ft_flusher_helgrind_ignore(void);
void
toku_ft_flusher_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&fl_status, sizeof fl_status);
}
1930