/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "ft/ft.h"
#include "ft/ft-cachetable-wrappers.h"
#include "ft/ft-internal.h"
#include "ft/ft-flusher.h"
#include "ft/ft-flusher-internal.h"
#include "ft/node.h"
#include "ft/serialize/block_table.h"
#include "ft/serialize/ft_node-serialize.h"
#include "portability/toku_assert.h"
#include "portability/toku_atomic.h"
#include "util/status.h"
#include "util/context.h"


void toku_ft_flusher_get_status(FT_FLUSHER_STATUS status) {
    fl_status.init();
    *status = fl_status;
}

//
// For test purposes only.
// These callbacks are never used in production code, only as a way
// to test the system (for example, by causing crashes at predictable times).
//
static void (*flusher_thread_callback)(int, void*) = NULL;
static void *flusher_thread_callback_extra = NULL;

void toku_flusher_thread_set_callback(void (*callback_f)(int, void*),
                                      void* extra) {
    flusher_thread_callback = callback_f;
    flusher_thread_callback_extra = extra;
}

static void call_flusher_thread_callback(int flt_state) {
    if (flusher_thread_callback) {
        flusher_thread_callback(flt_state, flusher_thread_callback_extra);
    }
}

static int
find_heaviest_child(FTNODE node)
{
    int max_child = 0;
    uint64_t max_weight = toku_bnc_nbytesinbuf(BNC(node, 0)) + BP_WORKDONE(node, 0);

    invariant(node->n_children > 0);
    for (int i = 1; i < node->n_children; i++) {
        uint64_t bytes_in_buf = toku_bnc_nbytesinbuf(BNC(node, i));
        uint64_t workdone = BP_WORKDONE(node, i);
        if (workdone > 0) {
            invariant(bytes_in_buf > 0);
        }
        uint64_t this_weight = bytes_in_buf + workdone;
        if (max_weight < this_weight) {
            max_child = i;
            max_weight = this_weight;
        }
    }
    return max_child;
}
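
// Worked example of the weight heuristic above (illustrative, not from the
// original source): weight(i) = bytes buffered for child i + work already
// done on its behalf. For buffers of {100, 50, 200} bytes with BP_WORKDONE
// values {0, 500, 0}, the weights are {100, 550, 200}, so
// find_heaviest_child() returns child 1 even though child 2 buffers more bytes.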

static void
update_flush_status(FTNODE child, int cascades) {
    FL_STATUS_VAL(FT_FLUSHER_FLUSH_TOTAL)++;
    if (cascades > 0) {
        FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES)++;
        switch (cascades) {
        case 1:
            FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_1)++; break;
        case 2:
            FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_2)++; break;
        case 3:
            FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_3)++; break;
        case 4:
            FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_4)++; break;
        case 5:
            FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_5)++; break;
        default:
            FL_STATUS_VAL(FT_FLUSHER_FLUSH_CASCADES_GT_5)++; break;
        }
    }
    bool flush_needs_io = false;
    for (int i = 0; !flush_needs_io && i < child->n_children; ++i) {
        if (BP_STATE(child, i) == PT_ON_DISK) {
            flush_needs_io = true;
        }
    }
    if (flush_needs_io) {
        FL_STATUS_VAL(FT_FLUSHER_FLUSH_NEEDED_IO)++;
    } else {
        FL_STATUS_VAL(FT_FLUSHER_FLUSH_IN_MEMORY)++;
    }
}
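
// Sketch of how the cascade counters accumulate (illustrative, not from the
// original source): a flush that recurses two levels deep calls
// update_flush_status() with cascades = 0, 1, 2 as it descends. The last two
// calls each bump FT_FLUSHER_FLUSH_CASCADES, while FT_FLUSHER_FLUSH_CASCADES_1
// and FT_FLUSHER_FLUSH_CASCADES_2 record the depth histogram.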

static void
maybe_destroy_child_blbs(FTNODE node, FTNODE child, FT ft)
{
    // If the node is already fully in memory, as in upgrade, we don't
    // need to destroy the basement nodes because they are all equally
    // up to date.
    if (child->n_children > 1 &&
        child->height == 0 &&
        !child->dirty()) {
        for (int i = 0; i < child->n_children; ++i) {
            if (BP_STATE(child, i) == PT_AVAIL &&
                node->max_msn_applied_to_node_on_disk.msn < BLB_MAX_MSN_APPLIED(child, i).msn)
            {
                toku_evict_bn_from_memory(child, i, ft);
            }
        }
    }
}

static void
ft_merge_child(
    FT ft,
    FTNODE node,
    int childnum_to_merge,
    bool *did_react,
    struct flusher_advice *fa);

static int
pick_heaviest_child(FT UU(ft),
                    FTNODE parent,
                    void* UU(extra))
{
    int childnum = find_heaviest_child(parent);
    paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum))>0);
    return childnum;
}

bool
dont_destroy_basement_nodes(void* UU(extra))
{
    return false;
}

static bool
do_destroy_basement_nodes(void* UU(extra))
{
    return true;
}

bool
always_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return true;
}

bool
never_recursively_flush(FTNODE UU(child), void* UU(extra))
{
    return false;
}

/**
 * Flusher thread ("normal" flushing) implementation.
 */
struct flush_status_update_extra {
    int cascades;
    uint32_t nodesize;
};

static bool
recurse_if_child_is_gorged(FTNODE child, void* extra)
{
    struct flush_status_update_extra *fste = (flush_status_update_extra *)extra;
    return toku_ftnode_nonleaf_is_gorged(child, fste->nodesize);
}
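
// Illustrative note (an assumption about toku_ftnode_nonleaf_is_gorged, which
// is defined elsewhere): a nonleaf child counts as "gorged" roughly when its
// message buffers exceed the configured nodesize, so this predicate makes the
// flusher keep recursing only while the child remains over target.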

int
default_pick_child_after_split(FT UU(ft),
                               FTNODE UU(parent),
                               int UU(childnuma),
                               int UU(childnumb),
                               void* UU(extra))
{
    return -1;
}

void
default_merge_child(struct flusher_advice *fa,
                    FT ft,
                    FTNODE parent,
                    int childnum,
                    FTNODE child,
                    void* UU(extra))
{
    //
    // There is probably a way to pass FTNODE child
    // into ft_merge_child, but for simplicity for now,
    // we are just going to unpin child and
    // let ft_merge_child pin it again
    //
    toku_unpin_ftnode(ft, child);
    //
    // it is the responsibility of ft_merge_child to unlock parent
    //
    bool did_react;
    ft_merge_child(ft, parent, childnum, &did_react, fa);
}

void
flusher_advice_init(
    struct flusher_advice *fa,
    FA_PICK_CHILD pick_child,
    FA_SHOULD_DESTROY_BN should_destroy_basement_nodes,
    FA_SHOULD_RECURSIVELY_FLUSH should_recursively_flush,
    FA_MAYBE_MERGE_CHILD maybe_merge_child,
    FA_UPDATE_STATUS update_status,
    FA_PICK_CHILD_AFTER_SPLIT pick_child_after_split,
    void* extra
    )
{
    fa->pick_child = pick_child;
    fa->should_destroy_basement_nodes = should_destroy_basement_nodes;
    fa->should_recursively_flush = should_recursively_flush;
    fa->maybe_merge_child = maybe_merge_child;
    fa->update_status = update_status;
    fa->pick_child_after_split = pick_child_after_split;
    fa->extra = extra;
}
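
// Minimal usage sketch (illustrative; mirrors flt_flusher_advice_init below):
// a caller bundles its policy callbacks into a flusher_advice before handing
// it to toku_ft_flush_some_child().
//
//     struct flusher_advice fa;
//     struct flush_status_update_extra fste = { 0 /* cascades */, nodesize };
//     flusher_advice_init(&fa,
//                         pick_heaviest_child,
//                         dont_destroy_basement_nodes,
//                         recurse_if_child_is_gorged,
//                         default_merge_child,
//                         flt_update_status,
//                         default_pick_child_after_split,
//                         &fste);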

static void
flt_update_status(FTNODE child,
                  int UU(dirtied),
                  void* extra)
{
    struct flush_status_update_extra *fste = (struct flush_status_update_extra *) extra;
    update_flush_status(child, fste->cascades);
    // If `toku_ft_flush_some_child` decides to recurse after this, we'll need
    // cascades to increase. If not, it doesn't matter.
    fste->cascades++;
}

static void
flt_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra *fste, uint32_t nodesize)
{
    fste->cascades = 0;
    fste->nodesize = nodesize;
    flusher_advice_init(fa,
                        pick_heaviest_child,
                        dont_destroy_basement_nodes,
                        recurse_if_child_is_gorged,
                        default_merge_child,
                        flt_update_status,
                        default_pick_child_after_split,
                        fste);
}

struct ctm_extra {
    bool is_last_child;
    DBT target_key;
};

static int
ctm_pick_child(FT ft,
               FTNODE parent,
               void* extra)
{
    struct ctm_extra* ctme = (struct ctm_extra *) extra;
    int childnum;
    if (parent->height == 1 && ctme->is_last_child) {
        childnum = parent->n_children - 1;
    } else {
        childnum = toku_ftnode_which_child(parent, &ctme->target_key, ft->cmp);
    }
    return childnum;
}

static void
ctm_update_status(
    FTNODE UU(child),
    int dirtied,
    void* UU(extra)
    )
{
    FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_DIRTIED_FOR_LEAF_MERGE) += dirtied;
}

static void
ctm_maybe_merge_child(struct flusher_advice *fa,
                      FT ft,
                      FTNODE parent,
                      int childnum,
                      FTNODE child,
                      void *extra)
{
    if (child->height == 0) {
        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_COMPLETED), 1);
    }
    default_merge_child(fa, ft, parent, childnum, child, extra);
}

static void
ct_maybe_merge_child(struct flusher_advice *fa,
                     FT ft,
                     FTNODE parent,
                     int childnum,
                     FTNODE child,
                     void* extra)
{
    if (child->height > 0) {
        default_merge_child(fa, ft, parent, childnum, child, extra);
    }
    else {
        struct ctm_extra ctme;
        paranoid_invariant(parent->n_children > 1);
        int pivot_to_save;
        //
        // we have two cases: one where childnum is the last child,
        // in which case the pivot we save is not the pivot of the child
        // we wish to descend to, and another where childnum is not the
        // last child, in which case the pivot is sufficient for
        // identifying the leaf to be merged
        //
        if (childnum == (parent->n_children - 1)) {
            ctme.is_last_child = true;
            pivot_to_save = childnum - 1;
        }
        else {
            ctme.is_last_child = false;
            pivot_to_save = childnum;
        }
        toku_clone_dbt(&ctme.target_key, parent->pivotkeys.get_pivot(pivot_to_save));

        // at this point, ctme is properly set up, now we can do the merge
        struct flusher_advice new_fa;
        flusher_advice_init(
            &new_fa,
            ctm_pick_child,
            dont_destroy_basement_nodes,
            always_recursively_flush,
            ctm_maybe_merge_child,
            ctm_update_status,
            default_pick_child_after_split,
            &ctme);

        toku_unpin_ftnode(ft, parent);
        toku_unpin_ftnode(ft, child);

        FTNODE root_node = NULL;
        {
            uint32_t fullhash;
            CACHEKEY root;
            toku_calculate_root_offset_pointer(ft, &root, &fullhash);
            ftnode_fetch_extra bfe;
            bfe.create_for_full_read(ft);
            toku_pin_ftnode(ft, root, fullhash, &bfe, PL_WRITE_EXPENSIVE, &root_node, true);
            toku_ftnode_assert_fully_in_memory(root_node);
        }

        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_STARTED), 1);
        (void) toku_sync_fetch_and_add(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        toku_ft_flush_some_child(ft, root_node, &new_fa);

        (void) toku_sync_fetch_and_sub(&FL_STATUS_VAL(FT_FLUSHER_CLEANER_NUM_LEAF_MERGES_RUNNING), 1);

        toku_destroy_dbt(&ctme.target_key);
    }
}

static void
ct_update_status(FTNODE child,
                 int dirtied,
                 void* extra)
{
    struct flush_status_update_extra* fste = (struct flush_status_update_extra *) extra;
    update_flush_status(child, fste->cascades);
    FL_STATUS_VAL(FT_FLUSHER_CLEANER_NODES_DIRTIED) += dirtied;
    // Incrementing this in case `toku_ft_flush_some_child` decides to recurse.
    fste->cascades++;
}

static void
ct_flusher_advice_init(struct flusher_advice *fa, struct flush_status_update_extra* fste, uint32_t nodesize)
{
    fste->cascades = 0;
    fste->nodesize = nodesize;
    flusher_advice_init(fa,
                        pick_heaviest_child,
                        do_destroy_basement_nodes,
                        recurse_if_child_is_gorged,
                        ct_maybe_merge_child,
                        ct_update_status,
                        default_pick_child_after_split,
                        fste);
}

//
// This returns true if the node MAY be reactive,
// false if we are absolutely sure that it is NOT reactive.
// The reason for inaccuracy is that the node may be
// a leaf node that is not entirely in memory. If so, then
// we cannot be sure if the node is reactive.
//
static bool ft_ftnode_may_be_reactive(FT ft, FTNODE node)
{
    if (node->height == 0) {
        return true;
    } else {
        return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout) != RE_STABLE;
    }
}
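
// Illustrative summary of the reactivity states used below (based on the
// reactivity enum; the exact thresholds live elsewhere): RE_FISSIBLE means the
// node is too big and wants a split, RE_FUSIBLE means it is too small and
// wants a merge, RE_STABLE means no structural work is needed. A leaf can only
// be judged once it is fully in memory, hence the conservative `true` above.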

/* NODE is a node with a child.
 * childnum was split into two nodes, childa and childb.  childa is the same as the original child; childb is a new child.
 * We must slide things around and move things from the old table to the new tables.
 * Requires: the CHILDNUMth buffer of node is empty.
 * We don't push anything down to children.  We split the node, and things land wherever they land.
 * We must delete the old buffer (but the old child is already deleted.)
 * On return, the new children and node STAY PINNED.
 */
static void
handle_split_of_child(
    FT ft,
    FTNODE node,
    int childnum,
    FTNODE childa,
    FTNODE childb,
    DBT *splitk /* the data in the childsplitk is alloc'd and is consumed by this call. */
    )
{
    paranoid_invariant(node->height>0);
    paranoid_invariant(0 <= childnum);
    paranoid_invariant(childnum < node->n_children);
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(childa);
    toku_ftnode_assert_fully_in_memory(childb);
    NONLEAF_CHILDINFO old_bnc = BNC(node, childnum);
    paranoid_invariant(toku_bnc_nbytesinbuf(old_bnc)==0);
    WHEN_NOT_GCOV(
        if (toku_ft_debug_mode) {
            printf("%s:%d Child %d splitting on %s\n", __FILE__, __LINE__, childnum, (char*)splitk->data);
            printf("%s:%d oldsplitkeys:", __FILE__, __LINE__);
            for (int i = 0; i < node->n_children - 1; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
            printf("\n");
        }
    )

    node->set_dirty();

    XREALLOC_N(node->n_children+1, node->bp);
    // Slide the children over.
    // Suppose n_children is 10 and childnum is 5, meaning node->childnum[5] just got split.
    // This moves node->bp[6] through node->bp[9] over to
    // node->bp[7] through node->bp[10].
    for (int cnum=node->n_children; cnum>childnum+1; cnum--) {
        node->bp[cnum] = node->bp[cnum-1];
    }
    memset(&node->bp[childnum+1],0,sizeof(node->bp[0]));
    node->n_children++;

    paranoid_invariant(BP_BLOCKNUM(node, childnum).b==childa->blocknum.b); // use the same child

    // We never set the rightmost blocknum to be the root.
    // Instead, we wait for the root to split and let promotion initialize the rightmost
    // blocknum to be the first non-root leaf node on the right extreme to receive an insert.
    BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
    invariant(ft->h->root_blocknum.b != rightmost_blocknum.b);
    if (childa->blocknum.b == rightmost_blocknum.b) {
        // The rightmost leaf (a) split into (a) and (b). We want (b) to swap pair values
        // with (a), now that it is the new rightmost leaf. This keeps the rightmost blocknum
        // constant, the same way we keep the root blocknum constant.
        toku_ftnode_swap_pair_values(childa, childb);
        BP_BLOCKNUM(node, childnum) = childa->blocknum;
    }

    BP_BLOCKNUM(node, childnum+1) = childb->blocknum;
    BP_WORKDONE(node, childnum+1) = 0;
    BP_STATE(node,childnum+1) = PT_AVAIL;

    NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
    for (unsigned int i = 0; i < (sizeof new_bnc->flow) / (sizeof new_bnc->flow[0]); ++i) {
        // just split the flows in half for now, can't guess much better
        // at the moment
        new_bnc->flow[i] = old_bnc->flow[i] / 2;
        old_bnc->flow[i] = (old_bnc->flow[i] + 1) / 2;
    }
    set_BNC(node, childnum+1, new_bnc);

    // Insert the new split key, sliding the other keys over
    node->pivotkeys.insert_at(splitk, childnum);

    WHEN_NOT_GCOV(
        if (toku_ft_debug_mode) {
            printf("%s:%d splitkeys:", __FILE__, __LINE__);
            for (int i = 0; i < node->n_children - 2; i++) printf(" %s", (char *) node->pivotkeys.get_pivot(i).data);
            printf("\n");
        }
    )

    /* Keep pushing to the children, but not if the children would require a pushdown */
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(childa);
    toku_ftnode_assert_fully_in_memory(childb);

    VERIFY_NODE(t, node);
    VERIFY_NODE(t, childa);
    VERIFY_NODE(t, childb);
}
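
// Worked example of the flow split above (illustrative, not from the original
// source): if old_bnc->flow[i] was 101, the new child's window is credited
// 101 / 2 == 50 and the old child keeps (101 + 1) / 2 == 51, so the rounding
// never loses a byte of the estimate: 50 + 51 == 101.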

static void
verify_all_in_mempool(FTNODE UU() node)
{
#ifdef TOKU_DEBUG_PARANOID
    if (node->height==0) {
        for (int i = 0; i < node->n_children; i++) {
            invariant(BP_STATE(node,i) == PT_AVAIL);
            BLB_DATA(node, i)->verify_mempool();
        }
    }
#endif
}

static uint64_t
ftleaf_disk_size(FTNODE node)
// Effect: get the sum of the disk sizes of all leafentries in the leaf
{
    paranoid_invariant(node->height == 0);
    toku_ftnode_assert_fully_in_memory(node);
    uint64_t retval = 0;
    for (int i = 0; i < node->n_children; i++) {
        retval += BLB_DATA(node, i)->get_disk_size();
    }
    return retval;
}

static void
ftleaf_get_split_loc(
    FTNODE node,
    enum split_mode split_mode,
    int *num_left_bns,   // which basement within leaf
    int *num_left_les    // which key within basement
    )
// Effect: Find the location within a leaf node where we want to perform a split.
// num_left_bns is how many basement nodes (which OMT) should be split to the left.
// num_left_les is how many leafentries in the OMT of the last bn should be on the left side of the split.
{
    switch (split_mode) {
    case SPLIT_LEFT_HEAVY: {
        *num_left_bns = node->n_children;
        *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
        if (*num_left_les == 0) {
            *num_left_bns = node->n_children - 1;
            *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
        }
        goto exit;
    }
    case SPLIT_RIGHT_HEAVY: {
        *num_left_bns = 1;
        *num_left_les = BLB_DATA(node, 0)->num_klpairs() ? 1 : 0;
        goto exit;
    }
    case SPLIT_EVENLY: {
        paranoid_invariant(node->height == 0);
        // TODO: (Zardosht) see if we can/should make this faster, we iterate over the rows twice
        uint64_t sumlesizes = ftleaf_disk_size(node);
        uint32_t size_so_far = 0;
        for (int i = 0; i < node->n_children; i++) {
            bn_data* bd = BLB_DATA(node, i);
            uint32_t n_leafentries = bd->num_klpairs();
            for (uint32_t j=0; j < n_leafentries; j++) {
                size_t size_this_le;
                int rr = bd->fetch_klpair_disksize(j, &size_this_le);
                invariant_zero(rr);
                size_so_far += size_this_le;
                if (size_so_far >= sumlesizes/2) {
                    *num_left_bns = i + 1;
                    *num_left_les = j + 1;
                    if (*num_left_bns == node->n_children &&
                        (unsigned int) *num_left_les == n_leafentries) {
                        // need to correct for when we're splitting after the
                        // last element, that makes no sense
                        if (*num_left_les > 1) {
                            (*num_left_les)--;
                        } else if (*num_left_bns > 1) {
                            (*num_left_bns)--;
                            *num_left_les = BLB_DATA(node, *num_left_bns - 1)->num_klpairs();
                        } else {
                            // we are trying to split a leaf with only one
                            // leafentry in it
                            abort();
                        }
                    }
                    goto exit;
                }
            }
        }
    }
    }
    abort();
exit:
    return;
}
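
// Worked example for SPLIT_EVENLY (illustrative, not from the original
// source): a leaf with two basements, each holding two leafentries of disk
// size 40, has sumlesizes == 160. The scan first reaches
// size_so_far >= sumlesizes/2 == 80 at basement 0, entry 1, so it returns
// *num_left_bns == 1, *num_left_les == 2: the first basement goes left whole
// and the second basement goes right.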

static void
move_leafentries(
    BASEMENTNODE dest_bn,
    BASEMENTNODE src_bn,
    uint32_t lbi,   // lower bound inclusive
    uint32_t ube    // upper bound exclusive
    )
// Effect: move leafentries in the range [lbi, ube) from src_bn's buffer to the newly created dest_bn's buffer
{
    invariant(ube == src_bn->data_buffer.num_klpairs());
    src_bn->data_buffer.split_klpairs(&dest_bn->data_buffer, lbi);
}

static void ftnode_finalize_split(FTNODE node, FTNODE B, MSN max_msn_applied_to_node) {
// Effect: Finalizes a split by updating some bits and dirtying both nodes
    toku_ftnode_assert_fully_in_memory(node);
    toku_ftnode_assert_fully_in_memory(B);
    verify_all_in_mempool(node);
    verify_all_in_mempool(B);

    node->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;
    B->max_msn_applied_to_node_on_disk = max_msn_applied_to_node;

    // The new node in the split inherits the oldest known reference xid
    B->oldest_referenced_xid_known = node->oldest_referenced_xid_known;

    node->set_dirty();
    B->set_dirty();
}

void
ftleaf_split(
    FT ft,
    FTNODE node,
    FTNODE *nodea,
    FTNODE *nodeb,
    DBT *splitk,
    bool create_new_node,
    enum split_mode split_mode,
    uint32_t num_dependent_nodes,
    FTNODE* dependent_nodes)
// Effect: Split a leaf node.
// Argument "node" is node to be split.
// Upon return:
//   nodea and nodeb point to new nodes that result from split of "node"
//   nodea is the left node that results from the split
//   splitk is the right-most key of nodea
{

    paranoid_invariant(node->height == 0);
    FL_STATUS_VAL(FT_FLUSHER_SPLIT_LEAF)++;
    if (node->n_children) {
        // First move all the accumulated stat64info deltas into the first basement.
        // After the split, either both nodes or neither node will be included in the next checkpoint.
        // The accumulated stats in the dictionary will be correct in either case.
        // By moving all the deltas into one (arbitrary) basement, we avoid the need to maintain
        // correct information for a basement that is divided between two leafnodes (i.e. when the split is
        // not on a basement boundary).
        STAT64INFO_S delta_for_leafnode = toku_get_and_clear_basement_stats(node);
        BASEMENTNODE bn = BLB(node,0);
        bn->stat64_delta = delta_for_leafnode;
    }


    FTNODE B = nullptr;
    uint32_t fullhash;
    BLOCKNUM name;

    if (create_new_node) {
        // put value in cachetable and do checkpointing
        // of dependent nodes
        //
        // We do this here, before evaluating the last_bn_on_left
        // and last_le_on_left_within_bn because this operation
        // may write to disk the dependent nodes.
        // While doing so, we may rebalance the leaf node
        // we are splitting, thereby invalidating the
        // values of last_bn_on_left and last_le_on_left_within_bn.
        // So, we must call this before evaluating
        // those two values
        cachetable_put_empty_node_with_dep_nodes(
            ft,
            num_dependent_nodes,
            dependent_nodes,
            &name,
            &fullhash,
            &B
            );
        // GCC 4.8 seems to get confused and think B is maybe uninitialized at link time.
        // TODO(leif): figure out why it thinks this and actually fix it.
        invariant_notnull(B);
    }


    paranoid_invariant(node->height==0);
    toku_ftnode_assert_fully_in_memory(node);
    verify_all_in_mempool(node);
    MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;

    // variables that say where we will do the split.
    // After the split, there will be num_left_bns basement nodes in the left node,
    // and the last basement node in the left node will have num_left_les leafentries.
    int num_left_bns;
    int num_left_les;
    ftleaf_get_split_loc(node, split_mode, &num_left_bns, &num_left_les);
    {
        // did we split right on the boundary between basement nodes?
        const bool split_on_boundary = (num_left_les == 0) || (num_left_les == (int) BLB_DATA(node, num_left_bns - 1)->num_klpairs());
        // Now we know where we are going to break it:
        // the two nodes will have a total of n_children+1 basement nodes
        // and n_children-1 pivots
        // the left node, node, will have last_bn_on_left+1 basement nodes
        // the right node, B, will have n_children-last_bn_on_left basement nodes
        // the pivots of node will be the first last_bn_on_left pivots that originally exist
        // the pivots of B will be the last (n_children - 1 - last_bn_on_left) pivots that originally exist

        // Note: The basements will not be rebalanced.  Only the mempool of the basement that is split
        //       (if split_on_boundary is false) will be affected.  All other mempools will remain intact. ???

        // set up the basement nodes in the new node
        int num_children_in_node = num_left_bns;
        // In the SPLIT_RIGHT_HEAVY case, we need to add 1 back because
        // while it's not on the boundary, we do need node->n_children
        // children in B.
        int num_children_in_b = node->n_children - num_left_bns + (!split_on_boundary ? 1 : 0);
        if (num_children_in_b == 0) {
            // for uneven split, make sure we have at least 1 bn
            paranoid_invariant(split_mode == SPLIT_LEFT_HEAVY);
            num_children_in_b = 1;
        }
        paranoid_invariant(num_children_in_node > 0);
        if (create_new_node) {
            toku_initialize_empty_ftnode(
                B,
                name,
                0,
                num_children_in_b,
                ft->h->layout_version,
                ft->h->flags);
            B->fullhash = fullhash;
        }
        else {
            B = *nodeb;
            REALLOC_N(num_children_in_b, B->bp);
            B->n_children = num_children_in_b;
            for (int i = 0; i < num_children_in_b; i++) {
                BP_BLOCKNUM(B,i).b = 0;
                BP_STATE(B,i) = PT_AVAIL;
                BP_WORKDONE(B,i) = 0;
                set_BLB(B, i, toku_create_empty_bn());
            }
        }

        // now move all the data

        int curr_src_bn_index = num_left_bns - 1;
        int curr_dest_bn_index = 0;

        // handle the move of a subset of data in last_bn_on_left from node to B
        if (!split_on_boundary) {
            BP_STATE(B,curr_dest_bn_index) = PT_AVAIL;
            destroy_basement_node(BLB(B, curr_dest_bn_index)); // Destroy B's empty OMT, so I can rebuild it from an array
            set_BNULL(B, curr_dest_bn_index);
            set_BLB(B, curr_dest_bn_index, toku_create_empty_bn_no_buffer());
            move_leafentries(BLB(B, curr_dest_bn_index),
                             BLB(node, curr_src_bn_index),
                             num_left_les,         // first row to be moved to B
                             BLB_DATA(node, curr_src_bn_index)->num_klpairs()  // number of rows in basement to be split
                             );
            BLB_MAX_MSN_APPLIED(B, curr_dest_bn_index) = BLB_MAX_MSN_APPLIED(node, curr_src_bn_index);
            curr_dest_bn_index++;
        }
        curr_src_bn_index++;

        paranoid_invariant(B->n_children >= curr_dest_bn_index);
        paranoid_invariant(node->n_children >= curr_src_bn_index);

        // move the rest of the basement nodes
        for ( ; curr_src_bn_index < node->n_children; curr_src_bn_index++, curr_dest_bn_index++) {
            destroy_basement_node(BLB(B, curr_dest_bn_index));
            set_BNULL(B, curr_dest_bn_index);
            B->bp[curr_dest_bn_index] = node->bp[curr_src_bn_index];
        }
        if (curr_dest_bn_index < B->n_children) {
            // B already has an empty basement node here.
            BP_STATE(B, curr_dest_bn_index) = PT_AVAIL;
        }

        //
        // now handle the pivots
        //

        // the child index in the original node that corresponds to the
        // first node in the right node of the split
        int split_idx = num_left_bns - (split_on_boundary ? 0 : 1);
        node->pivotkeys.split_at(split_idx, &B->pivotkeys);
        if (split_on_boundary && num_left_bns < node->n_children && splitk) {
            toku_copyref_dbt(splitk, node->pivotkeys.get_pivot(num_left_bns - 1));
        } else if (splitk) {
            bn_data* bd = BLB_DATA(node, num_left_bns - 1);
            uint32_t keylen;
            void *key;
            int rr = bd->fetch_key_and_len(bd->num_klpairs() - 1, &keylen, &key);
            invariant_zero(rr);
            toku_memdup_dbt(splitk, key, keylen);
        }

        node->n_children = num_children_in_node;
        REALLOC_N(num_children_in_node, node->bp);
    }

    ftnode_finalize_split(node, B, max_msn_applied_to_node);
    *nodea = node;
    *nodeb = B;
}   // end of ftleaf_split()
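
// Illustrative example of the boundary cases above (not from the original
// source): take a leaf with 3 basements of 10 leafentries each. A split at
// num_left_bns == 2, num_left_les == 10 is split_on_boundary: node keeps
// basements {0, 1} and B takes basement {2} whole, so num_children_in_b == 1.
// A split at num_left_les == 4 is mid-basement: move_leafentries() carves the
// last 6 entries of basement 1 into B, which therefore ends up with 2 children.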

void
ft_nonleaf_split(
    FT ft,
    FTNODE node,
    FTNODE *nodea,
    FTNODE *nodeb,
    DBT *splitk,
    uint32_t num_dependent_nodes,
    FTNODE* dependent_nodes)
{
    //VERIFY_NODE(t,node);
    FL_STATUS_VAL(FT_FLUSHER_SPLIT_NONLEAF)++;
    toku_ftnode_assert_fully_in_memory(node);
    int old_n_children = node->n_children;
    int n_children_in_a = old_n_children/2;
    int n_children_in_b = old_n_children-n_children_in_a;
    MSN max_msn_applied_to_node = node->max_msn_applied_to_node_on_disk;
    FTNODE B;
    paranoid_invariant(node->height>0);
    paranoid_invariant(node->n_children>=2); // Otherwise, how do we split?  We need at least two children to split.
    create_new_ftnode_with_dep_nodes(ft, &B, node->height, n_children_in_b, num_dependent_nodes, dependent_nodes);
    {
        /* The first n_children_in_a go into node a.
         * That means that the first n_children_in_a-1 keys go into node a.
         * The splitter key is key number n_children_in_a */
        for (int i = n_children_in_a; i<old_n_children; i++) {
            int targchild = i-n_children_in_a;
            // TODO: Figure out better way to handle this
            // the problem is that create_new_ftnode_with_dep_nodes for B creates
            // all the data structures, whereas we really don't want it to fill
            // in anything for the bp's.
            // Now we have to go free what it just created so we can
            // slide the bp over
            destroy_nonleaf_childinfo(BNC(B, targchild));
            // now move the bp over
            B->bp[targchild] = node->bp[i];
            memset(&node->bp[i], 0, sizeof(node->bp[0]));
        }

        // the split key for our parent is the rightmost pivot key in node
        node->pivotkeys.split_at(n_children_in_a, &B->pivotkeys);
        toku_clone_dbt(splitk, node->pivotkeys.get_pivot(n_children_in_a - 1));
        node->pivotkeys.delete_at(n_children_in_a - 1);

        node->n_children = n_children_in_a;
        REALLOC_N(node->n_children, node->bp);
    }

    ftnode_finalize_split(node, B, max_msn_applied_to_node);
    *nodea = node;
    *nodeb = B;
}

//
// The responsibility of ft_split_child is to take locked FTNODEs node and child
// and do the following:
//  - split child,
//  - fix node,
//  - release the lock on node,
//  - possibly flush one of the new children created by the split; otherwise, unlock the children
//
static void
ft_split_child(
    FT ft,
    FTNODE node,
    int childnum,
    FTNODE child,
    enum split_mode split_mode,
    struct flusher_advice *fa)
{
    paranoid_invariant(node->height>0);
    paranoid_invariant(toku_bnc_nbytesinbuf(BNC(node, childnum))==0); // require that the buffer for this child is empty
    FTNODE nodea, nodeb;
    DBT splitk;

    // for test
    call_flusher_thread_callback(flt_flush_before_split);

    FTNODE dep_nodes[2];
    dep_nodes[0] = node;
    dep_nodes[1] = child;
    if (child->height==0) {
        ftleaf_split(ft, child, &nodea, &nodeb, &splitk, true, split_mode, 2, dep_nodes);
    } else {
        ft_nonleaf_split(ft, child, &nodea, &nodeb, &splitk, 2, dep_nodes);
    }
    // printf("%s:%d child did split\n", __FILE__, __LINE__);
    handle_split_of_child(ft, node, childnum, nodea, nodeb, &splitk);

    // for test
    call_flusher_thread_callback(flt_flush_during_split);

    // at this point, the split is complete
    // now we need to unlock node,
    // and possibly continue
    // flushing one of the children
    int picked_child = fa->pick_child_after_split(ft, node, childnum, childnum + 1, fa->extra);
    toku_unpin_ftnode(ft, node);
    if (picked_child == childnum ||
        (picked_child < 0 && nodea->height > 0 && fa->should_recursively_flush(nodea, fa->extra))) {
        toku_unpin_ftnode(ft, nodeb);
        toku_ft_flush_some_child(ft, nodea, fa);
    }
    else if (picked_child == childnum + 1 ||
             (picked_child < 0 && nodeb->height > 0 && fa->should_recursively_flush(nodeb, fa->extra))) {
        toku_unpin_ftnode(ft, nodea);
        toku_ft_flush_some_child(ft, nodeb, fa);
    }
    else {
        toku_unpin_ftnode(ft, nodea);
        toku_unpin_ftnode(ft, nodeb);
    }

    toku_destroy_dbt(&splitk);
}

static void bring_node_fully_into_memory(FTNODE node, FT ft) {
    if (!toku_ftnode_fully_in_memory(node)) {
        ftnode_fetch_extra bfe;
        bfe.create_for_full_read(ft);
        toku_cachetable_pf_pinned_pair(
            node,
            toku_ftnode_pf_callback,
            &bfe,
            ft->cf,
            node->blocknum,
            toku_cachetable_hash(ft->cf, node->blocknum)
            );
    }
}

static void
flush_this_child(
    FT ft,
    FTNODE node,
    FTNODE child,
    int childnum,
    struct flusher_advice *fa)
// Effect: Push everything in the CHILDNUMth buffer of node down into the child.
{
    update_flush_status(child, 0);
    toku_ftnode_assert_fully_in_memory(node);
    if (fa->should_destroy_basement_nodes(fa)) {
        maybe_destroy_child_blbs(node, child, ft);
    }
    bring_node_fully_into_memory(child, ft);
    toku_ftnode_assert_fully_in_memory(child);
    paranoid_invariant(node->height>0);
    paranoid_invariant(child->blocknum.b!=0);
    // VERIFY_NODE does not work off client thread as of now
    //VERIFY_NODE(t, child);
    node->set_dirty();
    child->set_dirty();

    BP_WORKDONE(node, childnum) = 0;  // this buffer is drained, no work has been done by its contents
    NONLEAF_CHILDINFO bnc = BNC(node, childnum);
    set_BNC(node, childnum, toku_create_empty_nl());

    // now we have a bnc to flush to the child. pass down the parent's
    // oldest known referenced xid as we flush down to the child.
    toku_bnc_flush_to_child(ft, bnc, child, node->oldest_referenced_xid_known);
    destroy_nonleaf_childinfo(bnc);
}

static void
merge_leaf_nodes(FTNODE a, FTNODE b)
{
    FL_STATUS_VAL(FT_FLUSHER_MERGE_LEAF)++;
    toku_ftnode_assert_fully_in_memory(a);
    toku_ftnode_assert_fully_in_memory(b);
    paranoid_invariant(a->height == 0);
    paranoid_invariant(b->height == 0);
    paranoid_invariant(a->n_children > 0);
    paranoid_invariant(b->n_children > 0);

    // Mark nodes as dirty before moving basements from b to a.
    // This way, whatever deltas are accumulated in the basements are
    // applied to the in_memory_stats in the header if they have not already
    // been (if nodes are clean).
    // TODO(leif): this is no longer the way in_memory_stats is
    // maintained. verify that it's ok to move this just before the unpin
    // and then do that.
    a->set_dirty();
    b->set_dirty();

    bn_data* a_last_bd = BLB_DATA(a, a->n_children-1);
    // this bool states if the last basement node in a has any items or not
    // If it does, then it stays in the merge. If it does not, the last basement node
    // of a gets eliminated because we do not have a pivot to store for it (because it has no elements)
    const bool a_has_tail = a_last_bd->num_klpairs() > 0;

    int num_children = a->n_children + b->n_children;
    if (!a_has_tail) {
        int lastchild = a->n_children - 1;
        BASEMENTNODE bn = BLB(a, lastchild);

        // verify that the last basement in a is empty, then destroy its mempool
        size_t used_space = a_last_bd->get_disk_size();
        invariant_zero(used_space);
        destroy_basement_node(bn);
        set_BNULL(a, lastchild);
        num_children--;
        if (lastchild < a->pivotkeys.num_pivots()) {
            a->pivotkeys.delete_at(lastchild);
        }
    } else {
        // fill in pivot for what used to be max of node 'a', if it is needed
        uint32_t keylen;
        void *key;
        int r = a_last_bd->fetch_key_and_len(a_last_bd->num_klpairs() - 1, &keylen, &key);
        invariant_zero(r);
        DBT pivotkey;
        toku_fill_dbt(&pivotkey, key, keylen);
        a->pivotkeys.replace_at(&pivotkey, a->n_children - 1);
    }

    // realloc basement nodes in `a'
    REALLOC_N(num_children, a->bp);

    // move each basement node from b to a
    uint32_t offset = a_has_tail ? a->n_children : a->n_children - 1;
    for (int i = 0; i < b->n_children; i++) {
        a->bp[i + offset] = b->bp[i];
        memset(&b->bp[i], 0, sizeof(b->bp[0]));
    }

    // append b's pivots to a's pivots
    a->pivotkeys.append(b->pivotkeys);

    // now that all the data has been moved from b to a, we can destroy the data in b
    a->n_children = num_children;
    b->pivotkeys.destroy();
    b->n_children = 0;
}

static void balance_leaf_nodes(
    FTNODE a,
    FTNODE b,
    DBT *splitk)
// Effect:
//  If b is bigger, then move stuff from b to a until b is the smaller.
//  If a is bigger, then move stuff from a to b until a is the smaller.
{
    FL_STATUS_VAL(FT_FLUSHER_BALANCE_LEAF)++;
    // first merge all the data into a
    merge_leaf_nodes(a,b);
    // now split them
    // because we are not creating a new node, we can pass in no dependent nodes
    ftleaf_split(NULL, a, &a, &b, splitk, false, SPLIT_EVENLY, 0, NULL);
}
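
// Design note (illustrative, not from the original source): rather than
// shuttling individual leafentries between a and b, the rebalance concatenates
// everything into `a` with merge_leaf_nodes() and then re-splits evenly by
// disk size with ftleaf_split(). That reuses one code path for all pivot and
// mempool bookkeeping, at the cost of touching every basement in both nodes.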

static void
maybe_merge_pinned_leaf_nodes(
    FTNODE a,
    FTNODE b,
    const DBT *parent_splitk,
    bool *did_merge,
    bool *did_rebalance,
    DBT *splitk,
    uint32_t nodesize
    )
// Effect: Either merge a and b into one node (merge them into a) and set *did_merge = true.
//         (We do this if the resulting node is not fissible)
//         or distribute the leafentries evenly between a and b, and set *did_rebalance = true.
//         (If a and b are already evenly distributed, we may do nothing.)
{
    unsigned int sizea = toku_serialize_ftnode_size(a);
    unsigned int sizeb = toku_serialize_ftnode_size(b);
    uint32_t num_leafentries = toku_ftnode_leaf_num_entries(a) + toku_ftnode_leaf_num_entries(b);
    if (num_leafentries > 1 && (sizea + sizeb)*4 > (nodesize*3)) {
        // the combined size is more than 3/4 of a node, so don't merge them.
        *did_merge = false;
        if (sizea*4 > nodesize && sizeb*4 > nodesize) {
            // no need to do anything if both are more than 1/4 of a node.
            *did_rebalance = false;
            toku_clone_dbt(splitk, *parent_splitk);
            return;
        }
        // one is less than 1/4 of a node, and together they are more than 3/4 of a node.
        *did_rebalance = true;
        balance_leaf_nodes(a, b, splitk);
    } else {
        // we are merging them.
        *did_merge = true;
        *did_rebalance = false;
        toku_init_dbt(splitk);
        merge_leaf_nodes(a, b);
    }
}
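
// Worked example of the thresholds above (illustrative, not from the original
// source), assuming nodesize == 4MB:
//     sizea == 0.5MB, sizeb == 0.4MB -> combined <= 3MB: merge into a.
//     sizea == 2.5MB, sizeb == 0.6MB -> combined >  3MB but b < 1MB: rebalance.
//     sizea == 2.0MB, sizeb == 1.5MB -> combined >  3MB and both > 1MB: leave
//                                       them alone and keep the old pivot.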

static void
maybe_merge_pinned_nonleaf_nodes(
    const DBT *parent_splitk,
    FTNODE a,
    FTNODE b,
    bool *did_merge,
    bool *did_rebalance,
    DBT *splitk)
{
    toku_ftnode_assert_fully_in_memory(a);
    toku_ftnode_assert_fully_in_memory(b);
    invariant_notnull(parent_splitk->data);

    int old_n_children = a->n_children;
    int new_n_children = old_n_children + b->n_children;

    XREALLOC_N(new_n_children, a->bp);
    memcpy(a->bp + old_n_children, b->bp, b->n_children * sizeof(b->bp[0]));
    memset(b->bp, 0, b->n_children * sizeof(b->bp[0]));

    a->pivotkeys.insert_at(parent_splitk, old_n_children - 1);
    a->pivotkeys.append(b->pivotkeys);
    a->n_children = new_n_children;
    b->n_children = 0;

    a->set_dirty();
    b->set_dirty();

    *did_merge = true;
    *did_rebalance = false;
    toku_init_dbt(splitk);

    FL_STATUS_VAL(FT_FLUSHER_MERGE_NONLEAF)++;
}

static void
maybe_merge_pinned_nodes(
    FTNODE parent,
    const DBT *parent_splitk,
    FTNODE a,
    FTNODE b,
    bool *did_merge,
    bool *did_rebalance,
    DBT *splitk,
    uint32_t nodesize
    )
// Effect: either merge a and b into one node (merge them into a) and set *did_merge = true.
//         (We do this if the resulting node is not fissible)
//         or distribute a and b evenly and set *did_merge = false and *did_rebalance = true
//         (If a and b are already evenly distributed, we may do nothing.)
// If we distribute:
//   For leaf nodes, we distribute the leafentries evenly.
//   For nonleaf nodes, we distribute the children evenly.  That may leave one or both of the nodes overfull, but that's OK.
// If we distribute, we set *splitk to a malloced pivot key.
// Parameters:
//   parent              The parent of the two nodes to be merged.
//   parent_splitk       The pivot key between a and b.  This is either free()'d or returned in *splitk.
//   a                   The first node to merge.
//   b                   The second node to merge.
//   did_merge           (OUT):  Did the two nodes actually get merged?
//   did_rebalance       (OUT):  Did the two nodes get rebalanced instead?
//   splitk              (OUT):  If the two nodes did not get merged, the new pivot key between the two nodes.
//   nodesize            The target node size.
{
    MSN msn_max;
    paranoid_invariant(a->height == b->height);
    toku_ftnode_assert_fully_in_memory(parent);
    toku_ftnode_assert_fully_in_memory(a);
    toku_ftnode_assert_fully_in_memory(b);
    parent->set_dirty();   // just to make sure
    {
        MSN msna = a->max_msn_applied_to_node_on_disk;
        MSN msnb = b->max_msn_applied_to_node_on_disk;
        msn_max = (msna.msn > msnb.msn) ? msna : msnb;
    }
    if (a->height == 0) {
        maybe_merge_pinned_leaf_nodes(a, b, parent_splitk, did_merge, did_rebalance, splitk, nodesize);
    } else {
        maybe_merge_pinned_nonleaf_nodes(parent_splitk, a, b, did_merge, did_rebalance, splitk);
    }
    if (*did_merge || *did_rebalance) {
        // accurate for leaf nodes because all msgs above have been
        // applied, accurate for non-leaf nodes because buffer immediately
        // above each node has been flushed
        a->max_msn_applied_to_node_on_disk = msn_max;
        b->max_msn_applied_to_node_on_disk = msn_max;
    }
}

static void merge_remove_key_callback(BLOCKNUM *bp, bool for_checkpoint, void *extra) {
    FT ft = (FT) extra;
    ft->blocktable.free_blocknum(bp, ft, for_checkpoint);
}

//
// Takes as input a locked node and a childnum_to_merge.
// As output, two of node's children are merged or rebalanced, and node is unlocked.
//
static void
ft_merge_child(
    FT ft,
    FTNODE node,
    int childnum_to_merge,
    bool *did_react,
    struct flusher_advice *fa)
{
    // this function should not be called
    // if the child is not mergable
    paranoid_invariant(node->n_children > 1);
    toku_ftnode_assert_fully_in_memory(node);

    int childnuma,childnumb;
    if (childnum_to_merge > 0) {
        childnuma = childnum_to_merge-1;
        childnumb = childnum_to_merge;
    } else {
        childnuma = childnum_to_merge;
        childnumb = childnum_to_merge+1;
    }
    paranoid_invariant(0 <= childnuma);
    paranoid_invariant(childnuma+1 == childnumb);
    paranoid_invariant(childnumb < node->n_children);

    paranoid_invariant(node->height>0);

    // We suspect that at least one of the children is fusible, but they might not be.
    // for test
    call_flusher_thread_callback(flt_flush_before_merge);

    FTNODE childa, childb;
    {
        uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnuma);
        ftnode_fetch_extra bfe;
        bfe.create_for_full_read(ft);
        toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnuma), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &node, &childa, true);
    }
    // for test
    call_flusher_thread_callback(flt_flush_before_pin_second_node_for_merge);
    {
        FTNODE dep_nodes[2];
        dep_nodes[0] = node;
        dep_nodes[1] = childa;
        uint32_t childfullhash = compute_child_fullhash(ft->cf, node, childnumb);
        ftnode_fetch_extra bfe;
        bfe.create_for_full_read(ft);
        toku_pin_ftnode_with_dep_nodes(ft, BP_BLOCKNUM(node, childnumb), childfullhash, &bfe, PL_WRITE_EXPENSIVE, 2, dep_nodes, &childb, true);
    }

    if (toku_bnc_n_entries(BNC(node,childnuma))>0) {
        flush_this_child(ft, node, childa, childnuma, fa);
    }
    if (toku_bnc_n_entries(BNC(node,childnumb))>0) {
        flush_this_child(ft, node, childb, childnumb, fa);
    }

    // now we have both children pinned in main memory, and cachetable locked,
    // so no checkpoints will occur.

    bool did_merge, did_rebalance;
    {
        DBT splitk;
        toku_init_dbt(&splitk);
        const DBT old_split_key = node->pivotkeys.get_pivot(childnuma);
        maybe_merge_pinned_nodes(node, &old_split_key, childa, childb, &did_merge, &did_rebalance, &splitk, ft->h->nodesize);
        //toku_verify_estimates(t,childa);
        // the tree did react if a merge (did_merge) or rebalance (new split key) occurred
        *did_react = (bool)(did_merge || did_rebalance);

        if (did_merge) {
            invariant_null(splitk.data);
            NONLEAF_CHILDINFO remaining_bnc = BNC(node, childnuma);
            NONLEAF_CHILDINFO merged_bnc = BNC(node, childnumb);
            for (unsigned int i = 0; i < (sizeof remaining_bnc->flow) / (sizeof remaining_bnc->flow[0]); ++i) {
                remaining_bnc->flow[i] += merged_bnc->flow[i];
            }
            destroy_nonleaf_childinfo(merged_bnc);
            set_BNULL(node, childnumb);
            node->n_children--;
            memmove(&node->bp[childnumb],
                    &node->bp[childnumb+1],
                    (node->n_children-childnumb)*sizeof(node->bp[0]));
            REALLOC_N(node->n_children, node->bp);
            node->pivotkeys.delete_at(childnuma);

            // Handle a merge of the rightmost leaf node.
            BLOCKNUM rightmost_blocknum = toku_unsafe_fetch(&ft->rightmost_blocknum);
            if (did_merge && childb->blocknum.b == rightmost_blocknum.b) {
                invariant(childb->blocknum.b != ft->h->root_blocknum.b);
                toku_ftnode_swap_pair_values(childa, childb);
                BP_BLOCKNUM(node, childnuma) = childa->blocknum;
            }

            paranoid_invariant(BP_BLOCKNUM(node, childnuma).b == childa->blocknum.b);
            childa->set_dirty();  // just to make sure
            childb->set_dirty();  // just to make sure
        } else {
            // flow will be inaccurate for a while, oh well.  the children
            // are leaves in this case so it's not a huge deal (we're
            // pretty far down the tree)

            // If we didn't merge the nodes, then we need the correct pivot.
            invariant_notnull(splitk.data);
            node->pivotkeys.replace_at(&splitk, childnuma);
            node->set_dirty();
        }
        toku_destroy_dbt(&splitk);
    }
    //
    // now we possibly flush the children
    //
    if (did_merge) {
        // for test
        call_flusher_thread_callback(flt_flush_before_unpin_remove);

        // merge_remove_key_callback will free the blocknum
        int rrb = toku_cachetable_unpin_and_remove(
            ft->cf,
            childb->ct_pair,
            merge_remove_key_callback,
            ft
            );
        assert_zero(rrb);

        // for test
        call_flusher_thread_callback(ft_flush_aflter_merge);

        // unlock the parent
        paranoid_invariant(node->dirty());
        toku_unpin_ftnode(ft, node);
    }
    else {
        // for test
        call_flusher_thread_callback(ft_flush_aflter_rebalance);

        // unlock the parent
        paranoid_invariant(node->dirty());
        toku_unpin_ftnode(ft, node);
        toku_unpin_ftnode(ft, childb);
    }
    if (childa->height > 0 && fa->should_recursively_flush(childa, fa->extra)) {
        toku_ft_flush_some_child(ft, childa, fa);
    }
    else {
        toku_unpin_ftnode(ft, childa);
    }
}

void toku_ft_flush_some_child(FT ft, FTNODE parent, struct flusher_advice *fa)
// Effect: This function does the following:
//   - Pick a child of parent (the heaviest child),
//   - flush from parent to child,
//   - possibly split/merge child.
//   - if child is gorged, recursively proceed with child
//  Note that parent is already locked
//  Upon exit of this function, parent is unlocked and no new
//  nodes (such as a child) remain locked
{
    int dirtied = 0;
    NONLEAF_CHILDINFO bnc = NULL;
    paranoid_invariant(parent->height>0);
    toku_ftnode_assert_fully_in_memory(parent);
    TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;

    // pick the child we want to flush to
    int childnum = fa->pick_child(ft, parent, fa->extra);

    // for test
    call_flusher_thread_callback(flt_flush_before_child_pin);

    // get the child into memory
    BLOCKNUM targetchild = BP_BLOCKNUM(parent, childnum);
    ft->blocktable.verify_blocknum_allocated(targetchild);
    uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
    FTNODE child;
    ftnode_fetch_extra bfe;
    // Note that we don't read the entire node into memory yet.
    // The idea is let's try to do the minimum work before releasing the parent lock
    bfe.create_for_min_read(ft);
    toku_pin_ftnode_with_dep_nodes(ft, targetchild, childfullhash, &bfe, PL_WRITE_EXPENSIVE, 1, &parent, &child, true);

    // for test
    call_flusher_thread_callback(ft_flush_aflter_child_pin);

    if (fa->should_destroy_basement_nodes(fa)) {
        maybe_destroy_child_blbs(parent, child, ft);
    }

    // Note that at this point, we don't have the entire child in memory.
    // Let's do a quick check to see if the child may be reactive.
    // If the child cannot be reactive, then we can safely unlock
    // the parent before finishing reading in the entire child node.
    bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);

    paranoid_invariant(child->blocknum.b!=0);

    // only do the following work if there is a flush to perform
    if (toku_bnc_n_entries(BNC(parent, childnum)) > 0 || parent->height == 1) {
        if (!parent->dirty()) {
            dirtied++;
            parent->set_dirty();
        }
        // detach buffer
        BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
        bnc = BNC(parent, childnum);
        NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
        memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
        set_BNC(parent, childnum, new_bnc);
    }

    //
    // at this point, the buffer has been detached from the parent
    // and a new empty buffer has been placed in its stead
    // so, if we are absolutely sure that the child is not
    // reactive, we can unpin the parent
    //
    if (!may_child_be_reactive) {
        toku_unpin_ftnode(ft, parent);
        parent = NULL;
    }

    //
    // now, if necessary, read/decompress the rest of child into memory,
    // so that we can proceed and apply the flush
    //
    bring_node_fully_into_memory(child, ft);

    // It is possible after reading in the entire child,
    // that we now know that the child is not reactive.
    // If so, we can unpin the parent right now:
    // we won't be splitting/merging child,
    // and we have already replaced the bnc
    // for the root with a fresh one.
    enum reactivity child_re = toku_ftnode_get_reactivity(ft, child);
    if (parent && child_re == RE_STABLE) {
        toku_unpin_ftnode(ft, parent);
        parent = NULL;
    }

    // from above, we know at this point that either the bnc
    // is detached from the parent (which may be unpinned),
    // and we have to apply the flush, or there was no data
    // in the buffer to flush, and as a result, flushing is not necessary
    // and bnc is NULL
    if (bnc != NULL) {
        if (!child->dirty()) {
            dirtied++;
            child->set_dirty();
        }
        // do the actual flush
        toku_bnc_flush_to_child(
            ft,
            bnc,
            child,
            parent_oldest_referenced_xid_known
            );
        destroy_nonleaf_childinfo(bnc);
    }

    fa->update_status(child, dirtied, fa->extra);
    // let's get the reactivity of the child again,
    // it is possible that the flush got rid of some values
    // and now the child is no longer reactive
    child_re = toku_ftnode_get_reactivity(ft, child);
    // if the parent has been unpinned above, then
    // this is our only option, even if the child is not stable
    // if the child is not stable, we'll handle it the next
    // time we need to flush to the child
    if (!parent ||
        child_re == RE_STABLE ||
        (child_re == RE_FUSIBLE && parent->n_children == 1)
        )
    {
        if (parent) {
            toku_unpin_ftnode(ft, parent);
            parent = NULL;
        }
        //
        // it is the responsibility of toku_ft_flush_some_child to unpin child
        //
        if (child->height > 0 && fa->should_recursively_flush(child, fa->extra)) {
            toku_ft_flush_some_child(ft, child, fa);
        }
        else {
            toku_unpin_ftnode(ft, child);
        }
    }
    else if (child_re == RE_FISSIBLE) {
        //
        // it is the responsibility of `ft_split_child` to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent);  // just make sure we have not accidentally unpinned parent
        ft_split_child(ft, parent, childnum, child, SPLIT_EVENLY, fa);
    }
    else if (child_re == RE_FUSIBLE) {
        //
        // it is the responsibility of `maybe_merge_child` to unlock nodes of
        // parent and child as it sees fit
        //
        paranoid_invariant(parent);  // just make sure we have not accidentally unpinned parent
        fa->maybe_merge_child(fa, ft, parent, childnum, child, fa->extra);
    }
    else {
        abort();
    }
}
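
// Illustrative trace of the locking discipline above (not from the original
// source): the flusher pins parent and child hand over hand. If a min-read of
// the child shows it cannot be reactive, the parent is unpinned before the
// child is even fully fetched, so a descent typically holds at most two node
// locks at a time (a split briefly pins the parent and both new children).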
1550
void toku_bnc_flush_to_child(FT ft, NONLEAF_CHILDINFO bnc, FTNODE child, TXNID parent_oldest_referenced_xid_known) {
    paranoid_invariant(bnc);

    TOKULOGGER logger = toku_cachefile_logger(ft->cf);
    TXN_MANAGER txn_manager = logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
    TXNID oldest_referenced_xid_for_simple_gc = TXNID_NONE;

    txn_manager_state txn_state_for_gc(txn_manager);
    bool do_garbage_collection = child->height == 0 && txn_manager != nullptr;
    if (do_garbage_collection) {
        txn_state_for_gc.init();
        oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
    }
    txn_gc_info gc_info(&txn_state_for_gc,
                        oldest_referenced_xid_for_simple_gc,
                        child->oldest_referenced_xid_known,
                        true);
    struct flush_msg_fn {
        FT ft;
        FTNODE child;
        NONLEAF_CHILDINFO bnc;
        txn_gc_info *gc_info;

        STAT64INFO_S stats_delta;
        int64_t logical_rows_delta = 0;
        size_t remaining_memsize = bnc->msg_buffer.buffer_size_in_use();

        flush_msg_fn(FT t, FTNODE n, NONLEAF_CHILDINFO nl, txn_gc_info *g) :
            ft(t), child(n), bnc(nl), gc_info(g), remaining_memsize(bnc->msg_buffer.buffer_size_in_use()) {
            stats_delta = { 0, 0 };
        }
        int operator()(const ft_msg &msg, bool is_fresh) {
            size_t flow_deltas[] = { 0, 0 };
            size_t memsize_in_buffer = message_buffer::msg_memsize_in_buffer(msg);
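            // Worked example for the classification below (hypothetical
            // numbers): say the buffer held 100 bytes when iteration began,
            // bnc->flow[0] == 30 (bytes enqueued since the current
            // checkpoint began) and bnc->flow[1] == 50 (bytes enqueued
            // during the previous checkpoint period).  Messages are visited
            // oldest first and remaining_memsize counts the not-yet-flushed
            // tail, so while remaining_memsize > 80 the message predates
            // both windows and charges neither delta; at
            // remaining_memsize <= 80 it falls in the previous checkpoint's
            // window (flow_deltas[1]); at remaining_memsize <= 30 it falls
            // in the current window (flow_deltas[0]).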
            if (remaining_memsize <= bnc->flow[0]) {
                // this message is in the current checkpoint's worth of
                // the end of the message buffer
                flow_deltas[0] = memsize_in_buffer;
            } else if (remaining_memsize <= bnc->flow[0] + bnc->flow[1]) {
                // this message is in the last checkpoint's worth of the
                // end of the message buffer
                flow_deltas[1] = memsize_in_buffer;
            }
            toku_ftnode_put_msg(
                ft->cmp,
                ft->update_fun,
                child,
                -1,
                msg,
                is_fresh,
                gc_info,
                flow_deltas,
                &stats_delta,
                &logical_rows_delta);
            remaining_memsize -= memsize_in_buffer;
            return 0;
        }
    } flush_fn(ft, child, bnc, &gc_info);
    bnc->msg_buffer.iterate(flush_fn);
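    // Design note: message_buffer::iterate() invokes flush_fn's operator()
    // once per stored message in insertion order (oldest first), so a
    // single pass both applies every message to the child and accumulates
    // the stats, logical-row, and flow deltas.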

    child->oldest_referenced_xid_known = parent_oldest_referenced_xid_known;

    invariant(flush_fn.remaining_memsize == 0);
    if (flush_fn.stats_delta.numbytes || flush_fn.stats_delta.numrows) {
        toku_ft_update_stats(&ft->in_memory_stats, flush_fn.stats_delta);
    }
    toku_ft_adjust_logical_row_count(ft, flush_fn.logical_rows_delta);
    if (do_garbage_collection) {
        size_t buffsize = bnc->msg_buffer.buffer_size_in_use();
        // may be misleading if there's a broadcast message in there
        toku_ft_status_note_msg_bytes_out(buffsize);
    }
}

static void
update_cleaner_status(
    FTNODE node,
    int childnum)
{
    FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_NODES)++;
    if (node->height == 1) {
        FL_STATUS_VAL(FT_FLUSHER_CLEANER_H1_NODES)++;
    } else {
        FL_STATUS_VAL(FT_FLUSHER_CLEANER_HGT1_NODES)++;
    }

    unsigned int nbytesinbuf = toku_bnc_nbytesinbuf(BNC(node, childnum));
    if (nbytesinbuf == 0) {
        FL_STATUS_VAL(FT_FLUSHER_CLEANER_EMPTY_NODES)++;
    } else {
        if (nbytesinbuf > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE)) {
            FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_SIZE) = nbytesinbuf;
        }
        if (nbytesinbuf < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE)) {
            FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_SIZE) = nbytesinbuf;
        }
        FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_SIZE) += nbytesinbuf;

        uint64_t workdone = BP_WORKDONE(node, childnum);
        if (workdone > FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE)) {
            FL_STATUS_VAL(FT_FLUSHER_CLEANER_MAX_BUFFER_WORKDONE) = workdone;
        }
        if (workdone < FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE)) {
            FL_STATUS_VAL(FT_FLUSHER_CLEANER_MIN_BUFFER_WORKDONE) = workdone;
        }
        FL_STATUS_VAL(FT_FLUSHER_CLEANER_TOTAL_BUFFER_WORKDONE) += workdone;
    }
}

static void
dummy_update_status(
    FTNODE UU(child),
    int UU(dirtied),
    void* UU(extra)
    )
{
}

static int
dummy_pick_heaviest_child(FT UU(h),
                          FTNODE UU(parent),
                          void* UU(extra))
{
    abort();
    return -1;
}

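// toku_ft_split_child and toku_ft_merge_child below are thin entry points
// that run the split/merge machinery under a deliberately inert
// flusher_advice: never_recursively_flush guarantees no recursive flush is
// started, which is presumably why dummy_pick_heaviest_child can simply
// abort() if it is ever reached.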
void toku_ft_split_child(
    FT ft,
    FTNODE node,
    int childnum,
    FTNODE child,
    enum split_mode split_mode
    )
{
    struct flusher_advice fa;
    flusher_advice_init(
        &fa,
        dummy_pick_heaviest_child,
        dont_destroy_basement_nodes,
        never_recursively_flush,
        default_merge_child,
        dummy_update_status,
        default_pick_child_after_split,
        NULL
        );
    ft_split_child(
        ft,
        node,
        childnum, // childnum to split
        child,
        split_mode,
        &fa
        );
}

void toku_ft_merge_child(
    FT ft,
    FTNODE node,
    int childnum
    )
{
    struct flusher_advice fa;
    flusher_advice_init(
        &fa,
        dummy_pick_heaviest_child,
        dont_destroy_basement_nodes,
        never_recursively_flush,
        default_merge_child,
        dummy_update_status,
        default_pick_child_after_split,
        NULL
        );
    bool did_react;
    ft_merge_child(
        ft,
        node,
        childnum, // childnum to merge
        &did_react,
        &fa
        );
}

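// Entry point for the cachetable's cleaner thread: the cleaner hands us a
// locked non-leaf node, and we flush its heaviest child buffer.  (That the
// cachetable invokes this as its cleaner callback is inferred from the
// name and signature; the registration itself happens outside this file.)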
int
toku_ftnode_cleaner_callback(
    void *ftnode_pv,
    BLOCKNUM blocknum,
    uint32_t fullhash,
    void *extraargs)
{
    FTNODE node = (FTNODE) ftnode_pv;
    invariant(node->blocknum.b == blocknum.b);
    invariant(node->fullhash == fullhash);
    invariant(node->height > 0);   // we should never pick a leaf node (for now at least)
    FT ft = (FT) extraargs;
    bring_node_fully_into_memory(node, ft);
    int childnum = find_heaviest_child(node);
    update_cleaner_status(node, childnum);

    // Either toku_ft_flush_some_child will unlock the node, or we do it here.
    if (toku_bnc_nbytesinbuf(BNC(node, childnum)) > 0) {
        struct flusher_advice fa;
        struct flush_status_update_extra fste;
        ct_flusher_advice_init(&fa, &fste, ft->h->nodesize);
        toku_ft_flush_some_child(ft, node, &fa);
    } else {
        toku_unpin_ftnode(ft, node);
    }
    return 0;
}

struct flusher_extra {
    FT ft;
    FTNODE node;
    NONLEAF_CHILDINFO bnc;
    TXNID parent_oldest_referenced_xid_known;
};
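// Lifecycle of a flusher_extra: it is allocated (XMALLOC) and filled in by
// place_node_and_bnc_on_background_thread below, handed to the kibbutz as
// the argument for flush_node_fun, and freed by flush_node_fun once the
// background flush completes.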

//
// This is the function that gets called by a
// background thread. Its purpose is to complete
// a flush, and possibly do a split/merge.
//
static void flush_node_fun(void *fe_v)
{
    toku::context flush_ctx(CTX_FLUSH);
    struct flusher_extra* fe = (struct flusher_extra *) fe_v;
    // The node that has been placed on the background
    // thread may not be fully in memory. Some message
    // buffers may be compressed. Before performing
    // any operations, we must first make sure
    // the node is fully in memory.
    //
    // If we have a bnc, that means fe->node is a child, and we've already
    // destroyed its basement nodes if necessary, so we now need to either
    // read them back in, or just do the regular partial fetch. If we
    // don't, that means fe->node is a parent, so we need to do this anyway.
    bring_node_fully_into_memory(fe->node, fe->ft);
    fe->node->set_dirty();

    struct flusher_advice fa;
    struct flush_status_update_extra fste;
    flt_flusher_advice_init(&fa, &fste, fe->ft->h->nodesize);

    if (fe->bnc) {
        // In this case, we have a bnc to flush to a node.

        // for test purposes
        call_flusher_thread_callback(flt_flush_before_applying_inbox);

        toku_bnc_flush_to_child(
            fe->ft,
            fe->bnc,
            fe->node,
            fe->parent_oldest_referenced_xid_known
            );
        destroy_nonleaf_childinfo(fe->bnc);

        // After the flush has completed, check to see if the node needs
        // flushing again.  If so, call toku_ft_flush_some_child on the node
        // (because this flush intends to pass a meaningful oldest referenced
        // xid for simple garbage collection), and it is the responsibility
        // of that flush to unlock the node.  Otherwise, we unlock it here.
        if (fe->node->height > 0 && toku_ftnode_nonleaf_is_gorged(fe->node, fe->ft->h->nodesize)) {
            toku_ft_flush_some_child(fe->ft, fe->node, &fa);
        }
        else {
            toku_unpin_ftnode(fe->ft, fe->node);
        }
    }
    else {
        // In this case, we were just passed a node with no bnc, which means
        // we are tasked with flushing some buffer in the node.
        // It is the responsibility of toku_ft_flush_some_child to unlock the node.
        toku_ft_flush_some_child(fe->ft, fe->node, &fa);
    }
    remove_background_job_from_cf(fe->ft->cf);
    toku_free(fe);
}

static void
place_node_and_bnc_on_background_thread(
    FT ft,
    FTNODE node,
    NONLEAF_CHILDINFO bnc,
    TXNID parent_oldest_referenced_xid_known)
{
    struct flusher_extra *XMALLOC(fe);
    fe->ft = ft;
    fe->node = node;
    fe->bnc = bnc;
    fe->parent_oldest_referenced_xid_known = parent_oldest_referenced_xid_known;
    cachefile_kibbutz_enq(ft->cf, flush_node_fun, fe);
}
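// Note on the enqueue above: flush_node_fun ends by calling
// remove_background_job_from_cf, so enqueueing on the cachefile's kibbutz
// is expected to register a matching background job on the cachefile
// (an assumption about cachefile_kibbutz_enq's internals), which keeps the
// cachefile from closing while the queued flush is still pending.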

//
// This takes as input a gorged, locked, non-leaf node named parent
// and sets up a flush to be done in the background.
// The flush is set up like this:
// - We call toku_maybe_pin_ftnode_clean on the child we want to flush to, to try to lock it.
// - If we successfully pin the child, and the child does not need to be split or merged,
//   then we detach the buffer, place the child and buffer onto a background thread,
//   have the flush complete in the background, and unlock the parent. The child will be
//   unlocked on the background thread.
// - If any of the above does not happen (child cannot be locked,
//   child needs to be split/merged), then we place the parent on the background thread.
//   The parent will be unlocked on the background thread.
//
void toku_ft_flush_node_on_background_thread(FT ft, FTNODE parent)
{
    toku::context flush_ctx(CTX_FLUSH);
    TXNID parent_oldest_referenced_xid_known = parent->oldest_referenced_xid_known;
    //
    // first let's see if we can detach the buffer on the client thread
    // and pick the child we want to flush to
    //
    int childnum = find_heaviest_child(parent);
    paranoid_invariant(toku_bnc_n_entries(BNC(parent, childnum)) > 0);
    //
    // see if we can pin the child
    //
    FTNODE child;
    uint32_t childfullhash = compute_child_fullhash(ft->cf, parent, childnum);
    int r = toku_maybe_pin_ftnode_clean(ft, BP_BLOCKNUM(parent, childnum), childfullhash, PL_WRITE_EXPENSIVE, &child);
    if (r != 0) {
        // In this case, we could not lock the child, so just place the parent on the background thread.
        // In the callback, we will use toku_ft_flush_some_child, which checks to
        // see if we should blow away the old basement nodes.
        place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
    }
    else {
        //
        // successfully locked child
        //
        bool may_child_be_reactive = ft_ftnode_may_be_reactive(ft, child);
        if (!may_child_be_reactive) {
            // We're going to unpin the parent, so before we do, we must
            // check to see if we need to blow away the basement nodes to
            // keep the MSN invariants intact.
            maybe_destroy_child_blbs(parent, child, ft);

            //
            // can detach the buffer and unpin the parent here
            //
            parent->set_dirty();
            BP_WORKDONE(parent, childnum) = 0;  // this buffer is drained, no work has been done by its contents
            NONLEAF_CHILDINFO bnc = BNC(parent, childnum);
            NONLEAF_CHILDINFO new_bnc = toku_create_empty_nl();
            memcpy(new_bnc->flow, bnc->flow, sizeof bnc->flow);
            set_BNC(parent, childnum, new_bnc);

            //
            // at this point, the buffer has been detached from the parent
            // and a new empty buffer has been placed in its stead
            // so, because we know for sure the child is not
            // reactive, we can unpin the parent
            //
            place_node_and_bnc_on_background_thread(ft, child, bnc, parent_oldest_referenced_xid_known);
            toku_unpin_ftnode(ft, parent);
        }
        else {
            // Because the child may be reactive, we need to
            // put the parent on the background thread.
            // As a result, we unlock the child here.
            toku_unpin_ftnode(ft, child);
            // Again, we'll have the parent on the background thread, so
            // we don't need to destroy the basement nodes yet.
            place_node_and_bnc_on_background_thread(ft, parent, NULL, parent_oldest_referenced_xid_known);
        }
    }
}

#include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_ft_flusher_helgrind_ignore(void);
void
toku_ft_flusher_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&fl_status, sizeof fl_status);
}