1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 /*======
4 This file is part of PerconaFT.
5
6
7 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
8
9 PerconaFT is free software: you can redistribute it and/or modify
10 it under the terms of the GNU General Public License, version 2,
11 as published by the Free Software Foundation.
12
13 PerconaFT is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
20
21 ----------------------------------------
22
23 PerconaFT is free software: you can redistribute it and/or modify
24 it under the terms of the GNU Affero General Public License, version 3,
25 as published by the Free Software Foundation.
26
27 PerconaFT is distributed in the hope that it will be useful,
28 but WITHOUT ANY WARRANTY; without even the implied warranty of
29 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 GNU Affero General Public License for more details.
31
32 You should have received a copy of the GNU Affero General Public License
33 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
34 ======= */
35
36 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
37
38 #include <my_global.h>
39 #include "ft/ft.h"
40 #include "ft/ft-internal.h"
41 #include "ft/serialize/ft_node-serialize.h"
42 #include "ft/node.h"
43 #include "ft/serialize/rbuf.h"
44 #include "ft/serialize/wbuf.h"
45 #include "util/scoped_malloc.h"
46 #include "util/sort.h"
47
48 // Effect: Fill in N as an empty ftnode.
49 // TODO: Rename toku_ftnode_create
50 void toku_initialize_empty_ftnode(FTNODE n, BLOCKNUM blocknum, int height, int num_children, int layout_version, unsigned int flags) {
51 paranoid_invariant(layout_version != 0);
52 paranoid_invariant(height >= 0);
53
54 n->max_msn_applied_to_node_on_disk = ZERO_MSN; // correct value for root node, harmless for others
55 n->flags = flags;
56 n->blocknum = blocknum;
57 n->layout_version = layout_version;
58 n->layout_version_original = layout_version;
59 n->layout_version_read_from_disk = layout_version;
60 n->height = height;
61 n->pivotkeys.create_empty();
62 n->bp = 0;
63 n->n_children = num_children;
64 n->oldest_referenced_xid_known = TXNID_NONE;
65
66 if (num_children > 0) {
67 XMALLOC_N(num_children, n->bp);
68 for (int i = 0; i < num_children; i++) {
69 BP_BLOCKNUM(n,i).b=0;
70 BP_STATE(n,i) = PT_INVALID;
71 BP_WORKDONE(n,i) = 0;
72 BP_INIT_TOUCHED_CLOCK(n, i);
73 set_BNULL(n,i);
74 if (height > 0) {
75 set_BNC(n, i, toku_create_empty_nl());
76 } else {
77 set_BLB(n, i, toku_create_empty_bn());
78 }
79 }
80 }
81 n->set_dirty(); // special case exception, it's okay to mark as dirty because the basements are empty
82
83 toku_ft_status_note_ftnode(height, true);
84 }
85
86 // Destroys the internals of the ftnode, but does not free the values
87 // that are stored.
88 // This is common functionality for toku_ftnode_free and rebalance_ftnode_leaf.
89 // It MUST NOT do anything besides free the structures that have been allocated.
90 void toku_destroy_ftnode_internals(FTNODE node) {
91 node->pivotkeys.destroy();
92 for (int i = 0; i < node->n_children; i++) {
93 if (BP_STATE(node,i) == PT_AVAIL) {
94 if (node->height > 0) {
95 destroy_nonleaf_childinfo(BNC(node,i));
96 } else {
97 paranoid_invariant(BLB_LRD(node, i) == 0);
98 destroy_basement_node(BLB(node, i));
99 }
100 } else if (BP_STATE(node,i) == PT_COMPRESSED) {
101 SUB_BLOCK sb = BSB(node,i);
102 toku_free(sb->compressed_ptr);
103 toku_free(sb);
104 } else {
105 paranoid_invariant(is_BNULL(node, i));
106 }
107 set_BNULL(node, i);
108 }
109 toku_free(node->bp);
110 node->bp = NULL;
111 }
112
113 /* Frees a node: destroys its partitions and pivot keys, then frees the node itself. */
114 void toku_ftnode_free(FTNODE *nodep) {
115 FTNODE node = *nodep;
116 toku_ft_status_note_ftnode(node->height, false);
117 toku_destroy_ftnode_internals(node);
118 toku_free(node);
119 *nodep = nullptr;
120 }
121
122 void toku_ftnode_update_disk_stats(FTNODE ftnode, FT ft, bool for_checkpoint) {
123 STAT64INFO_S deltas = ZEROSTATS;
124 // capture deltas before rebalancing basements for serialization
125 deltas = toku_get_and_clear_basement_stats(ftnode);
126 // locking not necessary here with respect to checkpointing
127 // in Clayface (because of the pending lock and cachetable lock
128 // in toku_cachetable_begin_checkpoint)
129 // essentially, if we are dealing with a for_checkpoint
130 // parameter in a function that is called by the flush_callback,
131 // then the cachetable needs to ensure that this is called in a safe
132 // manner that does not interfere with the beginning
133 // of a checkpoint, which it does with the cachetable lock
134 // and pending lock
135 toku_ft_update_stats(&ft->h->on_disk_stats, deltas);
136 if (for_checkpoint) {
137 toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas);
138 }
139 }
140
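// Copy every partition of node into cloned_node: block numbers and workdone
// counters are copied, each partition must already be PT_AVAIL in the source
// and is marked PT_AVAIL in the clone, and the basement nodes (for leaves) or
// nonleaf child buffers are deep-cloned.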
141 void toku_ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) {
142 for (int i = 0; i < node->n_children; i++) {
143 BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i);
144 paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
145 BP_STATE(cloned_node,i) = PT_AVAIL;
146 BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i);
147 if (node->height == 0) {
148 set_BLB(cloned_node, i, toku_clone_bn(BLB(node,i)));
149 } else {
150 set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i)));
151 }
152 }
153 }
154
155 void toku_evict_bn_from_memory(FTNODE node, int childnum, FT ft) {
156 // free the basement node
157 assert(!node->dirty());
158 BASEMENTNODE bn = BLB(node, childnum);
159 toku_ft_decrease_stats(&ft->in_memory_stats, bn->stat64_delta);
160 toku_ft_adjust_logical_row_count(ft, -BLB_LRD(node, childnum));
161 BLB_LRD(node, childnum) = 0;
162 destroy_basement_node(bn);
163 set_BNULL(node, childnum);
164 BP_STATE(node, childnum) = PT_ON_DISK;
165 }
166
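// Detach the basement node for childnum without destroying it. Ownership of
// the returned basement passes to the caller; the partition is left empty
// and marked PT_ON_DISK.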
167 BASEMENTNODE toku_detach_bn(FTNODE node, int childnum) {
168 assert(BP_STATE(node, childnum) == PT_AVAIL);
169 BASEMENTNODE bn = BLB(node, childnum);
170 set_BNULL(node, childnum);
171 BP_STATE(node, childnum) = PT_ON_DISK;
172 return bn;
173 }
174
175 //
176 // Orthopush
177 //
178
179 struct store_msg_buffer_offset_extra {
180 int32_t *offsets;
181 int i;
182 };
183
184 int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra) __attribute__((nonnull(3)));
185 int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra)
186 {
187 extra->offsets[extra->i] = offset;
188 extra->i++;
189 return 0;
190 }
191
192 /**
193 * Given pointers to offsets within a message buffer where we can find messages,
194 * figure out the MSN of each message, and compare those MSNs. Returns 1,
195 * 0, or -1 if a is larger than, equal to, or smaller than b.
196 */
197 int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo);
198 int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo)
199 {
200 MSN amsn, bmsn;
201 msg_buffer.get_message_key_msn(ao, nullptr, &amsn);
202 msg_buffer.get_message_key_msn(bo, nullptr, &bmsn);
203 if (amsn.msn > bmsn.msn) {
204 return +1;
205 }
206 if (amsn.msn < bmsn.msn) {
207 return -1;
208 }
209 return 0;
210 }
211
212 /**
213 * Given a message buffer and an offset, apply the message with
214 * toku_ft_bn_apply_msg, or discard it,
215 * based on its MSN and the MSN of the basement node.
216 */
217 static void do_bn_apply_msg(
218 FT_HANDLE ft_handle,
219 BASEMENTNODE bn,
220 message_buffer* msg_buffer,
221 int32_t offset,
222 txn_gc_info* gc_info,
223 uint64_t* workdone,
224 STAT64INFO stats_to_update,
225 int64_t* logical_rows_delta) {
226
227 DBT k, v;
228 ft_msg msg = msg_buffer->get_message(offset, &k, &v);
229
230 // The messages are being iterated over in (key,msn) order or just in
231 // msn order, so all the messages for one key, from one buffer, are in
232 // ascending msn order. So it's ok that we don't update the basement
233 // node's msn until the end.
234 if (msg.msn().msn > bn->max_msn_applied.msn) {
235 toku_ft_bn_apply_msg(
236 ft_handle->ft->cmp,
237 ft_handle->ft->update_fun,
238 bn,
239 msg,
240 gc_info,
241 workdone,
242 stats_to_update,
243 logical_rows_delta);
244 } else {
245 toku_ft_status_note_msn_discard();
246 }
247
248 // We must always mark the message as stale, since it has been marked
249 // (using omt::iterate_and_mark_range).
250 // It is possible to call do_bn_apply_msg even when it won't apply the
251 // message because the node containing it could have been evicted and
252 // brought back in.
253 msg_buffer->set_freshness(offset, false);
254 }
255
256
257 struct iterate_do_bn_apply_msg_extra {
258 FT_HANDLE t;
259 BASEMENTNODE bn;
260 NONLEAF_CHILDINFO bnc;
261 txn_gc_info *gc_info;
262 uint64_t *workdone;
263 STAT64INFO stats_to_update;
264 int64_t *logical_rows_delta;
265 };
266
267 int iterate_do_bn_apply_msg(
268 const int32_t &offset,
269 const uint32_t UU(idx),
270 struct iterate_do_bn_apply_msg_extra* const e)
271 __attribute__((nonnull(3)));
272
273 int iterate_do_bn_apply_msg(
274 const int32_t &offset,
275 const uint32_t UU(idx),
276 struct iterate_do_bn_apply_msg_extra* const e)
277 {
278 do_bn_apply_msg(
279 e->t,
280 e->bn,
281 &e->bnc->msg_buffer,
282 offset,
283 e->gc_info,
284 e->workdone,
285 e->stats_to_update,
286 e->logical_rows_delta);
287 return 0;
288 }
289
290 /**
291 * Given the bounds of the basement node to which we will apply messages,
292 * find the indexes within message_tree which contain the range of
293 * relevant messages.
294 *
295 * The message tree contains offsets into the buffer, where messages are
296 * found. The pivot_bounds are the lower bound exclusive and upper bound
297 * inclusive, because they come from pivot keys in the tree. We want OMT
298 * indices, which must have the lower bound be inclusive and the upper
299 * bound exclusive. We will get these by telling omt::find to look
300 * for something strictly bigger than each of our pivot bounds.
301 *
302 * Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper
303 * bound exclusive).
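 *
 * For example (illustrative only): with pivot bounds lbe = "c" (exclusive)
 * and ubi = "f" (inclusive), and buffer keys {a, b, c, d, e, f, g}, lbi is
 * the index of the first message whose key is "d" and ube is the index just
 * past the last message whose key is "f".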
304 */
305 template<typename find_bounds_omt_t>
306 static void
307 find_bounds_within_message_tree(
308 const toku::comparator &cmp,
309 const find_bounds_omt_t &message_tree, /// tree holding message buffer offsets, in which we want to look for indices
310 message_buffer *msg_buffer, /// message buffer in which messages are found
311 const pivot_bounds &bounds, /// key bounds within the basement node we're applying messages to
312 uint32_t *lbi, /// (output) "lower bound inclusive" (index into message_tree)
313 uint32_t *ube /// (output) "upper bound exclusive" (index into message_tree)
314 )
315 {
316 int r = 0;
317
318 if (!toku_dbt_is_empty(bounds.lbe())) {
319 // By setting msn to MAX_MSN and by using direction of +1, we will
320 // get the first message greater than (in (key, msn) order) any
321 // message (with any msn) with the key lower_bound_exclusive.
322 // This will be a message we want to try applying, so it is the
323 // "lower bound inclusive" within the message_tree.
324 struct toku_msg_buffer_key_msn_heaviside_extra lbi_extra(cmp, msg_buffer, bounds.lbe(), MAX_MSN);
325 int32_t found_lb;
326 r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
327 if (r == DB_NOTFOUND) {
328 // There is no relevant data (the lower bound is bigger than
329 // any message in this tree), so we have no range and we're
330 // done.
331 *lbi = 0;
332 *ube = 0;
333 return;
334 }
335 if (!toku_dbt_is_empty(bounds.ubi())) {
336 // Check if what we found for lbi is greater than the upper
337 // bound inclusive that we have. If so, there are no relevant
338 // messages between these bounds.
339 const DBT *ubi = bounds.ubi();
340 const int32_t offset = found_lb;
341 DBT found_lbidbt;
342 msg_buffer->get_message_key_msn(offset, &found_lbidbt, nullptr);
343 int c = cmp(&found_lbidbt, ubi);
344 // These DBTs really are both inclusive bounds, so we need
345 // strict inequality in order to determine that there's
346 // nothing between them. If they're equal, then we actually
347 // need to apply the message pointed to by lbi, and also
348 // anything with the same key but a bigger msn.
349 if (c > 0) {
350 *lbi = 0;
351 *ube = 0;
352 return;
353 }
354 }
355 } else {
356 // No lower bound given, it's negative infinity, so we start at
357 // the first message in the OMT.
358 *lbi = 0;
359 }
360 if (!toku_dbt_is_empty(bounds.ubi())) {
361 // Again, we use an msn of MAX_MSN and a direction of +1 to get
362 // the first thing bigger than the upper_bound_inclusive key.
363 // This is therefore the smallest thing we don't want to apply,
364 // and omt::iterate_on_range will not examine it.
365 struct toku_msg_buffer_key_msn_heaviside_extra ube_extra(cmp, msg_buffer, bounds.ubi(), MAX_MSN);
366 r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
367 if (r == DB_NOTFOUND) {
368 // Couldn't find anything in the buffer bigger than our key,
369 // so we need to look at everything up to the end of
370 // message_tree.
371 *ube = message_tree.size();
372 }
373 } else {
374 // No upper bound given, it's positive infinity, so we need to go
375 // through the end of the OMT.
376 *ube = message_tree.size();
377 }
378 }
379
380 // For each message in the ancestor's buffer (determined by childnum) that
381 // is key-wise between lower_bound_exclusive and upper_bound_inclusive,
382 // apply the message to the basement node. We treat the bounds as minus
383 // or plus infinity respectively if they are NULL. Do not mark the node
384 // as dirty (preserve previous state of 'dirty' bit).
385 static void bnc_apply_messages_to_basement_node(
386 FT_HANDLE t, // used for comparison function
387 BASEMENTNODE bn, // where to apply messages
388 FTNODE ancestor, // the ancestor node where we can find messages to apply
389 int childnum, // which child buffer of ancestor contains messages we want
390 const pivot_bounds &
391 bounds, // contains pivot key bounds of this basement node
392 txn_gc_info *gc_info,
393 bool *msgs_applied) {
394 int r;
395 NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);
396
397 // Determine the offsets in the message trees between which we need to
398 // apply messages from this buffer
399 STAT64INFO_S stats_delta = {0, 0};
400 uint64_t workdone_this_ancestor = 0;
401 int64_t logical_rows_delta = 0;
402
403 uint32_t stale_lbi, stale_ube;
404 if (!bn->stale_ancestor_messages_applied) {
405 find_bounds_within_message_tree(t->ft->cmp,
406 bnc->stale_message_tree,
407 &bnc->msg_buffer,
408 bounds,
409 &stale_lbi,
410 &stale_ube);
411 } else {
412 stale_lbi = 0;
413 stale_ube = 0;
414 }
415 uint32_t fresh_lbi, fresh_ube;
416 find_bounds_within_message_tree(t->ft->cmp,
417 bnc->fresh_message_tree,
418 &bnc->msg_buffer,
419 bounds,
420 &fresh_lbi,
421 &fresh_ube);
422
423 // We now know where all the messages we must apply are, so one of the
424 // following 3 cases will do the application, depending on which of
425 // the lists contains relevant messages:
426 //
427 // 1. broadcast messages and anything else, or a mix of fresh and stale
428 // 2. only fresh messages
429 // 3. only stale messages
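//
// Broadcast messages apply to every key, so whenever any are present (or
// whenever both the fresh and stale trees hold relevant messages) we cannot
// simply walk a single tree: the offsets from all the lists are gathered
// and merge-sorted by MSN before being applied.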
430 if (bnc->broadcast_list.size() > 0 ||
431 (stale_lbi != stale_ube && fresh_lbi != fresh_ube)) {
432 // We have messages in multiple trees, so we grab all
433 // the relevant messages' offsets and sort them by MSN, then apply
434 // them in MSN order.
435 const int buffer_size =
436 ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) +
437 bnc->broadcast_list.size());
438 toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t));
439 int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get());
440 struct store_msg_buffer_offset_extra sfo_extra = {.offsets = offsets,
441 .i = 0};
442
443 // Populate offsets array with offsets to stale messages
444 r = bnc->stale_message_tree
445 .iterate_on_range<struct store_msg_buffer_offset_extra,
446 store_msg_buffer_offset>(
447 stale_lbi, stale_ube, &sfo_extra);
448 assert_zero(r);
449
450 // Then store fresh offsets, and mark them to be moved to stale later.
451 r = bnc->fresh_message_tree
452 .iterate_and_mark_range<struct store_msg_buffer_offset_extra,
453 store_msg_buffer_offset>(
454 fresh_lbi, fresh_ube, &sfo_extra);
455 assert_zero(r);
456
457 // Store offsets of all broadcast messages.
458 r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra,
459 store_msg_buffer_offset>(&sfo_extra);
460 assert_zero(r);
461 invariant(sfo_extra.i == buffer_size);
462
463 // Sort by MSN.
464 toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>::
465 mergesort_r(offsets, buffer_size, bnc->msg_buffer);
466
467 // Apply the messages in MSN order.
468 for (int i = 0; i < buffer_size; ++i) {
469 *msgs_applied = true;
470 do_bn_apply_msg(t,
471 bn,
472 &bnc->msg_buffer,
473 offsets[i],
474 gc_info,
475 &workdone_this_ancestor,
476 &stats_delta,
477 &logical_rows_delta);
478 }
479 } else if (stale_lbi == stale_ube) {
480 // No stale messages to apply, we just apply fresh messages, and mark
481 // them to be moved to stale later.
482 struct iterate_do_bn_apply_msg_extra iter_extra = {
483 .t = t,
484 .bn = bn,
485 .bnc = bnc,
486 .gc_info = gc_info,
487 .workdone = &workdone_this_ancestor,
488 .stats_to_update = &stats_delta,
489 .logical_rows_delta = &logical_rows_delta};
490 if (fresh_ube - fresh_lbi > 0)
491 *msgs_applied = true;
492 r = bnc->fresh_message_tree
493 .iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra,
494 iterate_do_bn_apply_msg>(
495 fresh_lbi, fresh_ube, &iter_extra);
496 assert_zero(r);
497 } else {
498 invariant(fresh_lbi == fresh_ube);
499 // No fresh messages to apply, we just apply stale messages.
500
501 if (stale_ube - stale_lbi > 0)
502 *msgs_applied = true;
503 struct iterate_do_bn_apply_msg_extra iter_extra = {
504 .t = t,
505 .bn = bn,
506 .bnc = bnc,
507 .gc_info = gc_info,
508 .workdone = &workdone_this_ancestor,
509 .stats_to_update = &stats_delta,
510 .logical_rows_delta = &logical_rows_delta};
511
512 r = bnc->stale_message_tree
513 .iterate_on_range<struct iterate_do_bn_apply_msg_extra,
514 iterate_do_bn_apply_msg>(
515 stale_lbi, stale_ube, &iter_extra);
516 assert_zero(r);
517 }
518 //
519 // update stats
520 //
521 if (workdone_this_ancestor > 0) {
522 (void)toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum),
523 workdone_this_ancestor);
524 }
525 if (stats_delta.numbytes || stats_delta.numrows) {
526 toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
527 }
528 toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta);
529 bn->logical_rows_delta += logical_rows_delta;
530 }
531
532 static void
533 apply_ancestors_messages_to_bn(
534 FT_HANDLE t,
535 FTNODE node,
536 int childnum,
537 ANCESTORS ancestors,
538 const pivot_bounds &bounds,
539 txn_gc_info *gc_info,
540 bool* msgs_applied
541 )
542 {
543 BASEMENTNODE curr_bn = BLB(node, childnum);
544 const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
545 for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
546 if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
547 paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
548 bnc_apply_messages_to_basement_node(
549 t,
550 curr_bn,
551 curr_ancestors->node,
552 curr_ancestors->childnum,
553 curr_bounds,
554 gc_info,
555 msgs_applied
556 );
557 // We don't want to check this ancestor node again if the
558 // next time we query it, the msn hasn't changed.
559 curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
560 }
561 }
562 // At this point, we know all the stale messages above this
563 // basement node have been applied, and any new messages will be
564 // fresh, so we don't need to look at stale messages for this
565 // basement node, unless it gets evicted (and this field becomes
566 // false when it's read in again).
567 curr_bn->stale_ancestor_messages_applied = true;
568 }
569
570 void
571 toku_apply_ancestors_messages_to_node (
572 FT_HANDLE t,
573 FTNODE node,
574 ANCESTORS ancestors,
575 const pivot_bounds &bounds,
576 bool* msgs_applied,
577 int child_to_read
578 )
579 // Effect:
580 // Bring a leaf node up-to-date according to all the messages in the ancestors.
581 // If the leaf node is already up-to-date then do nothing.
582 // If the leaf node is not already up-to-date, then record the work done
583 // for that leaf in each ancestor.
584 // Requires:
585 // This is being called when pinning a leaf node for the query path.
586 // The entire root-to-leaf path is pinned and appears in the ancestors list.
587 {
588 VERIFY_NODE(t, node);
589 paranoid_invariant(node->height == 0);
590
591 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(t);
592 txn_manager_state txn_state_for_gc(txn_manager);
593
594 TXNID oldest_referenced_xid_for_simple_gc = toku_ft_get_oldest_referenced_xid_estimate(t);
595 txn_gc_info gc_info(&txn_state_for_gc,
596 oldest_referenced_xid_for_simple_gc,
597 node->oldest_referenced_xid_known,
598 true);
599 if (!node->dirty() && child_to_read >= 0) {
600 paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
601 apply_ancestors_messages_to_bn(
602 t,
603 node,
604 child_to_read,
605 ancestors,
606 bounds,
607 &gc_info,
608 msgs_applied
609 );
610 }
611 else {
612 // we know this is a leaf node
613 // An important invariant:
614 // We MUST bring every available basement node for a dirty node up to date.
615 // flushing on the cleaner thread depends on this. This invariant
616 // allows the cleaner thread to just pick an internal node and flush it
617 // as opposed to being forced to start from the root.
618 for (int i = 0; i < node->n_children; i++) {
619 if (BP_STATE(node, i) != PT_AVAIL) { continue; }
620 apply_ancestors_messages_to_bn(
621 t,
622 node,
623 i,
624 ancestors,
625 bounds,
626 &gc_info,
627 msgs_applied
628 );
629 }
630 }
631 VERIFY_NODE(t, node);
632 }
633
634 static bool bn_needs_ancestors_messages(
635 FT ft,
636 FTNODE node,
637 int childnum,
638 const pivot_bounds &bounds,
639 ANCESTORS ancestors,
640 MSN* max_msn_applied
641 )
642 {
643 BASEMENTNODE bn = BLB(node, childnum);
644 const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
645 bool needs_ancestors_messages = false;
646 for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
647 if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
648 paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
649 NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
650 if (bnc->broadcast_list.size() > 0) {
651 needs_ancestors_messages = true;
652 goto cleanup;
653 }
654 if (!bn->stale_ancestor_messages_applied) {
655 uint32_t stale_lbi, stale_ube;
656 find_bounds_within_message_tree(ft->cmp,
657 bnc->stale_message_tree,
658 &bnc->msg_buffer,
659 curr_bounds,
660 &stale_lbi,
661 &stale_ube);
662 if (stale_lbi < stale_ube) {
663 needs_ancestors_messages = true;
664 goto cleanup;
665 }
666 }
667 uint32_t fresh_lbi, fresh_ube;
668 find_bounds_within_message_tree(ft->cmp,
669 bnc->fresh_message_tree,
670 &bnc->msg_buffer,
671 curr_bounds,
672 &fresh_lbi,
673 &fresh_ube);
674 if (fresh_lbi < fresh_ube) {
675 needs_ancestors_messages = true;
676 goto cleanup;
677 }
678 if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied->msn) {
679 max_msn_applied->msn = curr_ancestors->node->max_msn_applied_to_node_on_disk.msn;
680 }
681 }
682 }
683 cleanup:
684 return needs_ancestors_messages;
685 }
686
687 bool toku_ft_leaf_needs_ancestors_messages(
688 FT ft,
689 FTNODE node,
690 ANCESTORS ancestors,
691 const pivot_bounds &bounds,
692 MSN *const max_msn_in_path,
693 int child_to_read
694 )
695 // Effect: Determine whether there are messages in a node's ancestors
696 // which must be applied to it. These messages are in the correct
697 // keyrange for any available basement nodes, and are in nodes with the
698 // correct max_msn_applied_to_node_on_disk.
699 // Notes:
700 // This is an approximate query.
701 // Output:
702 // max_msn_in_path: max of "max_msn_applied_to_node_on_disk" over
703 // ancestors. This is used later to update basement nodes'
704 // max_msn_applied values in case we don't do the full algorithm.
705 // Returns:
706 // true if there may be some such messages
707 // false only if there are definitely no such messages
708 // Rationale:
709 // When we pin a node with a read lock, we want to quickly determine if
710 // we should exchange it for a write lock in preparation for applying
711 // messages. If there are no messages, we don't need the write lock.
712 {
713 paranoid_invariant(node->height == 0);
714 bool needs_ancestors_messages = false;
715 // child_to_read may be -1 in test cases
716 if (!node->dirty() && child_to_read >= 0) {
717 paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
718 needs_ancestors_messages = bn_needs_ancestors_messages(
719 ft,
720 node,
721 child_to_read,
722 bounds,
723 ancestors,
724 max_msn_in_path
725 );
726 }
727 else {
728 for (int i = 0; i < node->n_children; ++i) {
729 if (BP_STATE(node, i) != PT_AVAIL) { continue; }
730 needs_ancestors_messages = bn_needs_ancestors_messages(
731 ft,
732 node,
733 i,
734 bounds,
735 ancestors,
736 max_msn_in_path
737 );
738 if (needs_ancestors_messages) {
739 goto cleanup;
740 }
741 }
742 }
743 cleanup:
744 return needs_ancestors_messages;
745 }
746
747 void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read) {
748 invariant(node->height == 0);
749 if (!node->dirty() && child_to_read >= 0) {
750 paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
751 BASEMENTNODE bn = BLB(node, child_to_read);
752 if (max_msn_applied.msn > bn->max_msn_applied.msn) {
753 // see comment below
754 (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
755 }
756 }
757 else {
758 for (int i = 0; i < node->n_children; ++i) {
759 if (BP_STATE(node, i) != PT_AVAIL) { continue; }
760 BASEMENTNODE bn = BLB(node, i);
761 if (max_msn_applied.msn > bn->max_msn_applied.msn) {
762 // This function runs in a shared access context, so to silence tools
763 // like DRD, we use a CAS and ignore the result.
764 // Any threads trying to update these basement nodes should be
765 // updating them to the same thing (since they all have a read lock on
766 // the same root-to-leaf path) so this is safe.
767 (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
768 }
769 }
770 }
771 }
772
773 struct copy_to_stale_extra {
774 FT ft;
775 NONLEAF_CHILDINFO bnc;
776 };
777
778 int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra) __attribute__((nonnull(3)));
779 int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
780 {
781 MSN msn;
782 DBT key;
783 extra->bnc->msg_buffer.get_message_key_msn(offset, &key, &msn);
784 struct toku_msg_buffer_key_msn_heaviside_extra heaviside_extra(extra->ft->cmp, &extra->bnc->msg_buffer, &key, msn);
785 int r = extra->bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, heaviside_extra, nullptr);
786 invariant_zero(r);
787 return 0;
788 }
789
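// Move every message that was marked during application (via
// iterate_and_mark_range on the fresh tree) into the stale tree. The message
// bytes stay in the msg_buffer; only tree membership changes: copy_to_stale
// re-inserts each marked offset into stale_message_tree, and then the marked
// entries are deleted from fresh_message_tree.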
790 void toku_ft_bnc_move_messages_to_stale(FT ft, NONLEAF_CHILDINFO bnc) {
791 struct copy_to_stale_extra cts_extra = { .ft = ft, .bnc = bnc };
792 int r = bnc->fresh_message_tree.iterate_over_marked<struct copy_to_stale_extra, copy_to_stale>(&cts_extra);
793 invariant_zero(r);
794 bnc->fresh_message_tree.delete_all_marked();
795 }
796
797 void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) {
798 invariant(node->height > 0);
799 for (int i = 0; i < node->n_children; ++i) {
800 if (BP_STATE(node, i) != PT_AVAIL) {
801 continue;
802 }
803 NONLEAF_CHILDINFO bnc = BNC(node, i);
804 // We can't delete things out of the fresh tree inside the above
805 // procedures because we're still looking at the fresh tree. Instead
806 // we have to move messages after we're done looking at it.
807 toku_ft_bnc_move_messages_to_stale(ft, bnc);
808 }
809 }
810
811 //
812 // Balance // Availability // Size
813
814 struct rebalance_array_info {
815 uint32_t offset;
816 LEAFENTRY *le_array;
817 uint32_t *key_sizes_array;
818 const void **key_ptr_array;
819 static int fn(const void* key, const uint32_t keylen, const LEAFENTRY &le,
820 const uint32_t idx, struct rebalance_array_info *const ai) {
821 ai->le_array[idx+ai->offset] = le;
822 ai->key_sizes_array[idx+ai->offset] = keylen;
823 ai->key_ptr_array[idx+ai->offset] = key;
824 return 0;
825 }
826 };
827
828 // There must still be at least one child
829 // Requires that all messages in buffers above have been applied.
830 // Because all messages above have been applied, setting msn of all new basements
831 // to max msn of existing basements is correct. (There cannot be any messages in
832 // buffers above that still need to be applied.)
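// As implemented below, the rebalance proceeds in phases:
//   1. Count the leafentries across all existing basements.
//   2. Flatten every (key, leafentry) pair into parallel arrays, in order.
//   3. Walk the arrays accumulating leafentry, key, and per-entry offset
//      sizes, emitting a new pivot whenever a basement would exceed
//      basementnodesize.
//   4. Detach the old basements (keeping them alive so the arrays remain
//      valid) and destroy the node's internals.
//   5. Allocate new basements and pivot keys, and clone the sorted arrays
//      into each new basement.
//   6. Destroy the old basements' memory.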
833 void toku_ftnode_leaf_rebalance(FTNODE node, unsigned int basementnodesize) {
834
835 assert(node->height == 0);
836 assert(node->dirty());
837
838 uint32_t num_orig_basements = node->n_children;
839 // Count number of leaf entries in this leaf (num_le).
840 uint32_t num_le = 0;
841 for (uint32_t i = 0; i < num_orig_basements; i++) {
842 num_le += BLB_DATA(node, i)->num_klpairs();
843 }
844
845 uint32_t num_alloc = num_le ? num_le : 1; // simplify logic below by always having at least one entry per array
846
847 // Create an array of OMTVALUE's that store all the pointers to all the data.
848 // Each element in leafpointers is a pointer to a leaf.
849 toku::scoped_malloc leafpointers_buf(sizeof(LEAFENTRY) * num_alloc);
850 LEAFENTRY *leafpointers = reinterpret_cast<LEAFENTRY *>(leafpointers_buf.get());
851 leafpointers[0] = NULL;
852
853 toku::scoped_malloc key_pointers_buf(sizeof(void *) * num_alloc);
854 const void **key_pointers = reinterpret_cast<const void **>(key_pointers_buf.get());
855 key_pointers[0] = NULL;
856
857 toku::scoped_malloc key_sizes_buf(sizeof(uint32_t) * num_alloc);
858 uint32_t *key_sizes = reinterpret_cast<uint32_t *>(key_sizes_buf.get());
859
860 // Capture pointers to old mempools' buffers (so they can be destroyed)
861 toku::scoped_malloc old_bns_buf(sizeof(BASEMENTNODE) * num_orig_basements);
862 BASEMENTNODE *old_bns = reinterpret_cast<BASEMENTNODE *>(old_bns_buf.get());
863 old_bns[0] = NULL;
864
865 uint32_t curr_le = 0;
866 for (uint32_t i = 0; i < num_orig_basements; i++) {
867 bn_data* bd = BLB_DATA(node, i);
868 struct rebalance_array_info ai {.offset = curr_le, .le_array = leafpointers, .key_sizes_array = key_sizes, .key_ptr_array = key_pointers };
869 bd->iterate<rebalance_array_info, rebalance_array_info::fn>(&ai);
870 curr_le += bd->num_klpairs();
871 }
872
873 // Create an array that will store indexes of new pivots.
874 // Each element in new_pivots is the index of a pivot key.
875 // (Allocating num_le of them is overkill, but num_le is an upper bound.)
876 toku::scoped_malloc new_pivots_buf(sizeof(uint32_t) * num_alloc);
877 uint32_t *new_pivots = reinterpret_cast<uint32_t *>(new_pivots_buf.get());
878 new_pivots[0] = 0;
879
880 // Each element in le_sizes is the size of the leafentry pointed to by leafpointers.
881 toku::scoped_malloc le_sizes_buf(sizeof(size_t) * num_alloc);
882 size_t *le_sizes = reinterpret_cast<size_t *>(le_sizes_buf.get());
883 le_sizes[0] = 0;
884
885 // Create an array that will store the size of each basement.
886 // This is the sum of the leaf sizes of all the leaves in that basement.
887 // We don't know how many basements there will be, so we use num_le as the upper bound.
888
889 // Sum of all le sizes in a single basement
890 toku::scoped_calloc bn_le_sizes_buf(sizeof(size_t) * num_alloc);
891 size_t *bn_le_sizes = reinterpret_cast<size_t *>(bn_le_sizes_buf.get());
892
893 // Sum of all key sizes in a single basement
894 toku::scoped_calloc bn_key_sizes_buf(sizeof(size_t) * num_alloc);
895 size_t *bn_key_sizes = reinterpret_cast<size_t *>(bn_key_sizes_buf.get());
896
897 // TODO 4050: All these arrays should be combined into a single array of some bn_info struct (pivot, msize, num_les).
898 // Each entry is the number of leafentries in this basement. (Again, num_le is an overkill upper bound.)
899 toku::scoped_malloc num_les_this_bn_buf(sizeof(uint32_t) * num_alloc);
900 uint32_t *num_les_this_bn = reinterpret_cast<uint32_t *>(num_les_this_bn_buf.get());
901 num_les_this_bn[0] = 0;
902
903 // Figure out the new pivots.
904 // We need the index of each pivot, and for each basement we need
905 // the number of leaves and the sum of the sizes of the leaves (memory requirement for basement).
906 uint32_t curr_pivot = 0;
907 uint32_t num_le_in_curr_bn = 0;
908 uint32_t bn_size_so_far = 0;
909 for (uint32_t i = 0; i < num_le; i++) {
910 uint32_t curr_le_size = leafentry_disksize((LEAFENTRY) leafpointers[i]);
911 le_sizes[i] = curr_le_size;
912 if ((bn_size_so_far + curr_le_size + sizeof(uint32_t) + key_sizes[i] > basementnodesize) && (num_le_in_curr_bn != 0)) {
913 // cap off the current basement node to end with the element before i
914 new_pivots[curr_pivot] = i-1;
915 curr_pivot++;
916 num_le_in_curr_bn = 0;
917 bn_size_so_far = 0;
918 }
919 num_le_in_curr_bn++;
920 num_les_this_bn[curr_pivot] = num_le_in_curr_bn;
921 bn_le_sizes[curr_pivot] += curr_le_size;
922 bn_key_sizes[curr_pivot] += sizeof(uint32_t) + key_sizes[i]; // uint32_t le_offset
923 bn_size_so_far += curr_le_size + sizeof(uint32_t) + key_sizes[i];
924 }
925 // curr_pivot is now the total number of pivot keys in the leaf node
926 int num_pivots = curr_pivot;
927 int num_children = num_pivots + 1;
928
929 // now we need to fill in the new basement nodes and pivots
930
931 // TODO: (Zardosht) this is an ugly thing right now
932 // Need to figure out how to properly deal with seqinsert.
933 // I am not happy with how this is being
934 // handled with basement nodes
935 uint32_t tmp_seqinsert = BLB_SEQINSERT(node, num_orig_basements - 1);
936
937 // choose the max msn applied to any basement as the max msn applied to all new basements
938 MSN max_msn = ZERO_MSN;
939 for (uint32_t i = 0; i < num_orig_basements; i++) {
940 MSN curr_msn = BLB_MAX_MSN_APPLIED(node,i);
941 max_msn = (curr_msn.msn > max_msn.msn) ? curr_msn : max_msn;
942 }
943 // remove the basement node in the node, we've saved a copy
944 for (uint32_t i = 0; i < num_orig_basements; i++) {
945 // save a reference to the old basement nodes
946 // we will need them to ensure that the memory
947 // stays intact
948 old_bns[i] = toku_detach_bn(node, i);
949 }
950 // Now destroy the old basements, but do not destroy leaves
951 toku_destroy_ftnode_internals(node);
952
953 // now reallocate pieces and start filling them in
954 invariant(num_children > 0);
955
956 node->n_children = num_children;
957 XCALLOC_N(num_children, node->bp); // allocate pointers to basements (bp)
958 for (int i = 0; i < num_children; i++) {
959 set_BLB(node, i, toku_create_empty_bn()); // allocate empty basements and set bp pointers
960 }
961
962 // now we start to fill in the data
963
964 // first the pivots
965 toku::scoped_malloc pivotkeys_buf(num_pivots * sizeof(DBT));
966 DBT *pivotkeys = reinterpret_cast<DBT *>(pivotkeys_buf.get());
967 for (int i = 0; i < num_pivots; i++) {
968 uint32_t size = key_sizes[new_pivots[i]];
969 const void *key = key_pointers[new_pivots[i]];
970 toku_fill_dbt(&pivotkeys[i], key, size);
971 }
972 node->pivotkeys.create_from_dbts(pivotkeys, num_pivots);
973
974 uint32_t baseindex_this_bn = 0;
975 // now the basement nodes
976 for (int i = 0; i < num_children; i++) {
977 // put back seqinsert
978 BLB_SEQINSERT(node, i) = tmp_seqinsert;
979
980 // create start (inclusive) and end (exclusive) boundaries for data of basement node
981 uint32_t curr_start = (i==0) ? 0 : new_pivots[i-1]+1; // index of first leaf in basement
982 uint32_t curr_end = (i==num_pivots) ? num_le : new_pivots[i]+1; // index of first leaf in next basement
983 uint32_t num_in_bn = curr_end - curr_start; // number of leaves in this basement
984
985 // create indexes for new basement
986 invariant(baseindex_this_bn == curr_start);
987 uint32_t num_les_to_copy = num_les_this_bn[i];
988 invariant(num_les_to_copy == num_in_bn);
989
990 bn_data* bd = BLB_DATA(node, i);
991 bd->set_contents_as_clone_of_sorted_array(
992 num_les_to_copy,
993 &key_pointers[baseindex_this_bn],
994 &key_sizes[baseindex_this_bn],
995 &leafpointers[baseindex_this_bn],
996 &le_sizes[baseindex_this_bn],
997 bn_key_sizes[i], // Total key sizes
998 bn_le_sizes[i] // total le sizes
999 );
1000
1001 BP_STATE(node,i) = PT_AVAIL;
1002 BP_TOUCH_CLOCK(node,i);
1003 BLB_MAX_MSN_APPLIED(node,i) = max_msn;
1004 baseindex_this_bn += num_les_to_copy; // set to index of next bn
1005 }
1006 node->max_msn_applied_to_node_on_disk = max_msn;
1007
1008 // destroy buffers of old mempools
1009 for (uint32_t i = 0; i < num_orig_basements; i++) {
1010 destroy_basement_node(old_bns[i]);
1011 }
1012 }
1013
1014 bool toku_ftnode_fully_in_memory(FTNODE node) {
1015 for (int i = 0; i < node->n_children; i++) {
1016 if (BP_STATE(node,i) != PT_AVAIL) {
1017 return false;
1018 }
1019 }
1020 return true;
1021 }
1022
1023 void toku_ftnode_assert_fully_in_memory(FTNODE UU(node)) {
1024 paranoid_invariant(toku_ftnode_fully_in_memory(node));
1025 }
1026
1027 uint32_t toku_ftnode_leaf_num_entries(FTNODE node) {
1028 toku_ftnode_assert_fully_in_memory(node);
1029 uint32_t num_entries = 0;
1030 for (int i = 0; i < node->n_children; i++) {
1031 num_entries += BLB_DATA(node, i)->num_klpairs();
1032 }
1033 return num_entries;
1034 }
1035
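// A leaf is fissible (a split candidate) when its serialized size exceeds
// nodesize and it holds more than one entry; it is fusible (a merge
// candidate) when its serialized size is under a quarter of nodesize and the
// rightmost basement is not in a sequential-insert pattern.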
1036 enum reactivity toku_ftnode_get_leaf_reactivity(FTNODE node, uint32_t nodesize) {
1037 enum reactivity re = RE_STABLE;
1038 toku_ftnode_assert_fully_in_memory(node);
1039 paranoid_invariant(node->height==0);
1040 unsigned int size = toku_serialize_ftnode_size(node);
1041 if (size > nodesize && toku_ftnode_leaf_num_entries(node) > 1) {
1042 re = RE_FISSIBLE;
1043 } else if ((size*4) < nodesize && !BLB_SEQINSERT(node, node->n_children-1)) {
1044 re = RE_FUSIBLE;
1045 }
1046 return re;
1047 }
1048
1049 enum reactivity toku_ftnode_get_nonleaf_reactivity(FTNODE node, unsigned int fanout) {
1050 paranoid_invariant(node->height > 0);
1051 int n_children = node->n_children;
1052 if (n_children > (int) fanout) {
1053 return RE_FISSIBLE;
1054 }
1055 if (n_children * 4 < (int) fanout) {
1056 return RE_FUSIBLE;
1057 }
1058 return RE_STABLE;
1059 }
1060
1061 enum reactivity toku_ftnode_get_reactivity(FT ft, FTNODE node) {
1062 toku_ftnode_assert_fully_in_memory(node);
1063 if (node->height == 0) {
1064 return toku_ftnode_get_leaf_reactivity(node, ft->h->nodesize);
1065 } else {
1066 return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout);
1067 }
1068 }
1069
1070 unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc) {
1071 return bnc->msg_buffer.buffer_size_in_use();
1072 }
1073
1074 // Return true if the size of the buffers plus the amount of work done is large enough.
1075 // Return false if there is nothing to be flushed (the buffers are empty).
1076 bool toku_ftnode_nonleaf_is_gorged(FTNODE node, uint32_t nodesize) {
1077 uint64_t size = toku_serialize_ftnode_size(node);
1078
1079 bool buffers_are_empty = true;
1080 toku_ftnode_assert_fully_in_memory(node);
1081 //
1082 // the nonleaf node is gorged if the following holds true:
1083 // - the buffers are non-empty
1084 // - the total workdone by the buffers PLUS the size of the buffers
1085 // is greater than nodesize (which as of Maxwell should be
1086 // 4MB)
1087 //
1088 paranoid_invariant(node->height > 0);
1089 for (int child = 0; child < node->n_children; ++child) {
1090 size += BP_WORKDONE(node, child);
1091 }
1092 for (int child = 0; child < node->n_children; ++child) {
1093 if (toku_bnc_nbytesinbuf(BNC(node, child)) > 0) {
1094 buffers_are_empty = false;
1095 break;
1096 }
1097 }
1098 return ((size > nodesize)
1099 &&
1100 (!buffers_are_empty));
1101 }
1102
1103 int toku_bnc_n_entries(NONLEAF_CHILDINFO bnc) {
1104 return bnc->msg_buffer.num_entries();
1105 }
1106
1107 // how much memory does this child buffer consume?
1108 long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc) {
1109 return (sizeof(*bnc) +
1110 bnc->msg_buffer.memory_footprint() +
1111 bnc->fresh_message_tree.memory_size() +
1112 bnc->stale_message_tree.memory_size() +
1113 bnc->broadcast_list.memory_size());
1114 }
1115
1116 // how much memory in this child buffer holds useful data?
1117 // originally created solely for use by test program(s).
1118 long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc) {
1119 return (sizeof(*bnc) +
1120 bnc->msg_buffer.memory_size_in_use() +
1121 bnc->fresh_message_tree.memory_size() +
1122 bnc->stale_message_tree.memory_size() +
1123 bnc->broadcast_list.memory_size());
1124 }
1125
1126 //
1127 // Garbage collection
1128 // Message injection
1129 // Message application
1130 //
1131
1132 // Used only by test programs: append a child node to a parent node
1133 void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) {
1134 int childnum = node->n_children;
1135 node->n_children++;
1136 REALLOC_N(node->n_children, node->bp);
1137 BP_BLOCKNUM(node,childnum) = child->blocknum;
1138 BP_STATE(node,childnum) = PT_AVAIL;
1139 BP_WORKDONE(node, childnum) = 0;
1140 set_BNC(node, childnum, toku_create_empty_nl());
1141 if (pivotkey) {
1142 invariant(childnum > 0);
1143 node->pivotkeys.insert_at(pivotkey, childnum - 1);
1144 }
1145 node->set_dirty();
1146 }
1147
1148 void
1149 toku_ft_bn_apply_msg_once (
1150 BASEMENTNODE bn,
1151 const ft_msg &msg,
1152 uint32_t idx,
1153 uint32_t le_keylen,
1154 LEAFENTRY le,
1155 txn_gc_info *gc_info,
1156 uint64_t *workdone,
1157 STAT64INFO stats_to_update,
1158 int64_t *logical_rows_delta
1159 )
1160 // Effect: Apply msg to leafentry (msn is ignored)
1161 // Calculate work done by message on leafentry and add it to caller's workdone counter.
1162 // idx is the location where it goes
1163 // le is old leafentry
1164 {
1165 size_t newsize=0, oldsize=0, workdone_this_le=0;
1166 LEAFENTRY new_le=0;
1167 // how many bytes of user data (not including overhead) were added or
1168 // deleted from this row
1169 int64_t numbytes_delta = 0;
1170 // will be +1 or -1 or 0 (if row was added or deleted or not)
1171 int64_t numrows_delta = 0;
1172 // will be +1, -1 or 0 if a message that was accounted for logically has
1173 // changed in meaning such as an insert changed to an update or a delete
1174 // changed to a noop
1175 int64_t logical_rows_delta_le = 0;
1176 uint32_t key_storage_size = msg.kdbt()->size + sizeof(uint32_t);
1177 if (le) {
1178 oldsize = leafentry_memsize(le) + key_storage_size;
1179 }
1180
1181 // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt()
1182 // to allocate more space. That means le is guaranteed to not cause a
1183 // sigsegv but it may point to a mempool that is no longer in use.
1184 // We'll have to release the old mempool later.
1185 logical_rows_delta_le = toku_le_apply_msg(
1186 msg,
1187 le,
1188 &bn->data_buffer,
1189 idx,
1190 le_keylen,
1191 gc_info,
1192 &new_le,
1193 &numbytes_delta);
1194
1195 // at this point, we cannot trust cmd->u.id.key to be valid.
1196 // The dmt may have realloced its mempool and freed the one containing key.
1197
1198 newsize = new_le ? (leafentry_memsize(new_le) + key_storage_size) : 0;
1199 if (le && new_le) {
1200 workdone_this_le = (oldsize > newsize ? oldsize : newsize); // work done is max of le size before and after message application
1201
1202 } else { // we did not just replace a row, so ...
1203 if (le) {
1204 // ... we just deleted a row ...
1205 workdone_this_le = oldsize;
1206 numrows_delta = -1;
1207 }
1208 if (new_le) {
1209 // ... or we just added a row
1210 workdone_this_le = newsize;
1211 numrows_delta = 1;
1212 }
1213 }
1214 if (FT_LIKELY(workdone != NULL)) { // test programs may call with NULL
1215 *workdone += workdone_this_le;
1216 }
1217
1218 if (FT_LIKELY(logical_rows_delta != NULL)) {
1219 *logical_rows_delta += logical_rows_delta_le;
1220 }
1221 // now update stat64 statistics
1222 bn->stat64_delta.numrows += numrows_delta;
1223 bn->stat64_delta.numbytes += numbytes_delta;
1224 // the only reason stats_to_update may be null is for tests
1225 if (FT_LIKELY(stats_to_update != NULL)) {
1226 stats_to_update->numrows += numrows_delta;
1227 stats_to_update->numbytes += numbytes_delta;
1228 }
1229 }
1230
1231 static const uint32_t setval_tag = 0xee0ccb99; // this was gotten by doing "cat /dev/random|head -c4|od -x" to get a random number. We want to make sure that the user actually passes us the setval_extra_s that we passed in.
1232 struct setval_extra_s {
1233 uint32_t tag;
1234 bool did_set_val;
1235 // any error code that setval_fun wants to return goes here.
1236 int setval_r;
1237 // need arguments for toku_ft_bn_apply_msg_once
1238 BASEMENTNODE bn;
1239 // captured from original message, not currently used
1240 MSN msn;
1241 XIDS xids;
1242 const DBT* key;
1243 uint32_t idx;
1244 uint32_t le_keylen;
1245 LEAFENTRY le;
1246 txn_gc_info* gc_info;
1247 uint64_t* workdone; // set by toku_ft_bn_apply_msg_once()
1248 STAT64INFO stats_to_update;
1249 int64_t* logical_rows_delta;
1250 };
1251
1252 /*
1253 * If new_val == NULL, we send a delete message instead of an insert.
1254 * This happens here instead of in do_delete() for consistency.
1255 * setval_fun() is called from handlerton, passing in svextra_v
1256 * from setval_extra_s input arg to ft->update_fun().
1257 */
1258 static void setval_fun (const DBT *new_val, void *svextra_v) {
1259 struct setval_extra_s *CAST_FROM_VOIDP(svextra, svextra_v);
1260 paranoid_invariant(svextra->tag==setval_tag);
1261 paranoid_invariant(!svextra->did_set_val);
1262 svextra->did_set_val = true;
1263
1264 {
1265 // can't leave scope until toku_ft_bn_apply_msg_once if
1266 // this is a delete
1267 DBT val;
1268 ft_msg msg(
1269 svextra->key,
1270 new_val ? new_val : toku_init_dbt(&val),
1271 new_val ? FT_INSERT : FT_DELETE_ANY,
1272 svextra->msn,
1273 svextra->xids);
1274 toku_ft_bn_apply_msg_once(
1275 svextra->bn,
1276 msg,
1277 svextra->idx,
1278 svextra->le_keylen,
1279 svextra->le,
1280 svextra->gc_info,
1281 svextra->workdone,
1282 svextra->stats_to_update,
1283 svextra->logical_rows_delta);
1284 svextra->setval_r = 0;
1285 }
1286 }
1287
1288 // We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls
1289 // do_update()), so capturing the msn in the setval_extra_s is not strictly
1290 // required. The alternative would be to put a dummy msn in the messages
1291 // created by setval_fun(), but preserving the original msn seems cleaner and
1292 // it preserves accountability at a lower layer.
1293 static int do_update(
1294 ft_update_func update_fun,
1295 const DESCRIPTOR_S* desc,
1296 BASEMENTNODE bn,
1297 const ft_msg &msg,
1298 uint32_t idx,
1299 LEAFENTRY le,
1300 void* keydata,
1301 uint32_t keylen,
1302 txn_gc_info* gc_info,
1303 uint64_t* workdone,
1304 STAT64INFO stats_to_update,
1305 int64_t* logical_rows_delta) {
1306
1307 LEAFENTRY le_for_update;
1308 DBT key;
1309 const DBT *keyp;
1310 const DBT *update_function_extra;
1311 DBT vdbt;
1312 const DBT *vdbtp;
1313
1314 // the location of the data depends on whether this is a regular or
1315 // broadcast update
1316 if (msg.type() == FT_UPDATE) {
1317 // key is passed in with command (should be same as from le)
1318 // update function extra is passed in with command
1319 keyp = msg.kdbt();
1320 update_function_extra = msg.vdbt();
1321 } else {
1322 invariant(msg.type() == FT_UPDATE_BROADCAST_ALL);
1323 // key is not passed in with broadcast, it comes from le
1324 // update function extra is passed in with command
1325 paranoid_invariant(le); // for broadcast updates, we just hit all leafentries
1326 // so this cannot be null
1327 paranoid_invariant(keydata);
1328 paranoid_invariant(keylen);
1329 paranoid_invariant(msg.kdbt()->size == 0);
1330 keyp = toku_fill_dbt(&key, keydata, keylen);
1331 update_function_extra = msg.vdbt();
1332 }
1333 toku_ft_status_note_update(msg.type() == FT_UPDATE_BROADCAST_ALL);
1334
1335 if (le && !le_latest_is_del(le)) {
1336 // if the latest val exists, use it, and we'll use the leafentry later
1337 uint32_t vallen;
1338 void *valp = le_latest_val_and_len(le, &vallen);
1339 vdbtp = toku_fill_dbt(&vdbt, valp, vallen);
1340 } else {
1341 // otherwise, the val and leafentry are both going to be null
1342 vdbtp = NULL;
1343 }
1344 le_for_update = le;
1345
1346 struct setval_extra_s setval_extra = {
1347 setval_tag,
1348 false,
1349 0,
1350 bn,
1351 msg.msn(),
1352 msg.xids(),
1353 keyp,
1354 idx,
1355 keylen,
1356 le_for_update,
1357 gc_info,
1358 workdone,
1359 stats_to_update,
1360 logical_rows_delta
1361 };
1362 // call handlerton's ft->update_fun(), which passes setval_extra
1363 // to setval_fun()
1364 FAKE_DB(db, desc);
1365 int r = update_fun(
1366 &db,
1367 keyp,
1368 vdbtp,
1369 update_function_extra,
1370 setval_fun,
1371 &setval_extra);
1372
1373 if (r == 0) { r = setval_extra.setval_r; }
1374 return r;
1375 }
1376
1377 // Should be renamed as something like "apply_msg_to_basement()."
1378 void toku_ft_bn_apply_msg(
1379 const toku::comparator& cmp,
1380 ft_update_func update_fun,
1381 BASEMENTNODE bn,
1382 const ft_msg& msg,
1383 txn_gc_info* gc_info,
1384 uint64_t* workdone,
1385 STAT64INFO stats_to_update,
1386 int64_t* logical_rows_delta) {
1387 // Effect:
1388 // Put a msg into a leaf.
1389 // Calculate work done by message on leafnode and add it to caller's
1390 // workdone counter.
1391 // The leaf could end up "too big" or "too small". The caller must fix that up.
1392 LEAFENTRY storeddata;
1393 void* key = NULL;
1394 uint32_t keylen = 0;
1395
1396 uint32_t num_klpairs;
1397 int r;
1398 struct toku_msg_leafval_heaviside_extra be(cmp, msg.kdbt());
1399
1400 unsigned int doing_seqinsert = bn->seqinsert;
1401 bn->seqinsert = 0;
1402
1403 switch (msg.type()) {
1404 case FT_INSERT_NO_OVERWRITE:
1405 case FT_INSERT: {
1406 uint32_t idx;
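// Sequential-insert fast path: if the previous insert was sequential, probe
// the last key in the basement. When the new key sorts after it, we can
// append at idx == num_klpairs without a full search; otherwise fall back
// to the general find_zero() lookup at the fz label.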
1407 if (doing_seqinsert) {
1408 idx = bn->data_buffer.num_klpairs();
1409 DBT kdbt;
1410 r = bn->data_buffer.fetch_key_and_len(idx-1, &kdbt.size, &kdbt.data);
1411 if (r != 0) goto fz;
1412 int c = toku_msg_leafval_heaviside(kdbt, be);
1413 if (c >= 0) goto fz;
1414 r = DB_NOTFOUND;
1415 } else {
1416 fz:
1417 r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1418 be,
1419 &storeddata,
1420 &key,
1421 &keylen,
1422 &idx
1423 );
1424 }
1425 if (r==DB_NOTFOUND) {
1426 storeddata = 0;
1427 } else {
1428 assert_zero(r);
1429 }
1430 toku_ft_bn_apply_msg_once(
1431 bn,
1432 msg,
1433 idx,
1434 keylen,
1435 storeddata,
1436 gc_info,
1437 workdone,
1438 stats_to_update,
1439 logical_rows_delta);
1440
1441 // if the insertion point is within a window of the right edge of
1442 // the leaf then it is sequential
1443 // window = min(32, number of leaf entries/16)
1444 {
1445 uint32_t s = bn->data_buffer.num_klpairs();
1446 uint32_t w = s / 16;
1447 if (w == 0) w = 1;
1448 if (w > 32) w = 32;
1449
1450 // within the window?
1451 if (s - idx <= w)
1452 bn->seqinsert = doing_seqinsert + 1;
1453 }
1454 break;
1455 }
1456 case FT_DELETE_ANY:
1457 case FT_ABORT_ANY:
1458 case FT_COMMIT_ANY: {
1459 uint32_t idx;
1460 // Apply to all the matches
1461
1462 r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1463 be,
1464 &storeddata,
1465 &key,
1466 &keylen,
1467 &idx);
1468 if (r == DB_NOTFOUND) break;
1469 assert_zero(r);
1470 toku_ft_bn_apply_msg_once(
1471 bn,
1472 msg,
1473 idx,
1474 keylen,
1475 storeddata,
1476 gc_info,
1477 workdone,
1478 stats_to_update,
1479 logical_rows_delta);
1480 break;
1481 }
1482 case FT_OPTIMIZE_FOR_UPGRADE:
1483 // fall through so that optimize_for_upgrade performs the rest of the optimize logic
1484 case FT_COMMIT_BROADCAST_ALL:
1485 case FT_OPTIMIZE:
1486 // Apply to all leafentries
1487 num_klpairs = bn->data_buffer.num_klpairs();
1488 for (uint32_t idx = 0; idx < num_klpairs; ) {
1489 void* curr_keyp = NULL;
1490 uint32_t curr_keylen = 0;
1491 r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
1492 assert_zero(r);
1493 int deleted = 0;
1494 if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
1495 // message application code needs a key in order to determine
1496 // how much work was done by this message. since this is a
1497 // broadcast message, we have to create a new message whose
1498 // key is the current le's key.
1499 DBT curr_keydbt;
1500 ft_msg curr_msg(
1501 toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
1502 msg.vdbt(),
1503 msg.type(),
1504 msg.msn(),
1505 msg.xids());
1506 toku_ft_bn_apply_msg_once(
1507 bn,
1508 curr_msg,
1509 idx,
1510 curr_keylen,
1511 storeddata,
1512 gc_info,
1513 workdone,
1514 stats_to_update,
1515 logical_rows_delta);
1516 // at this point, we cannot trust msg.kdbt to be valid.
1517 uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
1518 if (new_dmt_size != num_klpairs) {
1519 paranoid_invariant(new_dmt_size + 1 == num_klpairs);
1520 //Item was deleted.
1521 deleted = 1;
1522 }
1523 }
1524 if (deleted)
1525 num_klpairs--;
1526 else
1527 idx++;
1528 }
1529 paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);
1530
1531 break;
1532 case FT_COMMIT_BROADCAST_TXN:
1533 case FT_ABORT_BROADCAST_TXN:
1534 // Apply to all leafentries if txn is represented
1535 num_klpairs = bn->data_buffer.num_klpairs();
1536 for (uint32_t idx = 0; idx < num_klpairs; ) {
1537 void* curr_keyp = NULL;
1538 uint32_t curr_keylen = 0;
1539 r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
1540 assert_zero(r);
1541 int deleted = 0;
1542 if (le_has_xids(storeddata, msg.xids())) {
1543 // message application code needs a key in order to determine
1544 // how much work was done by this message. since this is a
1545 // broadcast message, we have to create a new message whose key
1546 // is the current le's key.
1547 DBT curr_keydbt;
1548 ft_msg curr_msg(
1549 toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
1550 msg.vdbt(),
1551 msg.type(),
1552 msg.msn(),
1553 msg.xids());
1554 toku_ft_bn_apply_msg_once(
1555 bn,
1556 curr_msg,
1557 idx,
1558 curr_keylen,
1559 storeddata,
1560 gc_info,
1561 workdone,
1562 stats_to_update,
1563 logical_rows_delta);
1564 uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
1565 if (new_dmt_size != num_klpairs) {
1566 paranoid_invariant(new_dmt_size + 1 == num_klpairs);
1567 //Item was deleted.
1568 deleted = 1;
1569 }
1570 }
1571 if (deleted)
1572 num_klpairs--;
1573 else
1574 idx++;
1575 }
1576 paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);
1577
1578 break;
1579 case FT_UPDATE: {
1580 uint32_t idx;
1581 r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1582 be,
1583 &storeddata,
1584 &key,
1585 &keylen,
1586 &idx
1587 );
1588 if (r==DB_NOTFOUND) {
1589 {
1590 //Point to msg's copy of the key so we don't worry about le being freed
1591 //TODO: 46 MAYBE Get rid of this when le_apply message memory is better handled
1592 key = msg.kdbt()->data;
1593 keylen = msg.kdbt()->size;
1594 }
1595 r = do_update(
1596 update_fun,
1597 cmp.get_descriptor(),
1598 bn,
1599 msg,
1600 idx,
1601 NULL,
1602 NULL,
1603 0,
1604 gc_info,
1605 workdone,
1606 stats_to_update,
1607 logical_rows_delta);
1608 } else if (r==0) {
1609 r = do_update(
1610 update_fun,
1611 cmp.get_descriptor(),
1612 bn,
1613 msg,
1614 idx,
1615 storeddata,
1616 key,
1617 keylen,
1618 gc_info,
1619 workdone,
1620 stats_to_update,
1621 logical_rows_delta);
1622 } // otherwise, a worse error, just return it
1623 break;
1624 }
1625 case FT_UPDATE_BROADCAST_ALL: {
1626 // apply to all leafentries.
1627 uint32_t idx = 0;
1628 uint32_t num_leafentries_before;
1629 // Use a throwaway delta so that applying this message does not change the
1630 // logical row count: the apply would otherwise return a negative delta equal
1631 // to the number of leaf entries visited and drive the ft header's row count
1632 // to 0. This message will not change the number of rows, so the value
1633 // accumulated here is simply discarded.
1634 int64_t temp_logical_rows_delta = 0;
1635 while (idx < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
1636 void* curr_key = nullptr;
1637 uint32_t curr_keylen = 0;
1638 r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_key);
1639 assert_zero(r);
1640
1641 //TODO: 46 replace this with something better than cloning key
1642 // TODO: (Zardosht) This may be unnecessary now, due to how the key
1643 // is handled in the bndata. Investigate and determine
char clone_mem[curr_keylen]; // VLA lives only for this iteration; alloca() would not be released until the end of the function and could overflow the stack
1645 memcpy((void*)clone_mem, curr_key, curr_keylen);
1646 curr_key = (void*)clone_mem;
1647
// The call below is broken. A compilation error was once checked
// in as a reminder.
1650 r = do_update(
1651 update_fun,
1652 cmp.get_descriptor(),
1653 bn,
1654 msg,
1655 idx,
1656 storeddata,
1657 curr_key,
1658 curr_keylen,
1659 gc_info,
1660 workdone,
1661 stats_to_update,
1662 &temp_logical_rows_delta);
1663 assert_zero(r);
1664
1665 if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
1666 // we didn't delete something, so increment the index.
1667 idx++;
1668 }
1669 }
1670 break;
1671 }
1672 case FT_NONE: break; // don't do anything
1673 }
1674
1675 return;
1676 }
1677
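// Compare two (key, msn) pairs: order primarily by key using cmp; on equal
// keys the message with the smaller msn (the older message) sorts first.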
1678 static inline int
1679 key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, const toku::comparator &cmp) {
1680 int r = cmp(a, b);
1681 if (r == 0) {
1682 if (amsn.msn > bmsn.msn) {
1683 r = +1;
1684 } else if (amsn.msn < bmsn.msn) {
1685 r = -1;
1686 } else {
1687 r = 0;
1688 }
1689 }
1690 return r;
1691 }
1692
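// Heaviside function for searching a message tree: compares the (key, msn) of
// the message stored at `offset` in the buffer against the target (key, msn)
// carried in `extra`.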
1693 int toku_msg_buffer_key_msn_heaviside(const int32_t &offset, const struct toku_msg_buffer_key_msn_heaviside_extra &extra) {
1694 MSN query_msn;
1695 DBT query_key;
1696 extra.msg_buffer->get_message_key_msn(offset, &query_key, &query_msn);
1697 return key_msn_cmp(&query_key, extra.key, query_msn, extra.msn, extra.cmp);
1698 }
1699
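// Three-way comparator over two message-buffer offsets, ordering the messages
// they reference by (key, msn).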
1700 int toku_msg_buffer_key_msn_cmp(const struct toku_msg_buffer_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo) {
1701 MSN amsn, bmsn;
1702 DBT akey, bkey;
1703 extra.msg_buffer->get_message_key_msn(ao, &akey, &amsn);
1704 extra.msg_buffer->get_message_key_msn(bo, &bkey, &bmsn);
1705 return key_msn_cmp(&akey, &bkey, amsn, bmsn, extra.cmp);
1706 }
1707
1708 // Effect: Enqueue the message represented by the parameters into the
1709 // bnc's buffer, and put it in either the fresh or stale message tree,
1710 // or the broadcast list.
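// Applies-once messages are indexed (by key, then msn) in either the fresh or
// the stale message tree according to is_fresh; broadcast messages are
// appended to the broadcast list in arrival order.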
1711 static void bnc_insert_msg(NONLEAF_CHILDINFO bnc, const ft_msg &msg, bool is_fresh, const toku::comparator &cmp) {
1712 int r = 0;
1713 int32_t offset;
1714 bnc->msg_buffer.enqueue(msg, is_fresh, &offset);
1715 enum ft_msg_type type = msg.type();
1716 if (ft_msg_type_applies_once(type)) {
1717 DBT key;
1718 toku_fill_dbt(&key, msg.kdbt()->data, msg.kdbt()->size);
1719 struct toku_msg_buffer_key_msn_heaviside_extra extra(cmp, &bnc->msg_buffer, &key, msg.msn());
1720 if (is_fresh) {
1721 r = bnc->fresh_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
1722 assert_zero(r);
1723 } else {
1724 r = bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
1725 assert_zero(r);
1726 }
1727 } else {
1728 invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type));
1729 const uint32_t idx = bnc->broadcast_list.size();
1730 r = bnc->broadcast_list.insert_at(offset, idx);
1731 assert_zero(r);
1732 }
1733 }
1734
1735 // This is only exported for tests.
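// Usage sketch (tests only): the caller passes raw key/value bytes plus the
// message metadata, and the DBTs are filled internally. The names below
// (test_bnc, next_msn, root_xids, test_cmp) are hypothetical test fixtures,
// not identifiers defined in this file:
//
//     toku_bnc_insert_msg(test_bnc, "k", 2, "v", 2, FT_INSERT,
//                         next_msn, root_xids, true /* is_fresh */, test_cmp);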
1736 void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, uint32_t keylen, const void *data, uint32_t datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const toku::comparator &cmp)
1737 {
1738 DBT k, v;
1739 ft_msg msg(toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), type, msn, xids);
1740 bnc_insert_msg(bnc, msg, is_fresh, cmp);
1741 }
1742
1743 // append a msg to a nonleaf node's child buffer
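// Requires: the child's buffer partition is in memory (PT_AVAIL). Marks the node dirty.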
1744 static void ft_append_msg_to_child_buffer(const toku::comparator &cmp, FTNODE node,
1745 int childnum, const ft_msg &msg, bool is_fresh) {
1746 paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL);
1747 bnc_insert_msg(BNC(node, childnum), msg, is_fresh, cmp);
1748 node->set_dirty();
1749 }
1750
1751 // This is only exported for tests.
1752 void toku_ft_append_to_child_buffer(const toku::comparator &cmp, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) {
1753 ft_msg msg(key, val, type, msn, xids);
1754 ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
1755 }
1756
1757 static void ft_nonleaf_msg_once_to_child(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Previously we had passive-aggressive promotion, but that causes a lot of I/O at checkpoint time. So now we just put the message in the buffer here.
1759 // Also we don't worry about the node getting overfull here. It's the caller's problem.
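// The caller-provided flow_deltas are accumulated onto the chosen child's flow counters.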
1760 {
1761 unsigned int childnum = (target_childnum >= 0
1762 ? target_childnum
1763 : toku_ftnode_which_child(node, msg.kdbt(), cmp));
1764 ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
1765 NONLEAF_CHILDINFO bnc = BNC(node, childnum);
1766 bnc->flow[0] += flow_deltas[0];
1767 bnc->flow[1] += flow_deltas[1];
1768 }
1769
1770 // TODO: Remove me, I'm boring.
1771 static int ft_compare_pivot(const toku::comparator &cmp, const DBT *key, const DBT *pivot) {
1772 return cmp(key, pivot);
1773 }
1774
1775 /* Find the leftmost child that may contain the key.
1776 * If the key exists it will be in the child whose number
1777 * is the return value of this function.
1778 */
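// Illustrative example (assuming integer keys): with pivots [10, 20] and three
// children, key 5 -> child 0, key 10 -> child 0, key 15 -> child 1,
// key 20 -> child 1, key 25 -> child 2.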
1779 int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
1780 // a funny case of no pivots
1781 if (node->n_children <= 1) return 0;
1782
1783 DBT pivot;
1784
1785 // check the last key to optimize seq insertions
1786 int n = node->n_children-1;
1787 int c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(n - 1, &pivot));
1788 if (c > 0) return n;
1789
1790 // binary search the pivots
1791 int lo = 0;
1792 int hi = n-1; // skip the last one, we checked it above
1793 int mi;
1794 while (lo < hi) {
1795 mi = (lo + hi) / 2;
1796 c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
1797 if (c > 0) {
1798 lo = mi+1;
1799 continue;
1800 }
1801 if (c < 0) {
1802 hi = mi;
1803 continue;
1804 }
1805 return mi;
1806 }
1807 return lo;
1808 }
1809
1810 // Used for HOT.
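// Unlike toku_ftnode_which_child, a key exactly equal to a pivot selects the
// subtree to the right of that pivot (child mi + 1).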
1811 int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
1812 DBT pivot;
1813 int low = 0;
1814 int hi = node->n_children - 1;
1815 int mi;
1816 while (low < hi) {
1817 mi = (low + hi) / 2;
1818 int r = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
1819 if (r > 0) {
1820 low = mi + 1;
1821 } else if (r < 0) {
1822 hi = mi;
1823 } else {
1824 // if they were exactly equal, then we want the sub-tree under
1825 // the next pivot.
1826 return mi + 1;
1827 }
1828 }
1829 invariant(low == hi);
1830 return low;
1831 }
1832
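// Callback used by the cachetable to record the PAIR associated with this node's value.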
1833 void toku_ftnode_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
1834 FTNODE CAST_FROM_VOIDP(node, value_data);
1835 node->ct_pair = p;
1836 }
1837
1838 static void
1839 ft_nonleaf_msg_all(const toku::comparator &cmp, FTNODE node, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
1840 // Effect: Put the message into a nonleaf node. We put it into all children, possibly causing the children to become reactive.
1841 // We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
// Several children may become reactive as a result.
1843 {
1844 for (int i = 0; i < node->n_children; i++) {
1845 ft_nonleaf_msg_once_to_child(cmp, node, i, msg, is_fresh, flow_deltas);
1846 }
1847 }
1848
1849 static void
1850 ft_nonleaf_put_msg(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
1851 // Effect: Put the message into a nonleaf node. We may put it into a child, possibly causing the child to become reactive.
1852 // We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
// One or more children may become reactive as a result.
//
1855 {
1856
1857 //
1858 // see comments in toku_ft_leaf_apply_msg
1859 // to understand why we handle setting
1860 // node->max_msn_applied_to_node_on_disk here,
1861 // and don't do it in toku_ftnode_put_msg
1862 //
1863 MSN msg_msn = msg.msn();
1864 invariant(msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
1865 node->max_msn_applied_to_node_on_disk = msg_msn;
1866
1867 if (ft_msg_type_applies_once(msg.type())) {
1868 ft_nonleaf_msg_once_to_child(cmp, node, target_childnum, msg, is_fresh, flow_deltas);
1869 } else if (ft_msg_type_applies_all(msg.type())) {
1870 ft_nonleaf_msg_all(cmp, node, msg, is_fresh, flow_deltas);
1871 } else {
1872 paranoid_invariant(ft_msg_type_does_nothing(msg.type()));
1873 }
1874 }
1875
1876 // Garbage collect one leaf entry.
1877 static void
1878 ft_basement_node_gc_once(BASEMENTNODE bn,
1879 uint32_t index,
1880 void* keyp,
1881 uint32_t keylen,
1882 LEAFENTRY leaf_entry,
1883 txn_gc_info *gc_info,
1884 STAT64INFO_S * delta)
1885 {
1886 paranoid_invariant(leaf_entry);
1887
1888 // Don't run garbage collection on non-mvcc leaf entries.
1889 if (leaf_entry->type != LE_MVCC) {
1890 goto exit;
1891 }
1892
1893 // Don't run garbage collection if this leafentry decides it's not worth it.
1894 if (!toku_le_worth_running_garbage_collection(leaf_entry, gc_info)) {
1895 goto exit;
1896 }
1897
1898 LEAFENTRY new_leaf_entry;
1899 new_leaf_entry = NULL;
1900
1901 // The mempool doesn't free itself. When it allocates new memory,
1902 // this pointer will be set to the older memory that must now be
1903 // freed.
1904 void * maybe_free;
1905 maybe_free = NULL;
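// Note: in this function toku_le_garbage_collect is not handed &maybe_free,
// so it stays NULL and the free below is a no-op.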
1906
1907 // These will represent the number of bytes and rows changed as
1908 // part of the garbage collection.
1909 int64_t numbytes_delta;
1910 int64_t numrows_delta;
1911 toku_le_garbage_collect(leaf_entry,
1912 &bn->data_buffer,
1913 index,
1914 keyp,
1915 keylen,
1916 gc_info,
1917 &new_leaf_entry,
1918 &numbytes_delta);
1919
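// If garbage collection removed the leaf entry entirely (no replacement entry
// was produced), the basement node has one fewer row.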
numrows_delta = new_leaf_entry ? 0 : -1;
1926
1927 // If we created a new mempool buffer we must free the
1928 // old/original buffer.
1929 if (maybe_free) {
1930 toku_free(maybe_free);
1931 }
1932
1933 // Update stats.
1934 bn->stat64_delta.numrows += numrows_delta;
1935 bn->stat64_delta.numbytes += numbytes_delta;
1936 delta->numrows += numrows_delta;
1937 delta->numbytes += numbytes_delta;
1938
1939 exit:
1940 return;
1941 }
1942
1943 // Garbage collect all leaf entries for a given basement node.
1944 static void
1945 basement_node_gc_all_les(BASEMENTNODE bn,
1946 txn_gc_info *gc_info,
1947 STAT64INFO_S * delta)
1948 {
1949 int r = 0;
1950 uint32_t index = 0;
1951 uint32_t num_leafentries_before;
1952 while (index < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
1953 void* keyp = NULL;
1954 uint32_t keylen = 0;
1955 LEAFENTRY leaf_entry;
1956 r = bn->data_buffer.fetch_klpair(index, &leaf_entry, &keylen, &keyp);
1957 assert_zero(r);
1958 ft_basement_node_gc_once(
1959 bn,
1960 index,
1961 keyp,
1962 keylen,
1963 leaf_entry,
1964 gc_info,
1965 delta
1966 );
// Only advance the index if the leaf entry survived; if it was deleted, the next entry has shifted into this index.
1968 if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
1969 ++index;
1970 }
1971 }
1972 }
1973
// Garbage collect all leaf entries in all basement nodes.
1975 static void
1976 ft_leaf_gc_all_les(FT ft, FTNODE node, txn_gc_info *gc_info)
1977 {
1978 toku_ftnode_assert_fully_in_memory(node);
1979 paranoid_invariant_zero(node->height);
1980 // Loop through each leaf entry, garbage collecting as we go.
1981 for (int i = 0; i < node->n_children; ++i) {
1982 // Perform the garbage collection.
1983 BASEMENTNODE bn = BLB(node, i);
1984 STAT64INFO_S delta;
1985 delta.numrows = 0;
1986 delta.numbytes = 0;
1987 basement_node_gc_all_les(bn, gc_info, &delta);
1988 toku_ft_update_stats(&ft->in_memory_stats, delta);
1989 }
1990 }
1991
1992 void toku_ftnode_leaf_run_gc(FT ft, FTNODE node) {
1993 TOKULOGGER logger = toku_cachefile_logger(ft->cf);
1994 if (logger) {
1995 TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger);
1996 txn_manager_state txn_state_for_gc(txn_manager);
1997 txn_state_for_gc.init();
1998 TXNID oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);
1999
2000 // Perform full garbage collection.
2001 //
2002 // - txn_state_for_gc
2003 // a fresh snapshot of the transaction system.
2004 // - oldest_referenced_xid_for_simple_gc
// the oldest xid in any live list as of right now - suitable for simple gc
2006 // - node->oldest_referenced_xid_known
2007 // the last known oldest referenced xid for this node and any unapplied messages.
// it is a lower bound on the actual oldest referenced xid - but because there
2009 // may be abort messages above us, we need to be careful to only use this value
2010 // for implicit promotion (as opposed to the oldest referenced xid for simple gc)
2011 //
2012 // The node has its own oldest referenced xid because it must be careful not to implicitly promote
2013 // provisional entries for transactions that are no longer live, but may have abort messages
2014 // somewhere above us in the tree.
2015 txn_gc_info gc_info(&txn_state_for_gc,
2016 oldest_referenced_xid_for_simple_gc,
2017 node->oldest_referenced_xid_known,
2018 true);
2019 ft_leaf_gc_all_les(ft, node, &gc_info);
2020 }
2021 }
2022
2023 void toku_ftnode_put_msg(
2024 const toku::comparator &cmp,
2025 ft_update_func update_fun,
2026 FTNODE node,
2027 int target_childnum,
2028 const ft_msg &msg,
2029 bool is_fresh,
2030 txn_gc_info* gc_info,
2031 size_t flow_deltas[],
2032 STAT64INFO stats_to_update,
2033 int64_t* logical_rows_delta) {
2034 // Effect: Push message into the subtree rooted at NODE.
2035 // If NODE is a leaf, then
2036 // put message into leaf, applying it to the leafentries
// If NODE is a nonleaf, then push the message into the message buffer(s) of the relevant child(ren).
2038 // The node may become overfull. That's not our problem.
2039 toku_ftnode_assert_fully_in_memory(node);
2040 //
2041 // see comments in toku_ft_leaf_apply_msg
2042 // to understand why we don't handle setting
2043 // node->max_msn_applied_to_node_on_disk here,
2044 // and instead defer to these functions
2045 //
2046 if (node->height==0) {
2047 toku_ft_leaf_apply_msg(
2048 cmp,
2049 update_fun,
2050 node,
2051 target_childnum, msg,
2052 gc_info,
2053 nullptr,
2054 stats_to_update,
2055 logical_rows_delta);
2056 } else {
2057 ft_nonleaf_put_msg(
2058 cmp,
2059 node,
2060 target_childnum,
2061 msg,
2062 is_fresh,
2063 flow_deltas);
2064 }
2065 }
2066
2067 // Effect: applies the message to the leaf if the appropriate basement node is
2068 // in memory. This function is called during message injection and/or
2069 // flushing, so the entire node MUST be in memory.
2070 void toku_ft_leaf_apply_msg(
2071 const toku::comparator& cmp,
2072 ft_update_func update_fun,
2073 FTNODE node,
2074 int target_childnum, // which child to inject to, or -1 if unknown
2075 const ft_msg& msg,
2076 txn_gc_info* gc_info,
2077 uint64_t* workdone,
2078 STAT64INFO stats_to_update,
2079 int64_t* logical_rows_delta) {
2080
2081 VERIFY_NODE(t, node);
2082 toku_ftnode_assert_fully_in_memory(node);
2083
2084 //
2085 // Because toku_ft_leaf_apply_msg is called with the intent of permanently
2086 // applying a message to a leaf node (meaning the message is permanently applied
2087 // and will be purged from the system after this call, as opposed to
2088 // toku_apply_ancestors_messages_to_node, which applies a message
2089 // for a query, but the message may still reside in the system and
2090 // be reapplied later), we mark the node as dirty and
2091 // take the opportunity to update node->max_msn_applied_to_node_on_disk.
2092 //
2093 node->set_dirty();
2094
2095 //
2096 // we cannot blindly update node->max_msn_applied_to_node_on_disk,
// we must check to see if the msn is greater than the one already stored,
2098 // because the message may have already been applied earlier (via
2099 // toku_apply_ancestors_messages_to_node) to answer a query
2100 //
2101 // This is why we handle node->max_msn_applied_to_node_on_disk both here
2102 // and in ft_nonleaf_put_msg, as opposed to in one location, toku_ftnode_put_msg.
2103 //
2104 MSN msg_msn = msg.msn();
2105 if (msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn) {
2106 node->max_msn_applied_to_node_on_disk = msg_msn;
2107 }
2108
2109 if (ft_msg_type_applies_once(msg.type())) {
2110 unsigned int childnum = (target_childnum >= 0
2111 ? target_childnum
2112 : toku_ftnode_which_child(node, msg.kdbt(), cmp));
2113 BASEMENTNODE bn = BLB(node, childnum);
2114 if (msg.msn().msn > bn->max_msn_applied.msn) {
2115 bn->max_msn_applied = msg.msn();
2116 toku_ft_bn_apply_msg(
2117 cmp,
2118 update_fun,
2119 bn,
2120 msg,
2121 gc_info,
2122 workdone,
2123 stats_to_update,
2124 logical_rows_delta);
2125 } else {
2126 toku_ft_status_note_msn_discard();
2127 }
2128 } else if (ft_msg_type_applies_all(msg.type())) {
2129 for (int childnum=0; childnum<node->n_children; childnum++) {
2130 if (msg.msn().msn > BLB(node, childnum)->max_msn_applied.msn) {
2131 BLB(node, childnum)->max_msn_applied = msg.msn();
2132 toku_ft_bn_apply_msg(
2133 cmp,
2134 update_fun,
2135 BLB(node, childnum),
2136 msg,
2137 gc_info,
2138 workdone,
2139 stats_to_update,
2140 logical_rows_delta);
2141 } else {
2142 toku_ft_status_note_msn_discard();
2143 }
2144 }
2145 } else if (!ft_msg_type_does_nothing(msg.type())) {
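// Unexpected message type: this invariant is intentionally false here and aborts.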
2146 invariant(ft_msg_type_does_nothing(msg.type()));
2147 }
2148 VERIFY_NODE(t, node);
2149 }
2150
2151