1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include "ft/ft.h"
40 #include "ft/ft-cachetable-wrappers.h"
41 #include "ft/ft-internal.h"
42 #include "ft/ft-flusher.h"
43 #include "ft/ft-flusher-internal.h"
44 #include "ft/node.h"
45 #include "ft/serialize/block_table.h"
46 #include "ft/serialize/ft_node-serialize.h"
47 #include "portability/toku_assert.h"
48 #include "portability/toku_atomic.h"
49 #include "util/status.h"
50 #include "util/context.h"
51 
52 
// Copy the current flusher status counters into *status.
// fl_status.init() is called first (presumably an idempotent one-time setup
// of the status entries — confirm against util/status.h).
void toku_ft_flusher_get_status(FT_FLUSHER_STATUS status) {
    fl_status.init();
    *status = fl_status;
}
57 
58 //
59 // For test purposes only.
60 // These callbacks are never used in production code, only as a way
61 //  to test the system (for example, by causing crashes at predictable times).
62 //
// Test-only hook (see comment above): invoked at predetermined points in the
// flusher path when set; NULL in production.
static void (*flusher_thread_callback)(int, void*) = NULL;
// Opaque argument forwarded to flusher_thread_callback.
static void *flusher_thread_callback_extra = NULL;
65 
// Install the test-only flusher callback and its opaque extra argument.
// Passing NULL disables the hook.
void toku_flusher_thread_set_callback(void (*callback_f)(int, void*),
                                      void* extra) {
    flusher_thread_callback = callback_f;
    flusher_thread_callback_extra = extra;
}
71 
// Invoke the test-only callback (if one is installed) with the given
// flusher-thread state identifier.
static void call_flusher_thread_callback(int flt_state) {
    if (flusher_thread_callback) {
        flusher_thread_callback(flt_state, flusher_thread_callback_extra);
    }
}
77 
78 static int
find_heaviest_child(FTNODE node)79 find_heaviest_child(FTNODE node)
80 {
81     int max_child = 0;
82     uint64_t max_weight = toku_bnc_nbytesinbuf(BNC(node, 0)) + BP_WORKDONE(node, 0);
83 
84     invariant(node->n_children > 0);
85     for (int i = 1; i < node->n_children; i++) {
86         uint64_t bytes_in_buf = toku_bnc_nbytesinbuf(BNC(node, i));
87         uint64_t workdone = BP_WORKDONE(node, i);
88         if (workdone > 0) {
89             invariant(bytes_in_buf > 0);
90         }
91         uint64_t this_weight = bytes_in_buf + workdone;
92         if (max_weight < this_weight) {
93             max_child = i;
94             max_weight = this_weight;
95         }
96     }
97     return max_child;
98 }
99 
100 static void
update_flush_status(FTNODE child,int cascades)101 update_flush_status(FTNODE child, int cascades) {
102     FL_STATUS_VAL(FT_FLUSHER_FLUSH_TOTAL)++;
103     if (cascades > 0) {
104         FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES)++;
105         switch (cascades) {
106         case 1:
107             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_1)++; break;
108         case 2:
109             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_2)++; break;
110         case 3:
111             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_3)++; break;
112         case 4:
113             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_4)++; break;
114         case 5:
115             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_5)++; break;
116         default:
117             FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_GT_5)++; break;
118         }
119     }
120     bool flush_needs_io = false;
121     for (int i = 0; !flush_needs_io && i < child->n_children; ++i) {
122         if (BP_STATE(child, i) == PT_ON_DISK) {
123             flush_needs_io = true;
124         }
125     }
126     if (flush_needs_io) {
127         FL_STATUS_VAL(FT_FLUSHER_FLUSH_NEEDED_IO)++;
128     } else {
129         FL_STATUS_VAL(FT_FLUSHER_FLUSH_IN_MEMORY)++;
130     }
131 }
132 
133 static void
maybe_destroy_child_blbs(FTNODE node,FTNODE child,FT ft)134 maybe_destroy_child_blbs(FTNODE node, FTNODE child, FT ft)
135 {
136     // If the node is already fully in memory, as in upgrade, we don't
137     // need to destroy the basement nodes because they are all equally
138     // up to date.
139     if (child->n_children > 1 &&
140         child->height == 0 &&
141         !child->dirty()) {
142         for (int i = 0; i < child->n_children; ++i) {
143             if (BP_STATE(child, i) == PT_AVAIL &&
144                 node->max_msn_applied_to_node_on_disk.msn < BLB_MAX_MSN_APPLIED(child, i).msn)
145             {
146                 toku_evict_bn_from_memory(child, i, ft);
147             }
148         }
149     }
150 }
151 
152 static void
153 ft_merge_child(
154     FT ft,
155     FTNODE node,
156     int childnum_to_merge,
157     bool *did_react,
158     struct flusher_advice *fa);
159 
// pick_child callback: choose the child with the heaviest buffer.
// Asserts that the chosen child's buffer is non-empty (there must be
// something to flush).
static int
pick_heaviest_child(FT UU(ft),
                    FTNODE parent,
                    void* UU(extra))
{
    int childnum = find_heaviest_child(parent);
    paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
    return childnum;
}
169 
// should_destroy_basement_nodes callback: never evict basement nodes.
bool
dont_destroy_basement_nodes(void* UU(extra))
{
    return false;
}
175 
// should_destroy_basement_nodes callback: always allow basement eviction.
static bool
do_destroy_basement_nodes(void* UU(extra))
{
    return true;
}
181 
// should_recursively_flush callback: always keep flushing downward.
bool
always_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return true;
}
187 
// should_recursively_flush callback: stop after flushing one level.
bool
never_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return false;
}
193 
/**
 * Flusher thread ("normal" flushing) implementation.
 */
struct flush_status_update_extra {
    int cascades;       // depth of consecutive cascaded flushes so far
    uint32_t nodesize;  // target node size, used by the "gorged" recursion test
};
201 
// should_recursively_flush callback: recurse into the child only if it is a
// gorged nonleaf (its buffered data exceeds the target nodesize).
static bool
recurse_if_child_is_gorged(FTNODE child, void* extra)
{
    struct flush_status_update_extra *fste = (flush_status_update_extra *)extra;
    return toku_ftnode_nonleaf_is_gorged(child, fste->nodesize);
}
208 
// pick_child_after_split callback: -1 signals no preference, i.e. do not
// continue flushing into either of the two split children (presumably the
// caller treats -1 as "stop here" — confirm against toku_ft_flush_some_child).
int
default_pick_child_after_split(FT UU(ft),
                               FTNODE UU(parent),
                               int UU(childnuma),
                               int UU(childnumb),
                               void* UU(extra))
{
    return -1;
}
218 
// maybe_merge_child callback: merge `childnum` of `parent` unconditionally.
// On entry both parent and child are pinned; the child is unpinned here and
// re-pinned inside ft_merge_child, which also takes over unlocking the parent.
void
default_merge_child(struct flusher_advice *fa,
                    FT ft,
                    FTNODE parent,
                    int childnum,
                    FTNODE child,
                    void* UU(extra))
{
    //
    // There is probably a way to pass FTNODE child
    // into ft_merge_child, but for simplicity for now,
    // we are just going to unpin child and
    // let ft_merge_child pin it again
    //
    toku_unpin_ftnode(ft, child);
    //
    //
    // it is responsibility of ft_merge_child to unlock parent
    //
    bool did_react;
    ft_merge_child(ft, parent, childnum, &did_react, fa);
}
241 
// Populate a flusher_advice with the given policy callbacks and opaque extra.
// The struct is a vtable of strategy hooks consulted during a flush:
// which child to descend into, whether to evict basements, whether to keep
// recursing, whether/how to merge, how to record status, and which child to
// follow after a split.
void
flusher_advice_init(
    struct flusher_advice *fa,
    FA_PICK_CHILD pick_child,
    FA_SHOULD_DESTROY_BN should_destroy_basement_nodes,
    FA_SHOULD_RECURSIVELY_FLUSH should_recursively_flush,
    FA_MAYBE_MERGE_CHILD maybe_merge_child,
    FA_UPDATE_STATUS update_status,
    FA_PICK_CHILD_AFTER_SPLIT pick_child_after_split,
    void* extra
    )
{
    fa->pick_child = pick_child;
    fa->should_destroy_basement_nodes = should_destroy_basement_nodes;
    fa->should_recursively_flush = should_recursively_flush;
    fa->maybe_merge_child = maybe_merge_child;
    fa->update_status = update_status;
    fa->pick_child_after_split = pick_child_after_split;
    fa->extra = extra;
}
262 
// update_status callback for flusher-thread flushing: record the flush in the
// status counters and bump the cascade depth for a possible recursion.
static void
flt_update_status(FTNODE child,
                 int UU(dirtied),
                 void* extra)
{
    struct flush_status_update_extra *fste = (struct flush_status_update_extra *) extra;
    update_flush_status(child, fste->cascades);
    // If `toku_ft_flush_some_child` decides to recurse after this, we'll need
    // cascades to increase.  If not it doesn't matter.
    fste->cascades++;
}
274 
// Set up flusher-thread ("normal") flushing advice: follow the heaviest
// child, keep basements, recurse only into gorged children, merge with the
// default policy.
static void
flt_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra *fste, uint32_t nodesize)
{
    fste->cascades = 0;
    fste->nodesize = nodesize;
    flusher_advice_init(fa,
                        pick_heaviest_child,
                        dont_destroy_basement_nodes,
                        recurse_if_child_is_gorged,
                        default_merge_child,
                        flt_update_status,
                        default_pick_child_after_split,
                        fste);
}
289 
// State carried through a cleaner-thread leaf merge ("ctm"): a cloned pivot
// key used to re-find the leaf being merged on each descent, plus whether
// that leaf was the last child (in which case the saved pivot lies to its
// left — see ct_maybe_merge_child).
struct ctm_extra {
    bool is_last_child;  // true if the merge target was its parent's last child
    DBT target_key;      // cloned pivot key; owner must toku_destroy_dbt it
};
294 
295 static int
ctm_pick_child(FT ft,FTNODE parent,void * extra)296 ctm_pick_child(FT ft,
297                FTNODE parent,
298                void* extra)
299 {
300     struct ctm_extra* ctme = (struct ctm_extra *) extra;
301     int childnum;
302     if (parent->height == 1 && ctme->is_last_child) {
303         childnum = parent->n_children - 1;
304     } else {
305         childnum = toku_ftnode_which_child(parent, &ctme->target_key, ft->cmp);
306     }
307     return childnum;
308 }
309 
// update_status callback for cleaner-thread leaf merges: count nodes dirtied
// on the way down to the leaf being merged.
static void
ctm_update_status(
    FTNODE UU(child),
    int dirtied,
    void* UU(extra)
    )
{
    FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_DIRTIED_FOR_LEAF_MERGE) += dirtied;
}
319 
// maybe_merge_child callback used while re-descending for a cleaner-thread
// leaf merge: count completed leaf merges, then merge via the default policy.
static void
ctm_maybe_merge_child(struct flusher_advice *fa,
                      FT ft,
                      FTNODE parent,
                      int childnum,
                      FTNODE child,
                      void *extra)
{
    if (child->height == 0) {
        // Atomic because other threads may be bumping this counter concurrently.
        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1);
    }
    default_merge_child(fa, ft, parent, childnum, child, extra);
}
333 
// maybe_merge_child callback for cleaner-thread flushing.
// Nonleaf children are merged directly.  For a leaf child we cannot merge in
// place: we record a pivot key identifying the leaf, unpin everything, re-pin
// the root, and flush from the root back down to that leaf with ctm_* advice,
// which performs the merge when it reaches it.
static void
ct_maybe_merge_child(struct flusher_advice *fa,
                     FT ft,
                     FTNODE parent,
                     int childnum,
                     FTNODE child,
                     void* extra)
{
    if (child->height > 0) {
        default_merge_child(fa, ft, parent, childnum, child, extra);
    }
    else {
        struct ctm_extra ctme;
        paranoid_invariant(parent->n_children > 1);
        int pivot_to_save;
        //
        // we have two cases, one where the childnum
        // is the last child, and therefore the pivot we
        // save is not of the pivot which we wish to descend
        // and another where it is not the last child,
        // so the pivot is sufficient for identifying the leaf
        // to be merged
        //
        if (childnum == (parent->n_children - 1)) {
            ctme.is_last_child = true;
            pivot_to_save = childnum - 1;
        }
        else {
            ctme.is_last_child = false;
            pivot_to_save = childnum;
        }
        // Clone the pivot: the parent (and its pivotkeys) is unpinned below,
        // so we must own our copy.  Freed at the end of this function.
        toku_clone_dbt(&ctme.target_key, parent->pivotkeys.get_pivot(pivot_to_save));

        // at this point, ctme is properly setup, now we can do the merge
        struct flusher_advice new_fa;
        flusher_advice_init(
            &new_fa,
            ctm_pick_child,
            dont_destroy_basement_nodes,
            always_recursively_flush,
            ctm_maybe_merge_child,
            ctm_update_status,
            default_pick_child_after_split,
            &ctme);

        // Release our pins before re-pinning from the root, to respect the
        // top-down locking order.
        toku_unpin_ftnode(ft, parent);
        toku_unpin_ftnode(ft, child);

        FTNODE root_node = NULL;
        {
            uint32_t fullhash;
            CACHEKEY root;
            toku_calculate_root_offset_pointer(ft, &root, &fullhash);
            ftnode_fetch_extra bfe;
            bfe.create_for_full_read(ft);
            toku_pin_ftnode(ft, root, fullhash, &bfe, PL_WRITE_EXPENSIVE, &root_node, true);
            toku_ftnode_assert_fully_in_memory(root_node);
        }

        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        toku_ft_flush_some_child(ft, root_node, &new_fa);

        (void) toku_sync_fetch_and_sub(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        toku_destroy_dbt(&ctme.target_key);
    }
}
403 
// update_status callback for cleaner-thread flushing: record the flush,
// count nodes dirtied by the cleaner, and bump the cascade depth.
static void
ct_update_status(FTNODE child,
                 int dirtied,
                 void* extra)
{
    struct flush_status_update_extra* fste = (struct flush_status_update_extra *) extra;
    update_flush_status(child, fste->cascades);
    FL_STATUS_VAL(FT_FLUSHER_CLEANER_NODES_DIRTIED) += dirtied;
    // Incrementing this in case `toku_ft_flush_some_child` decides to recurse.
    fste->cascades++;
}
415 
// Set up cleaner-thread flushing advice: like the flusher-thread variant but
// allows basement eviction and uses the cleaner-specific merge/status hooks.
static void
ct_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra* fste, uint32_t nodesize)
{
    fste->cascades = 0;
    fste->nodesize = nodesize;
    flusher_advice_init(fa,
                        pick_heaviest_child,
                        do_destroy_basement_nodes,
                        recurse_if_child_is_gorged,
                        ct_maybe_merge_child,
                        ct_update_status,
                        default_pick_child_after_split,
                        fste);
}
430 
431 //
432 // This returns true if the node MAY be reactive,
433 // false is we are absolutely sure that it is NOT reactive.
434 // The reason for inaccuracy is that the node may be
435 // a leaf node that is not entirely in memory. If so, then
436 // we cannot be sure if the node is reactive.
437 //
ft_ftnode_may_be_reactive(FT ft,FTNODE node)438 static bool ft_ftnode_may_be_reactive(FT ft, FTNODE node)
439 {
440     if (node->height == 0) {
441         return true;
442     } else {
443         return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout) != RE_STABLE;
444     }
445 }
446 
447 /* NODE is a node with a child.
448  * childnum was split into two nodes childa, and childb.  childa is the same as the original child.  childb is a new child.
449  * We must slide things around, & move things from the old table to the new tables.
450  * Requires: the CHILDNUMth buffer of node is empty.
451  * We don't push anything down to children.  We split the node, and things land wherever they land.
452  * We must delete the old buffer (but the old child is already deleted.)
453  * On return, the new children and node STAY PINNED.
454  */
// See the comment block above for the contract.  Inserts childb as a new
// child of `node` immediately after childnum, with `splitk` as the new pivot.
// Consumes splitk's data; node, childa and childb remain pinned on return.
static void
handle_split_of_child(
    FT ft,
    FTNODE node,
    int childnum,
    FTNODE childa,
    FTNODE childb,
    DBT *splitk /* the data in the childsplitk is alloc'd and is consumed by this call. */
    )
{
    paranoid_invariant(node->height>0);
    paranoid_invariant(0 <= childnum);
    paranoid_invariant(childnum < node->n_children);
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(childa);
    toku_ftnode_assert_fully_in_memory(childb);
    NONLEAF_CHILDINFO old_bnc = BNC(node, childnum);
    // Precondition: the buffer destined for the split child must be empty.
    paranoid_invariant(toku_bnc_nbytesinbuf(old_bnc)==0);
    WHEN_NOT_GCOV(
        if (toku_ft_debug_mode) {
            printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
            printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
            for(int i = 0; i < node->n_children - 1; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
            printf("\n");
        }
    )

    node->set_dirty();

    // Grow the partition array by one and open a hole at childnum+1.
    XREALLOC_N(node->n_children+1, node->bp);
    // Slide the children over.
    // suppose n_children is 10 and childnum is 5, meaning node->childnum[5] just got split
    // this moves node->bp[6] through node->bp[9] over to
    // node->bp[7] through node->bp[10]
    for (int cnum=node->n_children; cnum>childnum+1; cnum--) {
        node->bp[cnum] = node->bp[cnum-1];
    }
    memset(&node->bp[childnum+1],0,sizeof(node->bp[0]));
    node->n_children++;

    paranoid_invariant(BP_BLOCKNUM(node, childnum).b==childa->blocknum.b); // use the same child

    // We never set the rightmost blocknum to be the root.
    // Instead, we wait for the root to split and let promotion initialize the rightmost
    // blocknum to be the first non-root leaf node on the right extreme to receive an insert.
    BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
    invariant(ft->h->root_blocknum.b != rightmost_blocknum.b);
    if (childa->blocknum.b == rightmost_blocknum.b) {
        // The rightmost leaf (a) split into (a) and (b). We want (b) to swap pair values
        // with (a), now that it is the new rightmost leaf. This keeps the rightmost blocknum
        // constant, the same the way we keep the root blocknum constant.
        toku_ftnode_swap_pair_values(childa, childb);
        BP_BLOCKNUM(node, childnum) = childa->blocknum;
    }

    // Wire up the new partition for childb.
    BP_BLOCKNUM(node, childnum+1) = childb->blocknum;
    BP_WORKDONE(node, childnum+1) = 0;
    BP_STATE(node,childnum+1) = PT_AVAIL;

    NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
    for (unsigned int i = 0; i < (sizeof new_bnc->flow) / (sizeof new_bnc->flow[0]); ++i) {
        // just split the flows in half for now, can't guess much better
        // at the moment
        new_bnc->flow[i] = old_bnc->flow[i] / 2;
        old_bnc->flow[i] = (old_bnc->flow[i] + 1) / 2;
    }
    set_BNC(node, childnum+1, new_bnc);

    // Insert the new split key , sliding the other keys over
    node->pivotkeys.insert_at(splitk, childnum);

    WHEN_NOT_GCOV(
        if (toku_ft_debug_mode) {
            printf("%s:%d splitkeys:", __FILE__, __LINE__);
            for (int i = 0; i < node->n_children - 2; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
            printf("\n");
        }
    )

    /* Keep pushing to the children, but not if the children would require a pushdown */
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(childa);
    toku_ftnode_assert_fully_in_memory(childb);

    VERIFY_NODE(t, node);
    VERIFY_NODE(t, childa);
    VERIFY_NODE(t, childb);
}
543 
// Debug-only check: for a leaf, assert every basement node is resident and
// its mempool is internally consistent.  No-op unless TOKU_DEBUG_PARANOID.
static void
verify_all_in_mempool(FTNODE UU() node)
{
#ifdef TOKU_DEBUG_PARANOID
    if (node->height==0) {
        for (int i = 0; i < node->n_children; i++) {
            invariant(BP_STATE(node,i) == PT_AVAIL);
            BLB_DATA(node, i)->verify_mempool();
        }
    }
#endif
}
556 
557 static uint64_t
ftleaf_disk_size(FTNODE node)558 ftleaf_disk_size(FTNODE node)
559 // Effect: get the disk size of a leafentry
560 {
561     paranoid_invariant(node->height == 0);
562     toku_ftnode_assert_fully_in_memory(node);
563     uint64_t retval = 0;
564     for (int i = 0; i < node->n_children; i++) {
565         retval += BLB_DATA(node, i)->get_disk_size();
566     }
567     return retval;
568 }
569 
static void
ftleaf_get_split_loc(
    FTNODE node,
    enum split_mode split_mode,
    int *num_left_bns,   // which basement within leaf
    int *num_left_les    // which key within basement
    )
// Effect: Find the location within a leaf node where we want to perform a split
// num_left_bns is how many basement nodes (which OMT) should be split to the left.
// num_left_les is how many leafentries in OMT of the last bn should be on the left side of the split.
{
    switch (split_mode) {
    case SPLIT_LEFT_HEAVY: {
        // Keep everything on the left; if the last basement is empty, back up
        // one basement so the left side still ends with real entries.
        *num_left_bns = node->n_children;
        *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
        if (*num_left_les == 0) {
            *num_left_bns = node->n_children - 1;
            *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
        }
        goto exit;
    }
    case SPLIT_RIGHT_HEAVY: {
        // Keep at most one leafentry on the left.
        *num_left_bns = 1;
        *num_left_les = BLB_DATA(node, 0)->num_klpairs() ? 1 : 0;
        goto exit;
    }
    case SPLIT_EVENLY: {
        paranoid_invariant(node->height == 0);
        // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice
        // Walk leafentries in order, accumulating disk size, and split at the
        // first point where the left half reaches half the total.
        uint64_t sumlesizes = ftleaf_disk_size(node);
        uint32_t size_so_far = 0;
        for (int i = 0; i < node->n_children; i++) {
            bn_data* bd = BLB_DATA(node, i);
            uint32_t n_leafentries = bd->num_klpairs();
            for (uint32_t j=0; j < n_leafentries; j++) {
                size_t size_this_le;
                int rr = bd->fetch_klpair_disksize(j, &size_this_le);
                invariant_zero(rr);
                size_so_far += size_this_le;
                if (size_so_far >= sumlesizes/2) {
                    *num_left_bns = i + 1;
                    *num_left_les = j + 1;
                    if (*num_left_bns == node->n_children &&
                        (unsigned int) *num_left_les == n_leafentries) {
                        // need to correct for when we're splitting after the
                        // last element, that makes no sense
                        if (*num_left_les > 1) {
                            (*num_left_les)--;
                        } else if (*num_left_bns > 1) {
                            (*num_left_bns)--;
                            *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
                        } else {
                            // we are trying to split a leaf with only one
                            // leafentry in it
                            abort();
                        }
                    }
                    goto exit;
                }
            }
        }
    }
    }
    // Unreachable: every split_mode case jumps to exit (SPLIT_EVENLY always
    // finds a split point because size_so_far must reach sumlesizes/2).
    abort();
exit:
    return;
}
637 
static void
move_leafentries(
    BASEMENTNODE dest_bn,
    BASEMENTNODE src_bn,
    uint32_t lbi, //lower bound inclusive
    uint32_t ube //upper bound exclusive
    )
//Effect: move leafentries in the range [lbi, upe) from src_omt to newly created dest_omt
// Note: only supports moving a suffix — ube must equal the source's klpair
// count, so this is implemented as a split of src at index lbi.
{
    invariant(ube == src_bn->data_buffer.num_klpairs());
    src_bn->data_buffer.split_klpairs(&dest_bn->data_buffer, lbi);
}
650 
static void ftnode_finalize_split(FTNODE node, FTNODE B, MSN max_msn_applied_to_node) {
// Effect: Finalizes a split by updating some bits and dirtying both nodes
//   - both halves record the same max applied MSN
//   - the new node B inherits the oldest known referenced xid
//   - both nodes are marked dirty so they will be written out
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(B);
    verify_all_in_mempool(node);
    verify_all_in_mempool(B);

    node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
    B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;

    // The new node in the split inherits the oldest known reference xid
    B->oldest_referenced_xid_known = node->oldest_referenced_xid_known;

    node->set_dirty();
    B->set_dirty();
}
667 
668 void
ftleaf_split(FT ft,FTNODE node,FTNODE * nodea,FTNODE * nodeb,DBT * splitk,bool create_new_node,enum split_mode split_mode,uint32_t num_dependent_nodes,FTNODE * dependent_nodes)669 ftleaf_split(
670     FT ft,
671     FTNODE node,
672     FTNODE *nodea,
673     FTNODE *nodeb,
674     DBT *splitk,
675     bool create_new_node,
676     enum split_mode split_mode,
677     uint32_t num_dependent_nodes,
678     FTNODE* dependent_nodes)
679 // Effect: Split a leaf node.
680 // Argument "node" is node to be split.
681 // Upon return:
682 //   nodea and nodeb point to new nodes that result from split of "node"
683 //   nodea is the left node that results from the split
684 //   splitk is the right-most key of nodea
685 {
686 
687     paranoid_invariant(node->height == 0);
688     FL_STATUS_VAL(FT_FLUSHER_SPLIT_LEAF)++;
689     if (node->n_children) {
690         // First move all the accumulated stat64info deltas into the first basement.
691         // After the split, either both nodes or neither node will be included in the next checkpoint.
692         // The accumulated stats in the dictionary will be correct in either case.
693         // By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain
694         // correct information for a basement that is divided between two leafnodes (i.e. when split is
695         // not on a basement boundary).
696         STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node);
697         BASEMENTNODE bn = BLB(node,0);
698         bn->stat64_delta = delta_for_leafnode;
699     }
700 
701 
702     FTNODE B = nullptr;
703     uint32_t fullhash;
704     BLOCKNUM name;
705 
706     if (create_new_node) {
707         // put value in cachetable and do checkpointing
708         // of dependent nodes
709         //
710         // We do this here, before evaluating the last_bn_on_left
711         // and last_le_on_left_within_bn because this operation
712         // may write to disk the dependent nodes.
713         // While doing so, we may rebalance the leaf node
714         // we are splitting, thereby invalidating the
715         // values of last_bn_on_left and last_le_on_left_within_bn.
716         // So, we must call this before evaluating
717         // those two values
718         cachetable_put_empty_node_with_dep_nodes(
719             ft,
720             num_dependent_nodes,
721             dependent_nodes,
722             &name,
723             &fullhash,
724             &B
725             );
726         // GCC 4.8 seems to get confused and think B is maybe uninitialized at link time.
727         // TODO(leif): figure out why it thinks this and actually fix it.
728         invariant_notnull(B);
729     }
730 
731 
732     paranoid_invariant(node->height==0);
733     toku_ftnode_assert_fully_in_memory(node);
734     verify_all_in_mempool(node);
735     MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
736 
737     // variables that say where we will do the split.
738     // After the split, there will be num_left_bns basement nodes in the left node,
739     // and the last basement node in the left node will have num_left_les leafentries.
740     int num_left_bns;
741     int num_left_les;
742     ftleaf_get_split_loc(node, split_mode, &num_left_bns, &num_left_les);
743     {
744         // did we split right on the boundary between basement nodes?
745         const bool split_on_boundary = (num_left_les == 0) || (num_left_les == (int) BLB_DATA(node, num_left_bns - 1)->num_klpairs());
746         // Now we know where we are going to break it
747         // the two nodes will have a total of n_children+1 basement nodes
748         // and n_children-1 pivots
749         // the left node, node, will have last_bn_on_left+1 basement nodes
750         // the right node, B, will have n_children-last_bn_on_left basement nodes
751         // the pivots of node will be the first last_bn_on_left pivots that originally exist
752         // the pivots of B will be the last (n_children - 1 - last_bn_on_left) pivots that originally exist
753 
754         // Note: The basements will not be rebalanced.  Only the mempool of the basement that is split
755         //       (if split_on_boundary is false) will be affected.  All other mempools will remain intact. ???
756 
757         //set up the basement nodes in the new node
758         int num_children_in_node = num_left_bns;
759         // In the SPLIT_RIGHT_HEAVY case, we need to add 1 back because
760         // while it's not on the boundary, we do need node->n_children
761         // children in B.
762         int num_children_in_b = node->n_children - num_left_bns + (!split_on_boundary ? 1 : 0);
763         if (num_children_in_b == 0) {
764             // for uneven split, make sure we have at least 1 bn
765             paranoid_invariant(split_mode == SPLIT_LEFT_HEAVY);
766             num_children_in_b = 1;
767         }
768         paranoid_invariant(num_children_in_node > 0);
769         if (create_new_node) {
770             toku_initialize_empty_ftnode(
771                 B,
772                 name,
773                 0,
774                 num_children_in_b,
775                 ft->h->layout_version,
776                 ft->h->flags);
777             B->fullhash = fullhash;
778         }
779         else {
780             B = *nodeb;
781             REALLOC_N(num_children_in_b,   B->bp);
782             B->n_children = num_children_in_b;
783             for (int i = 0; i < num_children_in_b; i++) {
784                 BP_BLOCKNUM(B,i).b = 0;
785                 BP_STATE(B,i) = PT_AVAIL;
786                 BP_WORKDONE(B,i) = 0;
787                 set_BLB(B, i, toku_create_empty_bn());
788             }
789         }
790 
791         // now move all the data
792 
793         int curr_src_bn_index = num_left_bns - 1;
794         int curr_dest_bn_index = 0;
795 
796         // handle the move of a subset of data in last_bn_on_left from node to B
797         if (!split_on_boundary) {
798             BP_STATE(B,curr_dest_bn_index) = PT_AVAIL;
799             destroy_basement_node(BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array
800             set_BNULL(B, curr_dest_bn_index);
801             set_BLB(B, curr_dest_bn_index, toku_create_empty_bn_no_buffer());
802             move_leafentries(BLB(B, curr_dest_bn_index),
803                              BLB(node, curr_src_bn_index),
804                              num_left_les,         // first row to be moved to B
805                              BLB_DATA(node, curr_src_bn_index)->num_klpairs()  // number of rows in basement to be split
806                              );
807             BLB_MAX_MSN_APPLIED(B, curr_dest_bn_index) = BLB_MAX_MSN_APPLIED(node, curr_src_bn_index);
808             curr_dest_bn_index++;
809         }
810         curr_src_bn_index++;
811 
812         paranoid_invariant(B->n_children >= curr_dest_bn_index);
813         paranoid_invariant(node->n_children >= curr_src_bn_index);
814 
815         // move the rest of the basement nodes
816         for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) {
817             destroy_basement_node(BLB(B, curr_dest_bn_index));
818             set_BNULL(B, curr_dest_bn_index);
819             B->bp[curr_dest_bn_index] = node->bp[curr_src_bn_index];
820         }
821         if (curr_dest_bn_index < B->n_children) {
822             // B already has an empty basement node here.
823             BP_STATE(B, curr_dest_bn_index) = PT_AVAIL;
824         }
825 
826         //
827         // now handle the pivots
828         //
829 
830         // the child index in the original node that corresponds to the
831         // first node in the right node of the split
832         int split_idx = num_left_bns - (split_on_boundary ? 0 : 1);
833         node->pivotkeys.split_at(split_idx, &B->pivotkeys);
834         if (split_on_boundary && num_left_bns < node->n_children && splitk) {
835             toku_copyref_dbt(splitk, node->pivotkeys.get_pivot(num_left_bns - 1));
836         } else if (splitk) {
837             bn_data* bd = BLB_DATA(node, num_left_bns - 1);
838             uint32_t keylen;
839             void *key;
840             int rr = bd->fetch_key_and_len(bd->num_klpairs() - 1, &keylen, &key);
841             invariant_zero(rr);
842             toku_memdup_dbt(splitk, key, keylen);
843         }
844 
845         node->n_children = num_children_in_node;
846         REALLOC_N(num_children_in_node, node->bp);
847     }
848 
849     ftnode_finalize_split(node, B, max_msn_applied_to_node);
850     *nodea = node;
851     *nodeb = B;
852 }    // end of ftleaf_split()
853 
void
ft_nonleaf_split(
    FT ft,
    FTNODE node,
    FTNODE *nodea,
    FTNODE *nodeb,
    DBT *splitk,
    uint32_t num_dependent_nodes,
    FTNODE* dependent_nodes)
// Effect: Split the nonleaf 'node' roughly in half.  The left half of the
//   children stays in 'node' (returned via *nodea); the right half moves
//   into a newly created node B (returned via *nodeb).  *splitk is set to
//   a clone of the pivot that now separates the two nodes (caller owns it).
// Requires: node is pinned for write, fully in memory, height > 0, and has
//   at least two children.  dependent_nodes are passed through to node
//   creation so they stay valid across any cachetable activity.
{
    //VERIFY_NODE(t,node);
    FL_STATUS_VAL(FT_FLUSHER_SPLIT_NONLEAF)++;
    toku_ftnode_assert_fully_in_memory(node);
    int old_n_children = node->n_children;
    int n_children_in_a = old_n_children/2;
    int n_children_in_b = old_n_children-n_children_in_a;
    // Capture the max MSN before the split so both halves are finalized
    // with the same value.
    MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
    FTNODE B;
    paranoid_invariant(node->height>0);
    paranoid_invariant(node->n_children>=2); // Otherwise, how do we split?  We need at least two children to split.
    create_new_ftnode_with_dep_nodes(ft, &B, node->height, n_children_in_b, num_dependent_nodes, dependent_nodes);
    {
        /* The first n_children_in_a go into node a.
         * That means that the first n_children_in_a-1 keys go into node a.
         * The splitter key is key number n_children_in_a */
        for (int i = n_children_in_a; i<old_n_children; i++) {
            int targchild = i-n_children_in_a;
            // TODO: Figure out better way to handle this
            // the problem is that create_new_ftnode_with_dep_nodes for B creates
            // all the data structures, whereas we really don't want it to fill
            // in anything for the bp's.
            // Now we have to go free what it just created so we can
            // slide the bp over
            destroy_nonleaf_childinfo(BNC(B, targchild));
            // now move the bp over (zero node's slot so ownership is clearly
            // transferred and nothing is double-freed)
            B->bp[targchild] = node->bp[i];
            memset(&node->bp[i], 0, sizeof(node->bp[0]));
        }

        // the split key for our parent is the rightmost pivot key in node
        node->pivotkeys.split_at(n_children_in_a, &B->pivotkeys);
        toku_clone_dbt(splitk, node->pivotkeys.get_pivot(n_children_in_a - 1));
        node->pivotkeys.delete_at(n_children_in_a - 1);

        node->n_children = n_children_in_a;
        REALLOC_N(node->n_children, node->bp);
    }

    ftnode_finalize_split(node, B, max_msn_applied_to_node);
    *nodea = node;
    *nodeb = B;
}
906 
907 //
908 // responsibility of ft_split_child is to take locked FTNODEs node and child
909 // and do the following:
910 //  - split child,
911 //  - fix node,
912 //  - release lock on node
913 //  - possibly flush either new children created from split, otherwise unlock children
914 //
static void
ft_split_child(
    FT ft,
    FTNODE node,
    int childnum,
    FTNODE child,
    enum split_mode split_mode,
    struct flusher_advice *fa)
// Effect: Split 'child' (the childnum'th child of 'node') into two nodes,
//   fix 'node' up with the new pivot/child, unpin 'node', and then either
//   recursively flush one of the two halves (per the flusher advice) or
//   unpin both halves.  All pins held on entry are released by exit.
// Requires: node and child pinned for write; the message buffer for
//   childnum is empty (caller must have flushed it first).
{
    paranoid_invariant(node->height>0);
    paranoid_invariant(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
    FTNODE nodea, nodeb;
    DBT splitk;

    // for test
    call_flusher_thread_callback(flt_flush_before_split);

    // node and child must both survive any cachetable activity triggered
    // while creating the new sibling, so pass them as dependent nodes.
    FTNODE dep_nodes[2];
    dep_nodes[0] = node;
    dep_nodes[1] = child;
    if (child->height==0) {
        ftleaf_split(ft, child, &nodea, &nodeb, &splitk, true, split_mode, 2, dep_nodes);
    } else {
        ft_nonleaf_split(ft, child, &nodea, &nodeb, &splitk, 2, dep_nodes);
    }
    // printf("%s:%d child did split\n", __FILE__, __LINE__);
    handle_split_of_child (ft, node, childnum, nodea, nodeb, &splitk);

    // for test
    call_flusher_thread_callback(flt_flush_during_split);

    // at this point, the split is complete
    // now we need to unlock node,
    // and possibly continue
    // flushing one of the children
    int picked_child = fa->pick_child_after_split(ft, node, childnum, childnum + 1, fa->extra);
    toku_unpin_ftnode(ft, node);
    if (picked_child == childnum ||
        (picked_child < 0 && nodea->height > 0 && fa->should_recursively_flush(nodea, fa->extra))) {
        // continue flushing the left half; release the right half
        toku_unpin_ftnode(ft, nodeb);
        toku_ft_flush_some_child(ft, nodea, fa);
    }
    else if (picked_child == childnum + 1 ||
             (picked_child < 0 && nodeb->height > 0 && fa->should_recursively_flush(nodeb, fa->extra))) {
        // continue flushing the right half; release the left half
        toku_unpin_ftnode(ft, nodea);
        toku_ft_flush_some_child(ft, nodeb, fa);
    }
    else {
        // no recursive flush wanted: release both halves
        toku_unpin_ftnode(ft, nodea);
        toku_unpin_ftnode(ft, nodeb);
    }

    toku_destroy_dbt(&splitk);
}
969 
bring_node_fully_into_memory(FTNODE node,FT ft)970 static void bring_node_fully_into_memory(FTNODE node, FT ft) {
971     if (!toku_ftnode_fully_in_memory(node)) {
972         ftnode_fetch_extra bfe;
973         bfe.create_for_full_read(ft);
974         toku_cachetable_pf_pinned_pair(
975             node,
976             toku_ftnode_pf_callback,
977             &bfe,
978             ft->cf,
979             node->blocknum,
980             toku_cachetable_hash(ft->cf, node->blocknum)
981             );
982     }
983 }
984 
static void
flush_this_child(
    FT ft,
    FTNODE node,
    FTNODE child,
    int childnum,
    struct flusher_advice *fa)
// Effect: Push everything in the CHILDNUMth buffer of node down into the child.
// Requires: node pinned for write and fully in memory; child pinned for
//   write (it is brought fully into memory here if needed).  Both nodes
//   are dirtied; neither is unpinned.
{
    update_flush_status(child, 0);
    toku_ftnode_assert_fully_in_memory(node);
    if (fa->should_destroy_basement_nodes(fa)) {
        maybe_destroy_child_blbs(node, child, ft);
    }
    bring_node_fully_into_memory(child, ft);
    toku_ftnode_assert_fully_in_memory(child);
    paranoid_invariant(node->height>0);
    paranoid_invariant(child->blocknum.b!=0);
    // VERIFY_NODE does not work off client thread as of now
    //VERIFY_NODE(t, child);
    node->set_dirty();
    child->set_dirty();

    BP_WORKDONE(node, childnum) = 0;  // this buffer is drained, no work has been done by its contents
    // Detach the buffer from the parent and install a fresh empty one, so
    // the parent stays consistent while the detached buffer is applied below.
    NONLEAF_CHILDINFO bnc = BNC(node, childnum);
    set_BNC(node, childnum, toku_create_empty_nl());

    // now we have a bnc to flush to the child. pass down the parent's
    // oldest known referenced xid as we flush down to the child.
    toku_bnc_flush_to_child(ft, bnc, child, node->oldest_referenced_xid_known);
    destroy_nonleaf_childinfo(bnc);
}
1017 
static void
merge_leaf_nodes(FTNODE a, FTNODE b)
// Effect: Move every basement node and pivot of b into a, leaving b with
//   zero children and destroyed pivotkeys.  If a's last basement node is
//   empty it is dropped first, because there is no pivot to describe it.
//   The caller remains responsible for disposing of b's shell.
// Requires: a and b are leaf nodes, fully in memory, each with at least
//   one child.
{
    FL_STATUS_VAL(FT_FLUSHER_MERGE_LEAF)++;
    toku_ftnode_assert_fully_in_memory(a);
    toku_ftnode_assert_fully_in_memory(b);
    paranoid_invariant(a->height == 0);
    paranoid_invariant(b->height == 0);
    paranoid_invariant(a->n_children > 0);
    paranoid_invariant(b->n_children > 0);

    // Mark nodes as dirty before moving basements from b to a.
    // This way, whatever deltas are accumulated in the basements are
    // applied to the in_memory_stats in the header if they have not already
    // been (if nodes are clean).
    // TODO(leif): this is no longer the way in_memory_stats is
    // maintained. verify that it's ok to move this just before the unpin
    // and then do that.
    a->set_dirty();
    b->set_dirty();

    bn_data* a_last_bd = BLB_DATA(a, a->n_children-1);
    // this bool states if the last basement node in a has any items or not
    // If it does, then it stays in the merge. If it does not, the last basement node
    // of a gets eliminated because we do not have a pivot to store for it (because it has no elements)
    const bool a_has_tail = a_last_bd->num_klpairs() > 0;

    int num_children = a->n_children + b->n_children;
    if (!a_has_tail) {
        int lastchild = a->n_children - 1;
        BASEMENTNODE bn = BLB(a, lastchild);

        // verify that last basement in a is empty, then destroy mempool
        size_t used_space = a_last_bd->get_disk_size();
        invariant_zero(used_space);
        destroy_basement_node(bn);
        set_BNULL(a, lastchild);
        num_children--;
        // drop the pivot for the eliminated basement, if one exists
        if (lastchild < a->pivotkeys.num_pivots()) {
            a->pivotkeys.delete_at(lastchild);
        }
    } else {
        // fill in pivot for what used to be max of node 'a', if it is needed
        uint32_t keylen;
        void *key;
        int r = a_last_bd->fetch_key_and_len(a_last_bd->num_klpairs() - 1, &keylen, &key);
        invariant_zero(r);
        DBT pivotkey;
        toku_fill_dbt(&pivotkey, key, keylen);
        a->pivotkeys.replace_at(&pivotkey, a->n_children - 1);
    }

    // realloc basement nodes in `a'
    REALLOC_N(num_children, a->bp);

    // move each basement node from b to a (zeroing b's slot so ownership
    // is clearly transferred and nothing is double-freed)
    uint32_t offset = a_has_tail ? a->n_children : a->n_children - 1;
    for (int i = 0; i < b->n_children; i++) {
        a->bp[i + offset] = b->bp[i];
        memset(&b->bp[i], 0, sizeof(b->bp[0]));
    }

    // append b's pivots to a's pivots
    a->pivotkeys.append(b->pivotkeys);

    // now that all the data has been moved from b to a, we can destroy the data in b
    a->n_children = num_children;
    b->pivotkeys.destroy();
    b->n_children = 0;
}
1088 
static void balance_leaf_nodes(
    FTNODE a,
    FTNODE b,
    DBT *splitk)
// Effect:
//  If b is bigger then move stuff from b to a until b is the smaller.
//  If a is bigger then move stuff from a to b until a is the smaller.
//  *splitk receives the new pivot key separating a and b.
{
    FL_STATUS_VAL(FT_FLUSHER_BALANCE_LEAF)++;
    // first merge all the data into a
    merge_leaf_nodes(a,b);
    // now split them evenly again
    // because we are not creating a new node, we can pass in no dependent
    // nodes (and no FT handle is needed for that path)
    ftleaf_split(NULL, a, &a, &b, splitk, false, SPLIT_EVENLY, 0, NULL);
}
1104 
1105 static void
maybe_merge_pinned_leaf_nodes(FTNODE a,FTNODE b,const DBT * parent_splitk,bool * did_merge,bool * did_rebalance,DBT * splitk,uint32_t nodesize)1106 maybe_merge_pinned_leaf_nodes(
1107     FTNODE a,
1108     FTNODE b,
1109     const DBT *parent_splitk,
1110     bool *did_merge,
1111     bool *did_rebalance,
1112     DBT *splitk,
1113     uint32_t nodesize
1114     )
1115 // Effect: Either merge a and b into one one node (merge them into a) and set *did_merge = true.
1116 //	   (We do this if the resulting node is not fissible)
1117 //	   or distribute the leafentries evenly between a and b, and set *did_rebalance = true.
1118 //	   (If a and be are already evenly distributed, we may do nothing.)
1119 {
1120     unsigned int sizea = toku_serialize_ftnode_size(a);
1121     unsigned int sizeb = toku_serialize_ftnode_size(b);
1122     uint32_t num_leafentries = toku_ftnode_leaf_num_entries(a) + toku_ftnode_leaf_num_entries(b);
1123     if (num_leafentries > 1 && (sizea + sizeb)*4 > (nodesize*3)) {
1124         // the combined size is more than 3/4 of a node, so don't merge them.
1125         *did_merge = false;
1126         if (sizea*4 > nodesize && sizeb*4 > nodesize) {
1127             // no need to do anything if both are more than 1/4 of a node.
1128             *did_rebalance = false;
1129             toku_clone_dbt(splitk, *parent_splitk);
1130             return;
1131         }
1132         // one is less than 1/4 of a node, and together they are more than 3/4 of a node.
1133         *did_rebalance = true;
1134         balance_leaf_nodes(a, b, splitk);
1135     } else {
1136         // we are merging them.
1137         *did_merge = true;
1138         *did_rebalance = false;
1139         toku_init_dbt(splitk);
1140         merge_leaf_nodes(a, b);
1141     }
1142 }
1143 
1144 static void
maybe_merge_pinned_nonleaf_nodes(const DBT * parent_splitk,FTNODE a,FTNODE b,bool * did_merge,bool * did_rebalance,DBT * splitk)1145 maybe_merge_pinned_nonleaf_nodes(
1146     const DBT *parent_splitk,
1147     FTNODE a,
1148     FTNODE b,
1149     bool *did_merge,
1150     bool *did_rebalance,
1151     DBT *splitk)
1152 {
1153     toku_ftnode_assert_fully_in_memory(a);
1154     toku_ftnode_assert_fully_in_memory(b);
1155     invariant_notnull(parent_splitk->data);
1156 
1157     int old_n_children = a->n_children;
1158     int new_n_children = old_n_children + b->n_children;
1159 
1160     XREALLOC_N(new_n_children, a->bp);
1161     memcpy(a->bp + old_n_children, b->bp, b->n_children * sizeof(b->bp[0]));
1162     memset(b->bp, 0, b->n_children * sizeof(b->bp[0]));
1163 
1164     a->pivotkeys.insert_at(parent_splitk, old_n_children - 1);
1165     a->pivotkeys.append(b->pivotkeys);
1166     a->n_children = new_n_children;
1167     b->n_children = 0;
1168 
1169     a->set_dirty();
1170     b->set_dirty();
1171 
1172     *did_merge = true;
1173     *did_rebalance = false;
1174     toku_init_dbt(splitk);
1175 
1176     FL_STATUS_VAL(FT_FLUSHER_MERGE_NONLEAF)++;
1177 }
1178 
static void
maybe_merge_pinned_nodes(
    FTNODE parent,
    const DBT *parent_splitk,
    FTNODE a,
    FTNODE b,
    bool *did_merge,
    bool *did_rebalance,
    DBT *splitk,
    uint32_t nodesize
    )
// Effect: either merge a and b into one node (merge them into a) and set *did_merge = true.
//	   (We do this if the resulting node is not fissible)
//	   or distribute a and b evenly and set *did_merge = false and *did_rebalance = true
//	   (If a and be are already evenly distributed, we may do nothing.)
//  If we distribute:
//    For leaf nodes, we distribute the leafentries evenly.
//    For nonleaf nodes, we distribute the children evenly.  That may leave one or both of the nodes overfull, but that's OK.
//  If we distribute, we set *splitk to a malloced pivot key.
// Parameters:
//  t			The FT.
//  parent		The parent of the two nodes to be split.
//  parent_splitk	The pivot key between a and b.	 This is either free()'d or returned in *splitk.
//  a			The first node to merge.
//  b			The second node to merge.
//  logger		The logger.
//  did_merge		(OUT):	Did the two nodes actually get merged?
//  splitk		(OUT):	If the two nodes did not get merged, the new pivot key between the two nodes.
{
    MSN msn_max;
    paranoid_invariant(a->height == b->height);
    toku_ftnode_assert_fully_in_memory(parent);
    toku_ftnode_assert_fully_in_memory(a);
    toku_ftnode_assert_fully_in_memory(b);
    parent->set_dirty();   // just to make sure
    {
        // take the larger of the two nodes' max applied MSNs
        MSN msna = a->max_msn_applied_to_node_on_disk;
        MSN msnb = b->max_msn_applied_to_node_on_disk;
        msn_max = (msna.msn > msnb.msn) ? msna : msnb;
    }
    // dispatch on height: leaves may merge OR rebalance, nonleaves always merge
    if (a->height == 0) {
        maybe_merge_pinned_leaf_nodes(a, b, parent_splitk, did_merge, did_rebalance, splitk, nodesize);
    } else {
        maybe_merge_pinned_nonleaf_nodes(parent_splitk, a, b, did_merge, did_rebalance, splitk);
    }
    if (*did_merge || *did_rebalance) {
        // accurate for leaf nodes because all msgs above have been
        // applied, accurate for non-leaf nodes because buffer immediately
        // above each node has been flushed
        a->max_msn_applied_to_node_on_disk = msn_max;
        b->max_msn_applied_to_node_on_disk = msn_max;
    }
}
1232 
merge_remove_key_callback(BLOCKNUM * bp,bool for_checkpoint,void * extra)1233 static void merge_remove_key_callback(BLOCKNUM *bp, bool for_checkpoint, void *extra) {
1234     FT ft = (FT) extra;
1235     ft->blocktable.free_blocknum(bp, ft, for_checkpoint);
1236 }
1237 
1238 //
1239 // Takes as input a locked node and a childnum_to_merge
1240 // As output, two of node's children are merged or rebalanced, and node is unlocked
1241 //
static void
ft_merge_child(
    FT ft,
    FTNODE node,
    int childnum_to_merge,
    bool *did_react,
    struct flusher_advice *fa)
// Effect: Pick the pair of adjacent children around childnum_to_merge,
//   flush any pending messages into them, then either merge them into one
//   child or rebalance them.  Fixes 'node' up accordingly, unpins 'node'
//   (and childb when it survives), and may recursively flush childa.
//   *did_react reports whether a merge or rebalance actually happened.
// Requires: node is pinned for write, height > 0, with more than one child.
{
    // this function should not be called
    // if the child is not mergable
    paranoid_invariant(node->n_children > 1);
    toku_ftnode_assert_fully_in_memory(node);

    // choose the adjacent pair (childnuma, childnumb = childnuma + 1)
    int childnuma,childnumb;
    if (childnum_to_merge > 0) {
        childnuma = childnum_to_merge-1;
        childnumb = childnum_to_merge;
    } else {
        childnuma = childnum_to_merge;
        childnumb = childnum_to_merge+1;
    }
    paranoid_invariant(0 <= childnuma);
    paranoid_invariant(childnuma+1 == childnumb);
    paranoid_invariant(childnumb < node->n_children);

    paranoid_invariant(node->height>0);

    // We suspect that at least one of the children is fusible, but they might not be.
    // for test
    call_flusher_thread_callback(flt_flush_before_merge);

    FTNODE childa, childb;
    {
        // pin the left child for a full read, with node as a dependent
        uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnuma);
        ftnode_fetch_extra bfe;
        bfe.create_for_full_read(ft);
        toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnuma), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &node, &childa, true);
    }
    // for test
    call_flusher_thread_callback(flt_flush_before_pin_second_node_for_merge);
    {
        // pin the right child, with both node and childa as dependents
        FTNODE dep_nodes[2];
        dep_nodes[0] = node;
        dep_nodes[1] = childa;
        uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnumb);
        ftnode_fetch_extra bfe;
        bfe.create_for_full_read(ft);
        toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnumb), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 2, dep_nodes, &childb, true);
    }

    // drain any pending messages into the children before merging them
    if (toku_bnc_n_entries(BNC(node,childnuma))>0) {
        flush_this_child(ft, node, childa, childnuma, fa);
    }
    if (toku_bnc_n_entries(BNC(node,childnumb))>0) {
        flush_this_child(ft, node, childb, childnumb, fa);
    }

    // now we have both children pinned in main memory, and cachetable locked,
    // so no checkpoints will occur.

    bool did_merge, did_rebalance;
    {
        DBT splitk;
        toku_init_dbt(&splitk);
        const DBT old_split_key = node->pivotkeys.get_pivot(childnuma);
        maybe_merge_pinned_nodes(node, &old_split_key, childa, childb, &did_merge, &did_rebalance, &splitk, ft->h->nodesize);
        //toku_verify_estimates(t,childa);
        // the tree did react if a merge (did_merge) or rebalance (new split key) occurred
        *did_react = (bool)(did_merge || did_rebalance);

        if (did_merge) {
            invariant_null(splitk.data);
            // fold childb's flow statistics into childa's buffer info
            NONLEAF_CHILDINFO remaining_bnc = BNC(node, childnuma);
            NONLEAF_CHILDINFO merged_bnc = BNC(node, childnumb);
            for (unsigned int i = 0; i < (sizeof remaining_bnc->flow) / (sizeof remaining_bnc->flow[0]); ++i) {
                remaining_bnc->flow[i] += merged_bnc->flow[i];
            }
            destroy_nonleaf_childinfo(merged_bnc);
            set_BNULL(node, childnumb);
            // remove childb's slot and pivot from node
            node->n_children--;
            memmove(&node->bp[childnumb],
                    &node->bp[childnumb+1],
                    (node->n_children-childnumb)*sizeof(node->bp[0]));
            REALLOC_N(node->n_children, node->bp);
            node->pivotkeys.delete_at(childnuma);

            // Handle a merge of the rightmost leaf node.
            BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
            if (did_merge && childb->blocknum.b == rightmost_blocknum.b) {
                // (did_merge is already true in this branch; the re-check is redundant)
                invariant(childb->blocknum.b != ft->h->root_blocknum.b);
                // keep the rightmost blocknum alive by swapping the pair
                // values so childa takes over childb's identity
                toku_ftnode_swap_pair_values(childa, childb);
                BP_BLOCKNUM(node, childnuma) = childa->blocknum;
            }

            paranoid_invariant(BP_BLOCKNUM(node, childnuma).b == childa->blocknum.b);
            childa->set_dirty();  // just to make sure
            childb->set_dirty();  // just to make sure
        } else {
            // flow will be inaccurate for a while, oh well.  the children
            // are leaves in this case so it's not a huge deal (we're
            // pretty far down the tree)

            // If we didn't merge the nodes, then we need the correct pivot.
            invariant_notnull(splitk.data);
            node->pivotkeys.replace_at(&splitk, childnuma);
            node->set_dirty();
        }
        toku_destroy_dbt(&splitk);
    }
    //
    // now we possibly flush the children
    //
    if (did_merge) {
        // for test
        call_flusher_thread_callback(flt_flush_before_unpin_remove);

        // merge_remove_key_callback will free the blocknum
        int rrb = toku_cachetable_unpin_and_remove(
            ft->cf,
            childb->ct_pair,
            merge_remove_key_callback,
            ft
            );
        assert_zero(rrb);

        // for test
        call_flusher_thread_callback(ft_flush_aflter_merge);

        // unlock the parent
        paranoid_invariant(node->dirty());
        toku_unpin_ftnode(ft, node);
    }
    else {
        // for test
        call_flusher_thread_callback(ft_flush_aflter_rebalance);

        // unlock the parent
        paranoid_invariant(node->dirty());
        toku_unpin_ftnode(ft, node);
        toku_unpin_ftnode(ft, childb);
    }
    // childa is still pinned: either keep flushing down through it, or release it
    if (childa->height > 0 && fa->should_recursively_flush(childa, fa->extra)) {
        toku_ft_flush_some_child(ft, childa, fa);
    }
    else {
        toku_unpin_ftnode(ft, childa);
    }
}
1390 
void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa)
// Effect: This function does the following:
//   - Pick a child of parent (the heaviest child),
//   - flush from parent to child,
//   - possibly split/merge child.
//   - if child is gorged, recursively proceed with child
//  Note that parent is already locked
//  Upon exit of this function, parent is unlocked and no
//  new nodes (such as a child) remain locked
{
    int dirtied = 0;
    NONLEAF_CHILDINFO bnc = NULL;
    paranoid_invariant(parent->height>0);
    toku_ftnode_assert_fully_in_memory(parent);
    // capture this now; parent may be unpinned before we flush below
    TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;

    // pick the child we want to flush to
    int childnum = fa->pick_child(ft, parent, fa->extra);

    // for test
    call_flusher_thread_callback(flt_flush_before_child_pin);

    // get the child into memory
    BLOCKNUM targetchild = BP_BLOCKNUM(parent, childnum);
    ft->blocktable.verify_blocknum_allocated(targetchild);
    uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
    FTNODE child;
    ftnode_fetch_extra bfe;
    // Note that we don't read the entire node into memory yet.
    // The idea is let's try to do the minimum work before releasing the parent lock
    bfe.create_for_min_read(ft);
    toku_pin_ftnode_with_dep_nodes(ft, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child, true);

    // for test
    call_flusher_thread_callback(ft_flush_aflter_child_pin);

    if (fa->should_destroy_basement_nodes(fa)) {
        maybe_destroy_child_blbs(parent, child, ft);
    }

    //Note that at this point, we don't have the entire child in.
    // Let's do a quick check to see if the child may be reactive
    // If the child cannot be reactive, then we can safely unlock
    // the parent before finishing reading in the entire child node.
    bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);

    paranoid_invariant(child->blocknum.b!=0);

    // only do the following work if there is a flush to perform
    if (toku_bnc_n_entries(BNC(parent, childnum)) > 0 || parent->height == 1) {
        if (!parent->dirty()) {
            dirtied++;
            parent->set_dirty();
        }
        // detach buffer
        BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
        bnc = BNC(parent, childnum);
        NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
        // carry the flow statistics over to the replacement buffer
        memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
        set_BNC(parent, childnum, new_bnc);
    }

    //
    // at this point, the buffer has been detached from the parent
    // and a new empty buffer has been placed in its stead
    // so, if we are absolutely sure that the child is not
    // reactive, we can unpin the parent
    //
    if (!may_child_be_reactive) {
        toku_unpin_ftnode(ft, parent);
        parent = NULL;
    }

    //
    // now, if necessary, read/decompress the rest of child into memory,
    // so that we can proceed and apply the flush
    //
    bring_node_fully_into_memory(child, ft);

    // It is possible after reading in the entire child,
    // that we now know that the child is not reactive
    // if so, we can unpin parent right now
    // we won't be splitting/merging child
    // and we have already replaced the bnc
    // for the root with a fresh one
    enum reactivity child_re = toku_ftnode_get_reactivity(ft, child);
    if (parent && child_re == RE_STABLE) {
        toku_unpin_ftnode(ft, parent);
        parent = NULL;
    }

    // from above, we know at this point that either the bnc
    // is detached from the parent (which may be unpinned),
    // and we have to apply the flush, or there was no data
    // in the buffer to flush, and as a result, flushing is not necessary
    // and bnc is NULL
    if (bnc != NULL) {
        if (!child->dirty()) {
            dirtied++;
            child->set_dirty();
        }
        // do the actual flush
        toku_bnc_flush_to_child(
            ft,
            bnc,
            child,
            parent_oldest_referenced_xid_known
            );
        destroy_nonleaf_childinfo(bnc);
    }

    fa->update_status(child, dirtied, fa->extra);
    // let's get the reactivity of the child again,
    // it is possible that the flush got rid of some values
    // and now the parent is no longer reactive
    child_re = toku_ftnode_get_reactivity(ft, child);
    // if the parent has been unpinned above, then
    // this is our only option, even if the child is not stable
    // if the child is not stable, we'll handle it the next
    // time we need to flush to the child
    if (!parent ||
        child_re == RE_STABLE ||
        (child_re == RE_FUSIBLE && parent->n_children == 1)
        )
    {
        if (parent) {
            toku_unpin_ftnode(ft, parent);
            parent = NULL;
        }
        //
        // it is the responsibility of toku_ft_flush_some_child to unpin child
        //
        if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
            toku_ft_flush_some_child(ft, child, fa);
        }
        else {
            toku_unpin_ftnode(ft, child);
        }
    }
    else if (child_re == RE_FISSIBLE) {
        //
        // it is responsibility of `ft_split_child` to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
        ft_split_child(ft, parent, childnum, child, SPLIT_EVENLY, fa);
    }
    else if (child_re == RE_FUSIBLE) {
        //
        // it is responsibility of `maybe_merge_child to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent); // just make sure we have not accidentally unpinned parent
        fa->maybe_merge_child(fa, ft, parent, childnum, child, fa->extra);
    }
    else {
        // reactivity must be one of the three cases above
        abort();
    }
}
1550 
toku_bnc_flush_to_child(FT ft,NONLEAF_CHILDINFO bnc,FTNODE child,TXNID parent_oldest_referenced_xid_known)1551 void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID parent_oldest_referenced_xid_known) {
1552     paranoid_invariant(bnc);
1553 
1554     TOKULOGGER logger = toku_cachefile_logger(ft->cf);
1555     TXN_MANAGER txn_manager = logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
1556     TXNID oldest_referenced_xid_for_simple_gc = TXNID_NONE;
1557 
1558     txn_manager_state txn_state_for_gc(txn_manager);
1559     bool do_garbage_collection = child->height == 0 && txn_manager != nullptr;
1560     if (do_garbage_collection) {
1561         txn_state_for_gc.init();
1562         oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
1563     }
1564     txn_gc_info gc_info(&txn_state_for_gc,
1565                         oldest_referenced_xid_for_simple_gc,
1566                         child->oldest_referenced_xid_known,
1567                         true);
1568     struct flush_msg_fn {
1569         FT ft;
1570         FTNODE child;
1571         NONLEAF_CHILDINFO bnc;
1572         txn_gc_info *gc_info;
1573 
1574         STAT64INFO_S stats_delta;
1575         int64_t logical_rows_delta = 0;
1576         size_t remaining_memsize = bnc->msg_buffer.buffer_size_in_use();
1577 
1578         flush_msg_fn(FT t, FTNODE n, NONLEAF_CHILDINFO nl, txn_gc_info *g) :
1579             ft(t), child(n), bnc(nl), gc_info(g), remaining_memsize(bnc->msg_buffer.buffer_size_in_use()) {
1580             stats_delta = { 0, 0 };
1581         }
1582         int operator()(const ft_msg &msg, bool is_fresh) {
1583             size_t flow_deltas[] = { 0, 0 };
1584             size_t memsize_in_buffer = message_buffer::msg_memsize_in_buffer(msg);
1585             if (remaining_memsize <= bnc->flow[0]) {
1586                 // this message is in the current checkpoint's worth of
1587                 // the end of the message buffer
1588                 flow_deltas[0] = memsize_in_buffer;
1589             } else if (remaining_memsize <= bnc->flow[0] + bnc->flow[1]) {
1590                 // this message is in the last checkpoint's worth of the
1591                 // end of the message buffer
1592                 flow_deltas[1] = memsize_in_buffer;
1593             }
1594             toku_ftnode_put_msg(
1595                 ft->cmp,
1596                 ft->update_fun,
1597                 child,
1598                 -1,
1599                 msg,
1600                 is_fresh,
1601                 gc_info,
1602                 flow_deltas,
1603                 &stats_delta,
1604                 &logical_rows_delta);
1605             remaining_memsize -= memsize_in_buffer;
1606             return 0;
1607         }
1608     } flush_fn(ft, child, bnc, &gc_info);
1609     bnc->msg_buffer.iterate(flush_fn);
1610 
1611     child->oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
1612 
1613     invariant(flush_fn.remaining_memsize == 0);
1614     if (flush_fn.stats_delta.numbytes || flush_fn.stats_delta.numrows) {
1615         toku_ft_update_stats(&ft->in_memory_stats, flush_fn.stats_delta);
1616     }
1617     toku_ft_adjust_logical_row_count(ft, flush_fn.logical_rows_delta);
1618     if (do_garbage_collection) {
1619         size_t buffsize = bnc->msg_buffer.buffer_size_in_use();
1620         // may be misleading if there's a broadcast message in there
1621         toku_ft_status_note_msg_bytes_out(buffsize);
1622     }
1623 }
1624 
1625 static void
update_cleaner_status(FTNODE node,int childnum)1626 update_cleaner_status(
1627     FTNODE node,
1628     int childnum)
1629 {
1630     FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_NODES)++;
1631     if (node->height == 1) {
1632         FL_STATUS_VAL(FT_FLUSHER_CLEANER_H1_NODES)++;
1633     } else {
1634         FL_STATUS_VAL(FT_FLUSHER_CLEANER_HGT1_NODES)++;
1635     }
1636 
1637     unsigned int nbytesinbuf = toku_bnc_nbytesinbuf(BNC(node, childnum));
1638     if (nbytesinbuf == 0) {
1639         FL_STATUS_VAL(FT_FLUSHER_CLEANER_EMPTY_NODES)++;
1640     } else {
1641         if (nbytesinbuf > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE)) {
1642             FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE) = nbytesinbuf;
1643         }
1644         if (nbytesinbuf < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE)) {
1645             FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE) = nbytesinbuf;
1646         }
1647         FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_SIZE) += nbytesinbuf;
1648 
1649         uint64_t workdone = BP_WORKDONE(node, childnum);
1650         if (workdone > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE)) {
1651             FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE) = workdone;
1652         }
1653         if (workdone < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE)) {
1654             FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE) = workdone;
1655         }
1656         FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_WORKDONE) += workdone;
1657     }
1658 }
1659 
1660 static void
dummy_update_status(FTNODE UU (child),int UU (dirtied),void * UU (extra))1661 dummy_update_status(
1662     FTNODE UU(child),
1663     int UU(dirtied),
1664     void* UU(extra)
1665     )
1666 {
1667 }
1668 
1669 static int
dummy_pick_heaviest_child(FT UU (h),FTNODE UU (parent),void * UU (extra))1670 dummy_pick_heaviest_child(FT UU(h),
1671                     FTNODE UU(parent),
1672                     void* UU(extra))
1673 {
1674     abort();
1675     return -1;
1676 }
1677 
toku_ft_split_child(FT ft,FTNODE node,int childnum,FTNODE child,enum split_mode split_mode)1678 void toku_ft_split_child(
1679     FT ft,
1680     FTNODE node,
1681     int childnum,
1682     FTNODE child,
1683     enum split_mode split_mode
1684     )
1685 {
1686     struct flusher_advice fa;
1687     flusher_advice_init(
1688         &fa,
1689         dummy_pick_heaviest_child,
1690         dont_destroy_basement_nodes,
1691         never_recursively_flush,
1692         default_merge_child,
1693         dummy_update_status,
1694         default_pick_child_after_split,
1695         NULL
1696         );
1697     ft_split_child(
1698         ft,
1699         node,
1700         childnum, // childnum to split
1701         child,
1702         split_mode,
1703         &fa
1704         );
1705 }
1706 
toku_ft_merge_child(FT ft,FTNODE node,int childnum)1707 void toku_ft_merge_child(
1708     FT ft,
1709     FTNODE node,
1710     int childnum
1711     )
1712 {
1713     struct flusher_advice fa;
1714     flusher_advice_init(
1715         &fa,
1716         dummy_pick_heaviest_child,
1717         dont_destroy_basement_nodes,
1718         never_recursively_flush,
1719         default_merge_child,
1720         dummy_update_status,
1721         default_pick_child_after_split,
1722         NULL
1723         );
1724     bool did_react;
1725     ft_merge_child(
1726         ft,
1727         node,
1728         childnum, // childnum to merge
1729         &did_react,
1730         &fa
1731         );
1732 }
1733 
1734 int
toku_ftnode_cleaner_callback(void * ftnode_pv,BLOCKNUM blocknum,uint32_t fullhash,void * extraargs)1735 toku_ftnode_cleaner_callback(
1736     void *ftnode_pv,
1737     BLOCKNUM blocknum,
1738     uint32_t fullhash,
1739     void *extraargs)
1740 {
1741     FTNODE node = (FTNODE) ftnode_pv;
1742     invariant(node->blocknum.b == blocknum.b);
1743     invariant(node->fullhash == fullhash);
1744     invariant(node->height > 0);   // we should never pick a leaf node (for now at least)
1745     FT ft = (FT) extraargs;
1746     bring_node_fully_into_memory(node, ft);
1747     int childnum = find_heaviest_child(node);
1748     update_cleaner_status(node, childnum);
1749 
1750     // Either toku_ft_flush_some_child will unlock the node, or we do it here.
1751     if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) {
1752         struct flusher_advice fa;
1753         struct flush_status_update_extra fste;
1754         ct_flusher_advice_init(&fa, &fste, ft->h->nodesize);
1755         toku_ft_flush_some_child(ft, node, &fa);
1756     } else {
1757         toku_unpin_ftnode(ft, node);
1758     }
1759     return 0;
1760 }
1761 
// Work item handed to the background (kibbutz) flusher thread by
// place_node_and_bnc_on_background_thread() and consumed (then freed)
// by flush_node_fun().
struct flusher_extra {
    FT ft;
    FTNODE node;            // pinned node; unpinned by the background flush
    NONLEAF_CHILDINFO bnc;  // detached buffer to flush into node, or NULL
    TXNID parent_oldest_referenced_xid_known;
};
1768 
1769 //
1770 // This is the function that gets called by a
1771 // background thread. Its purpose is to complete
1772 // a flush, and possibly do a split/merge.
1773 //
flush_node_fun(void * fe_v)1774 static void flush_node_fun(void *fe_v)
1775 {
1776     toku::context flush_ctx(CTX_FLUSH);
1777     struct flusher_extra* fe = (struct flusher_extra *) fe_v;
1778     // The node that has been placed on the background
1779     // thread may not be fully in memory. Some message
1780     // buffers may be compressed. Before performing
1781     // any operations, we must first make sure
1782     // the node is fully in memory
1783     //
1784     // If we have a bnc, that means fe->node is a child, and we've already
1785     // destroyed its basement nodes if necessary, so we now need to either
1786     // read them back in, or just do the regular partial fetch.  If we
1787     // don't, that means fe->node is a parent, so we need to do this anyway.
1788     bring_node_fully_into_memory(fe->node,fe->ft);
1789     fe->node->set_dirty();
1790 
1791     struct flusher_advice fa;
1792     struct flush_status_update_extra fste;
1793     flt_flusher_advice_init(&fa, &fste, fe->ft->h->nodesize);
1794 
1795     if (fe->bnc) {
1796         // In this case, we have a bnc to flush to a node
1797 
1798         // for test purposes
1799         call_flusher_thread_callback(flt_flush_before_applying_inbox);
1800 
1801         toku_bnc_flush_to_child(
1802             fe->ft,
1803             fe->bnc,
1804             fe->node,
1805             fe->parent_oldest_referenced_xid_known
1806             );
1807         destroy_nonleaf_childinfo(fe->bnc);
1808 
1809         // after the flush has completed, now check to see if the node needs flushing
1810         // If so, call toku_ft_flush_some_child on the node (because this flush intends to
1811         // pass a meaningful oldest referenced xid for simple garbage collection), and it is the
1812         // responsibility of the flush to unlock the node. otherwise, we unlock it here.
1813         if (fe->node->height > 0 && toku_ftnode_nonleaf_is_gorged(fe->node, fe->ft->h->nodesize)) {
1814             toku_ft_flush_some_child(fe->ft, fe->node, &fa);
1815         }
1816         else {
1817             toku_unpin_ftnode(fe->ft,fe->node);
1818         }
1819     }
1820     else {
1821         // In this case, we were just passed a node with no
1822         // bnc, which means we are tasked with flushing some
1823         // buffer in the node.
1824         // It is the responsibility of flush some child to unlock the node
1825         toku_ft_flush_some_child(fe->ft, fe->node, &fa);
1826     }
1827     remove_background_job_from_cf(fe->ft->cf);
1828     toku_free(fe);
1829 }
1830 
1831 static void
place_node_and_bnc_on_background_thread(FT ft,FTNODE node,NONLEAF_CHILDINFO bnc,TXNID parent_oldest_referenced_xid_known)1832 place_node_and_bnc_on_background_thread(
1833     FT ft,
1834     FTNODE node,
1835     NONLEAF_CHILDINFO bnc,
1836     TXNID parent_oldest_referenced_xid_known)
1837 {
1838     struct flusher_extra *XMALLOC(fe);
1839     fe->ft = ft;
1840     fe->node = node;
1841     fe->bnc = bnc;
1842     fe->parent_oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
1843     cachefile_kibbutz_enq(ft->cf, flush_node_fun, fe);
1844 }
1845 
1846 //
1847 // This takes as input a gorged, locked,  non-leaf node named parent
1848 // and sets up a flush to be done in the background.
1849 // The flush is setup like this:
1850 //  - We call maybe_get_and_pin_clean on the child we want to flush to in order to try to lock the child
1851 //  - if we successfully pin the child, and the child does not need to be split or merged
1852 //     then we detach the buffer, place the child and buffer onto a background thread, and
1853 //     have the flush complete in the background, and unlock the parent. The child will be
1854 //     unlocked on the background thread
1855 //  - if any of the above does not happen (child cannot be locked,
1856 //     child needs to be split/merged), then we place the parent on the background thread.
1857 //     The parent will be unlocked on the background thread
1858 //
toku_ft_flush_node_on_background_thread(FT ft,FTNODE parent)1859 void toku_ft_flush_node_on_background_thread(FT ft, FTNODE parent)
1860 {
1861     toku::context flush_ctx(CTX_FLUSH);
1862     TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;
1863     //
1864     // first let's see if we can detach buffer on client thread
1865     // and pick the child we want to flush to
1866     //
1867     int childnum = find_heaviest_child(parent);
1868     paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
1869     //
1870     // see if we can pin the child
1871     //
1872     FTNODE child;
1873     uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
1874     int r = toku_maybe_pin_ftnode_clean(ft, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child);
1875     if (r != 0) {
1876         // In this case, we could not lock the child, so just place the parent on the background thread
1877         // In the callback, we will use toku_ft_flush_some_child, which checks to
1878         // see if we should blow away the old basement nodes.
1879         place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
1880     }
1881     else {
1882         //
1883         // successfully locked child
1884         //
1885         bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);
1886         if (!may_child_be_reactive) {
1887             // We're going to unpin the parent, so before we do, we must
1888             // check to see if we need to blow away the basement nodes to
1889             // keep the MSN invariants intact.
1890             maybe_destroy_child_blbs(parent, child, ft);
1891 
1892             //
1893             // can detach buffer and unpin root here
1894             //
1895             parent->set_dirty();
1896             BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
1897             NONLEAF_CHILDINFO bnc = BNC(parent, childnum);
1898             NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
1899             memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
1900             set_BNC(parent, childnum, new_bnc);
1901 
1902             //
1903             // at this point, the buffer has been detached from the parent
1904             // and a new empty buffer has been placed in its stead
1905             // so, because we know for sure the child is not
1906             // reactive, we can unpin the parent
1907             //
1908             place_node_and_bnc_on_background_thread(ft, child, bnc, parent_oldest_referenced_xid_known);
1909             toku_unpin_ftnode(ft, parent);
1910         }
1911         else {
1912             // because the child may be reactive, we need to
1913             // put parent on background thread.
1914             // As a result, we unlock the child here.
1915             toku_unpin_ftnode(ft, child);
1916             // Again, we'll have the parent on the background thread, so
1917             // we don't need to destroy the basement nodes yet.
1918             place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
1919         }
1920     }
1921 }
1922 
1923 #include <toku_race_tools.h>
// Runs automatically at load time (GCC constructor attribute): tells
// Valgrind's race detectors to skip checking the fl_status block —
// presumably because those status counters are updated without
// synchronization by design; confirm against FL_STATUS_VAL usage.
void __attribute__((__constructor__)) toku_ft_flusher_helgrind_ignore(void);
void
toku_ft_flusher_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&fl_status, sizeof fl_status);
}
1929