/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
/*======
This file is part of PerconaFT.


Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License, version 2,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.

----------------------------------------

    PerconaFT is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License, version 3,
    as published by the Free Software Foundation.

    PerconaFT is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
======= */

#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."

#include "ft/ft.h"
#include "ft/ft-internal.h"
#include "ft/serialize/ft_node-serialize.h"
#include "ft/node.h"
#include "ft/serialize/rbuf.h"
#include "ft/serialize/wbuf.h"
#include "util/scoped_malloc.h"
#include "util/sort.h"
// Effect: Fill in N as an empty ftnode.
// TODO: Rename toku_ftnode_create
void toku_initialize_empty_ftnode(FTNODE n, BLOCKNUM blocknum, int height, int num_children, int layout_version, unsigned int flags) {
    paranoid_invariant(layout_version != 0);
    paranoid_invariant(height >= 0);

    n->max_msn_applied_to_node_on_disk = ZERO_MSN;  // correct value for root node, harmless for others
    n->flags = flags;
    n->blocknum = blocknum;
    n->layout_version = layout_version;
    n->layout_version_original = layout_version;
    n->layout_version_read_from_disk = layout_version;
    n->height = height;
    n->pivotkeys.create_empty();
    n->bp = 0;
    n->n_children = num_children;
    n->oldest_referenced_xid_known = TXNID_NONE;

    if (num_children > 0) {
        XMALLOC_N(num_children, n->bp);
        for (int i = 0; i < num_children; i++) {
            BP_BLOCKNUM(n,i).b=0;
            BP_STATE(n,i) = PT_INVALID;
            BP_WORKDONE(n,i) = 0;
            BP_INIT_TOUCHED_CLOCK(n, i);
            set_BNULL(n,i);
            if (height > 0) {
                set_BNC(n, i, toku_create_empty_nl());
            } else {
                set_BLB(n, i, toku_create_empty_bn());
            }
        }
    }
    n->set_dirty();  // special case exception, it's okay to mark as dirty because the basements are empty

    toku_ft_status_note_ftnode(height, true);
}

// Destroys the internals of the ftnode, but does not free the values
// that are stored.
// This is common functionality for toku_ftnode_free and rebalance_ftnode_leaf.
// MUST NOT do anything besides free the structures that have been allocated.
void toku_destroy_ftnode_internals(FTNODE node) {
    node->pivotkeys.destroy();
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) == PT_AVAIL) {
            if (node->height > 0) {
                destroy_nonleaf_childinfo(BNC(node,i));
            } else {
                paranoid_invariant(BLB_LRD(node, i) == 0);
                destroy_basement_node(BLB(node, i));
            }
        } else if (BP_STATE(node,i) == PT_COMPRESSED) {
            SUB_BLOCK sb = BSB(node,i);
            toku_free(sb->compressed_ptr);
            toku_free(sb);
        } else {
            paranoid_invariant(is_BNULL(node, i));
        }
        set_BNULL(node, i);
    }
    toku_free(node->bp);
    node->bp = NULL;
}

/* Frees a node, including all the stuff in the hash table. */
void toku_ftnode_free(FTNODE *nodep) {
    FTNODE node = *nodep;
    toku_ft_status_note_ftnode(node->height, false);
    toku_destroy_ftnode_internals(node);
    toku_free(node);
    *nodep = nullptr;
}

void toku_ftnode_update_disk_stats(FTNODE ftnode, FT ft, bool for_checkpoint) {
    STAT64INFO_S deltas = ZEROSTATS;
    // capture deltas before rebalancing basements for serialization
    deltas = toku_get_and_clear_basement_stats(ftnode);
    // Locking is not necessary here with respect to checkpointing in
    // Clayface (because of the pending lock and cachetable lock in
    // toku_cachetable_begin_checkpoint). Essentially, if we are dealing
    // with a for_checkpoint parameter in a function that is called by the
    // flush_callback, then the cachetable needs to ensure that this is
    // called in a safe manner that does not interfere with the beginning
    // of a checkpoint, which it does with the cachetable lock and the
    // pending lock.
    toku_ft_update_stats(&ft->h->on_disk_stats, deltas);
    if (for_checkpoint) {
        toku_ft_update_stats(&ft->checkpoint_header->on_disk_stats, deltas);
    }
}

void toku_ftnode_clone_partitions(FTNODE node, FTNODE cloned_node) {
    for (int i = 0; i < node->n_children; i++) {
        BP_BLOCKNUM(cloned_node,i) = BP_BLOCKNUM(node,i);
        paranoid_invariant(BP_STATE(node,i) == PT_AVAIL);
        BP_STATE(cloned_node,i) = PT_AVAIL;
        BP_WORKDONE(cloned_node, i) = BP_WORKDONE(node, i);
        if (node->height == 0) {
            set_BLB(cloned_node, i, toku_clone_bn(BLB(node,i)));
        } else {
            set_BNC(cloned_node, i, toku_clone_nl(BNC(node,i)));
        }
    }
}

void toku_evict_bn_from_memory(FTNODE node, int childnum, FT ft) {
    // free the basement node
    assert(!node->dirty());
    BASEMENTNODE bn = BLB(node, childnum);
    toku_ft_decrease_stats(&ft->in_memory_stats, bn->stat64_delta);
    toku_ft_adjust_logical_row_count(ft, -BLB_LRD(node, childnum));
    BLB_LRD(node, childnum) = 0;
    destroy_basement_node(bn);
    set_BNULL(node, childnum);
    BP_STATE(node, childnum) = PT_ON_DISK;
}

BASEMENTNODE toku_detach_bn(FTNODE node, int childnum) {
    assert(BP_STATE(node, childnum) == PT_AVAIL);
    BASEMENTNODE bn = BLB(node, childnum);
    set_BNULL(node, childnum);
    BP_STATE(node, childnum) = PT_ON_DISK;
    return bn;
}

//
// Orthopush
//

struct store_msg_buffer_offset_extra {
    int32_t *offsets;
    int i;
};

int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra) __attribute__((nonnull(3)));
int store_msg_buffer_offset(const int32_t &offset, const uint32_t UU(idx), struct store_msg_buffer_offset_extra *const extra)
{
    extra->offsets[extra->i] = offset;
    extra->i++;
    return 0;
}

/**
 * Given pointers to offsets within a message buffer where we can find messages,
 * figure out the MSN of each message, and compare those MSNs. Returns 1,
 * 0, or -1 if a is larger than, equal to, or smaller than b.
 */
int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo);
int msg_buffer_offset_msn_cmp(message_buffer &msg_buffer, const int32_t &ao, const int32_t &bo)
{
    MSN amsn, bmsn;
    msg_buffer.get_message_key_msn(ao, nullptr, &amsn);
    msg_buffer.get_message_key_msn(bo, nullptr, &bmsn);
    if (amsn.msn > bmsn.msn) {
        return +1;
    }
    if (amsn.msn < bmsn.msn) {
        return -1;
    }
    return 0;
}

/**
 * Given a message buffer and an offset, apply the message with
 * toku_ft_bn_apply_msg, or discard it,
 * based on its MSN and the MSN of the basement node.
 */
static void do_bn_apply_msg(
    FT_HANDLE ft_handle,
    BASEMENTNODE bn,
    message_buffer* msg_buffer,
    int32_t offset,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    DBT k, v;
    ft_msg msg = msg_buffer->get_message(offset, &k, &v);

    // The messages are being iterated over in (key,msn) order or just in
    // msn order, so all the messages for one key, from one buffer, are in
    // ascending msn order. So it's ok that we don't update the basement
    // node's msn until the end.
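    // Illustrative example (hypothetical MSN values, for exposition only):
    // if this basement node has max_msn_applied == 100, a message with
    // msn 101 is applied below, while a message with msn 99 must already
    // have been applied before the basement was last written out, so it
    // is counted as a discard instead.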
    if (msg.msn().msn > bn->max_msn_applied.msn) {
        toku_ft_bn_apply_msg(
            ft_handle->ft->cmp,
            ft_handle->ft->update_fun,
            bn,
            msg,
            gc_info,
            workdone,
            stats_to_update,
            logical_rows_delta);
    } else {
        toku_ft_status_note_msn_discard();
    }

    // We must always mark the message as stale here, since its offset has
    // been marked in the fresh tree (using omt::iterate_and_mark_range).
    // It is possible to call do_bn_apply_msg even when it won't apply the
    // message, because the node containing it could have been evicted and
    // brought back in.
    msg_buffer->set_freshness(offset, false);
}


struct iterate_do_bn_apply_msg_extra {
    FT_HANDLE t;
    BASEMENTNODE bn;
    NONLEAF_CHILDINFO bnc;
    txn_gc_info *gc_info;
    uint64_t *workdone;
    STAT64INFO stats_to_update;
    int64_t *logical_rows_delta;
};

int iterate_do_bn_apply_msg(
    const int32_t &offset,
    const uint32_t UU(idx),
    struct iterate_do_bn_apply_msg_extra* const e)
    __attribute__((nonnull(3)));

int iterate_do_bn_apply_msg(
    const int32_t &offset,
    const uint32_t UU(idx),
    struct iterate_do_bn_apply_msg_extra* const e)
{
    do_bn_apply_msg(
        e->t,
        e->bn,
        &e->bnc->msg_buffer,
        offset,
        e->gc_info,
        e->workdone,
        e->stats_to_update,
        e->logical_rows_delta);
    return 0;
}

/**
 * Given the bounds of the basement node to which we will apply messages,
 * find the indexes within message_tree which contain the range of
 * relevant messages.
 *
 * The message tree contains offsets into the buffer, where messages are
 * found. The pivot_bounds are the lower bound exclusive and upper bound
 * inclusive, because they come from pivot keys in the tree. We want OMT
 * indices, which must have the lower bound be inclusive and the upper
 * bound exclusive. We will get these by telling omt::find to look
 * for something strictly bigger than each of our pivot bounds.
 *
 * Outputs the OMT indices in lbi (lower bound inclusive) and ube (upper
 * bound exclusive).
 */
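/*
 * A worked example of the bounds conversion (hypothetical keys, for
 * exposition only): suppose the message tree's offsets point at messages
 * with keys ["a", "b", "c", "d", "e"], and the basement's pivot bounds
 * are lbe = "b" (exclusive) and ubi = "d" (inclusive). Searching for the
 * first entry strictly greater than ("b", MAX_MSN) yields lbi = 2, and
 * searching for the first entry strictly greater than ("d", MAX_MSN)
 * yields ube = 4, so OMT indices [2, 4) cover exactly the keys in
 * ("b", "d"], namely "c" and "d".
 */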
template<typename find_bounds_omt_t>
static void
find_bounds_within_message_tree(
    const toku::comparator &cmp,
    const find_bounds_omt_t &message_tree,  /// tree holding message buffer offsets, in which we want to look for indices
    message_buffer *msg_buffer,  /// message buffer in which messages are found
    const pivot_bounds &bounds,  /// key bounds within the basement node we're applying messages to
    uint32_t *lbi,  /// (output) "lower bound inclusive" (index into message_tree)
    uint32_t *ube  /// (output) "upper bound exclusive" (index into message_tree)
    )
{
    int r = 0;

    if (!toku_dbt_is_empty(bounds.lbe())) {
        // By setting msn to MAX_MSN and by using direction of +1, we will
        // get the first message greater than (in (key, msn) order) any
        // message (with any msn) with the key lower_bound_exclusive.
        // This will be a message we want to try applying, so it is the
        // "lower bound inclusive" within the message_tree.
        struct toku_msg_buffer_key_msn_heaviside_extra lbi_extra(cmp, msg_buffer, bounds.lbe(), MAX_MSN);
        int32_t found_lb;
        r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(lbi_extra, +1, &found_lb, lbi);
        if (r == DB_NOTFOUND) {
            // There is no relevant data (the lower bound is bigger than
            // any message in this tree), so we have no range and we're
            // done.
            *lbi = 0;
            *ube = 0;
            return;
        }
        if (!toku_dbt_is_empty(bounds.ubi())) {
            // Check if what we found for lbi is greater than the upper
            // bound inclusive that we have. If so, there are no relevant
            // messages between these bounds.
            const DBT *ubi = bounds.ubi();
            const int32_t offset = found_lb;
            DBT found_lbidbt;
            msg_buffer->get_message_key_msn(offset, &found_lbidbt, nullptr);
            int c = cmp(&found_lbidbt, ubi);
            // These DBTs really are both inclusive bounds, so we need
            // strict inequality in order to determine that there's
            // nothing between them. If they're equal, then we actually
            // need to apply the message pointed to by lbi, and also
            // anything with the same key but a bigger msn.
            if (c > 0) {
                *lbi = 0;
                *ube = 0;
                return;
            }
        }
    } else {
        // No lower bound given, it's negative infinity, so we start at
        // the first message in the OMT.
        *lbi = 0;
    }
    if (!toku_dbt_is_empty(bounds.ubi())) {
        // Again, we use an msn of MAX_MSN and a direction of +1 to get
        // the first thing bigger than the upper_bound_inclusive key.
        // This is therefore the smallest thing we don't want to apply,
        // and omt::iterate_on_range will not examine it.
        struct toku_msg_buffer_key_msn_heaviside_extra ube_extra(cmp, msg_buffer, bounds.ubi(), MAX_MSN);
        r = message_tree.template find<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(ube_extra, +1, nullptr, ube);
        if (r == DB_NOTFOUND) {
            // Couldn't find anything in the buffer bigger than our key,
            // so we need to look at everything up to the end of
            // message_tree.
            *ube = message_tree.size();
        }
    } else {
        // No upper bound given, it's positive infinity, so we need to go
        // through the end of the OMT.
        *ube = message_tree.size();
    }
}

// For each message in the ancestor's buffer (determined by childnum) that
// is key-wise between lower_bound_exclusive and upper_bound_inclusive,
// apply the message to the basement node. We treat the bounds as minus
// or plus infinity respectively if they are NULL. Do not mark the node
// as dirty (preserve previous state of 'dirty' bit).
static void bnc_apply_messages_to_basement_node(
    FT_HANDLE t,                 // used for comparison function
    BASEMENTNODE bn,             // where to apply messages
    FTNODE ancestor,             // the ancestor node where we can find messages to apply
    int childnum,                // which child buffer of ancestor contains messages we want
    const pivot_bounds &bounds,  // contains pivot key bounds of this basement node
    txn_gc_info *gc_info,
    bool *msgs_applied) {
    int r;
    NONLEAF_CHILDINFO bnc = BNC(ancestor, childnum);

    // Determine the offsets in the message trees between which we need to
    // apply messages from this buffer
    STAT64INFO_S stats_delta = {0, 0};
    uint64_t workdone_this_ancestor = 0;
    int64_t logical_rows_delta = 0;

    uint32_t stale_lbi, stale_ube;
    if (!bn->stale_ancestor_messages_applied) {
        find_bounds_within_message_tree(t->ft->cmp,
                                        bnc->stale_message_tree,
                                        &bnc->msg_buffer,
                                        bounds,
                                        &stale_lbi,
                                        &stale_ube);
    } else {
        stale_lbi = 0;
        stale_ube = 0;
    }
    uint32_t fresh_lbi, fresh_ube;
    find_bounds_within_message_tree(t->ft->cmp,
                                    bnc->fresh_message_tree,
                                    &bnc->msg_buffer,
                                    bounds,
                                    &fresh_lbi,
                                    &fresh_ube);

    // We now know where all the messages we must apply are, so one of the
    // following three cases will do the application, depending on which
    // of the lists contains relevant messages (a fourth possibility, no
    // relevant messages at all, falls through the fresh-only case as a
    // no-op):
    //
    // 1. broadcast messages and anything else, or a mix of fresh and stale
    // 2. only fresh messages
    // 3. only stale messages
    if (bnc->broadcast_list.size() > 0 ||
        (stale_lbi != stale_ube && fresh_lbi != fresh_ube)) {
        // We have messages in multiple trees, so we grab all
        // the relevant messages' offsets and sort them by MSN, then apply
        // them in MSN order.
        const int buffer_size =
            ((stale_ube - stale_lbi) + (fresh_ube - fresh_lbi) +
             bnc->broadcast_list.size());
        toku::scoped_malloc offsets_buf(buffer_size * sizeof(int32_t));
        int32_t *offsets = reinterpret_cast<int32_t *>(offsets_buf.get());
        struct store_msg_buffer_offset_extra sfo_extra = {.offsets = offsets,
                                                          .i = 0};

        // Populate offsets array with offsets to stale messages
        r = bnc->stale_message_tree
                .iterate_on_range<struct store_msg_buffer_offset_extra,
                                  store_msg_buffer_offset>(
                    stale_lbi, stale_ube, &sfo_extra);
        assert_zero(r);

        // Then store fresh offsets, and mark them to be moved to stale later.
        r = bnc->fresh_message_tree
                .iterate_and_mark_range<struct store_msg_buffer_offset_extra,
                                        store_msg_buffer_offset>(
                    fresh_lbi, fresh_ube, &sfo_extra);
        assert_zero(r);

        // Store offsets of all broadcast messages.
        r = bnc->broadcast_list.iterate<struct store_msg_buffer_offset_extra,
                                        store_msg_buffer_offset>(&sfo_extra);
        assert_zero(r);
        invariant(sfo_extra.i == buffer_size);

        // Sort by MSN.
        toku::sort<int32_t, message_buffer, msg_buffer_offset_msn_cmp>::
            mergesort_r(offsets, buffer_size, bnc->msg_buffer);

        // Apply the messages in MSN order.
        for (int i = 0; i < buffer_size; ++i) {
            *msgs_applied = true;
            do_bn_apply_msg(t,
                            bn,
                            &bnc->msg_buffer,
                            offsets[i],
                            gc_info,
                            &workdone_this_ancestor,
                            &stats_delta,
                            &logical_rows_delta);
        }
    } else if (stale_lbi == stale_ube) {
        // No stale messages to apply, we just apply fresh messages, and mark
        // them to be moved to stale later.
        struct iterate_do_bn_apply_msg_extra iter_extra = {
            .t = t,
            .bn = bn,
            .bnc = bnc,
            .gc_info = gc_info,
            .workdone = &workdone_this_ancestor,
            .stats_to_update = &stats_delta,
            .logical_rows_delta = &logical_rows_delta};
        if (fresh_ube - fresh_lbi > 0)
            *msgs_applied = true;
        r = bnc->fresh_message_tree
                .iterate_and_mark_range<struct iterate_do_bn_apply_msg_extra,
                                        iterate_do_bn_apply_msg>(
                    fresh_lbi, fresh_ube, &iter_extra);
        assert_zero(r);
    } else {
        invariant(fresh_lbi == fresh_ube);
        // No fresh messages to apply, we just apply stale messages.

        if (stale_ube - stale_lbi > 0)
            *msgs_applied = true;
        struct iterate_do_bn_apply_msg_extra iter_extra = {
            .t = t,
            .bn = bn,
            .bnc = bnc,
            .gc_info = gc_info,
            .workdone = &workdone_this_ancestor,
            .stats_to_update = &stats_delta,
            .logical_rows_delta = &logical_rows_delta};

        r = bnc->stale_message_tree
                .iterate_on_range<struct iterate_do_bn_apply_msg_extra,
                                  iterate_do_bn_apply_msg>(
                    stale_lbi, stale_ube, &iter_extra);
        assert_zero(r);
    }
    //
    // update stats
    //
    if (workdone_this_ancestor > 0) {
        (void)toku_sync_fetch_and_add(&BP_WORKDONE(ancestor, childnum),
                                      workdone_this_ancestor);
    }
    if (stats_delta.numbytes || stats_delta.numrows) {
        toku_ft_update_stats(&t->ft->in_memory_stats, stats_delta);
    }
    toku_ft_adjust_logical_row_count(t->ft, logical_rows_delta);
    bn->logical_rows_delta += logical_rows_delta;
}

static void
apply_ancestors_messages_to_bn(
    FT_HANDLE t,
    FTNODE node,
    int childnum,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    txn_gc_info *gc_info,
    bool* msgs_applied
    )
{
    BASEMENTNODE curr_bn = BLB(node, childnum);
    const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > curr_bn->max_msn_applied.msn) {
            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
            bnc_apply_messages_to_basement_node(
                t,
                curr_bn,
                curr_ancestors->node,
                curr_ancestors->childnum,
                curr_bounds,
                gc_info,
                msgs_applied
                );
            // We don't want to check this ancestor node again if the
            // next time we query it, the msn hasn't changed.
            curr_bn->max_msn_applied = curr_ancestors->node->max_msn_applied_to_node_on_disk;
        }
    }
    // At this point, we know all the stale messages above this
    // basement node have been applied, and any new messages will be
    // fresh, so we don't need to look at stale messages for this
    // basement node, unless it gets evicted (and this field becomes
    // false when it's read in again).
    curr_bn->stale_ancestor_messages_applied = true;
}

void
toku_apply_ancestors_messages_to_node(
    FT_HANDLE t,
    FTNODE node,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    bool* msgs_applied,
    int child_to_read
    )
// Effect:
//   Bring a leaf node up-to-date according to all the messages in the ancestors.
//   If the leaf node is already up-to-date then do nothing.
//   If the leaf node is not already up-to-date, then record the work done
//   for that leaf in each ancestor.
// Requires:
//   This is being called when pinning a leaf node for the query path.
//   The entire root-to-leaf path is pinned and appears in the ancestors list.
{
    VERIFY_NODE(t, node);
    paranoid_invariant(node->height == 0);

    TXN_MANAGER txn_manager = toku_ft_get_txn_manager(t);
    txn_manager_state txn_state_for_gc(txn_manager);

    TXNID oldest_referenced_xid_for_simple_gc = toku_ft_get_oldest_referenced_xid_estimate(t);
    txn_gc_info gc_info(&txn_state_for_gc,
                        oldest_referenced_xid_for_simple_gc,
                        node->oldest_referenced_xid_known,
                        true);
    if (!node->dirty() && child_to_read >= 0) {
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        apply_ancestors_messages_to_bn(
            t,
            node,
            child_to_read,
            ancestors,
            bounds,
            &gc_info,
            msgs_applied
            );
    }
    else {
        // we know we are a leaf node
        // An important invariant:
        // We MUST bring every available basement node for a dirty node up to date.
        // Flushing on the cleaner thread depends on this. This invariant
        // allows the cleaner thread to just pick an internal node and flush it
        // as opposed to being forced to start from the root.
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            apply_ancestors_messages_to_bn(
                t,
                node,
                i,
                ancestors,
                bounds,
                &gc_info,
                msgs_applied
                );
        }
    }
    VERIFY_NODE(t, node);
}

static bool bn_needs_ancestors_messages(
    FT ft,
    FTNODE node,
    int childnum,
    const pivot_bounds &bounds,
    ANCESTORS ancestors,
    MSN* max_msn_applied
    )
{
    BASEMENTNODE bn = BLB(node, childnum);
    const pivot_bounds curr_bounds = bounds.next_bounds(node, childnum);
    bool needs_ancestors_messages = false;
    for (ANCESTORS curr_ancestors = ancestors; curr_ancestors; curr_ancestors = curr_ancestors->next) {
        if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > bn->max_msn_applied.msn) {
            paranoid_invariant(BP_STATE(curr_ancestors->node, curr_ancestors->childnum) == PT_AVAIL);
            NONLEAF_CHILDINFO bnc = BNC(curr_ancestors->node, curr_ancestors->childnum);
            if (bnc->broadcast_list.size() > 0) {
                needs_ancestors_messages = true;
                goto cleanup;
            }
            if (!bn->stale_ancestor_messages_applied) {
                uint32_t stale_lbi, stale_ube;
                find_bounds_within_message_tree(ft->cmp,
                                                bnc->stale_message_tree,
                                                &bnc->msg_buffer,
                                                curr_bounds,
                                                &stale_lbi,
                                                &stale_ube);
                if (stale_lbi < stale_ube) {
                    needs_ancestors_messages = true;
                    goto cleanup;
                }
            }
            uint32_t fresh_lbi, fresh_ube;
            find_bounds_within_message_tree(ft->cmp,
                                            bnc->fresh_message_tree,
                                            &bnc->msg_buffer,
                                            curr_bounds,
                                            &fresh_lbi,
                                            &fresh_ube);
            if (fresh_lbi < fresh_ube) {
                needs_ancestors_messages = true;
                goto cleanup;
            }
            if (curr_ancestors->node->max_msn_applied_to_node_on_disk.msn > max_msn_applied->msn) {
                max_msn_applied->msn = curr_ancestors->node->max_msn_applied_to_node_on_disk.msn;
            }
        }
    }
cleanup:
    return needs_ancestors_messages;
}

bool toku_ft_leaf_needs_ancestors_messages(
    FT ft,
    FTNODE node,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    MSN *const max_msn_in_path,
    int child_to_read
    )
// Effect: Determine whether there are messages in a node's ancestors
//  which must be applied to it. These messages are in the correct
//  keyrange for any available basement nodes, and are in nodes with the
//  correct max_msn_applied_to_node_on_disk.
// Notes:
//  This is an approximate query.
// Output:
//  max_msn_in_path: max of "max_msn_applied_to_node_on_disk" over
//   ancestors. This is used later to update basement nodes'
//   max_msn_applied values in case we don't do the full algorithm.
// Returns:
//  true if there may be some such messages
//  false only if there are definitely no such messages
// Rationale:
//  When we pin a node with a read lock, we want to quickly determine if
//  we should exchange it for a write lock in preparation for applying
//  messages. If there are no messages, we don't need the write lock.
{
    paranoid_invariant(node->height == 0);
    bool needs_ancestors_messages = false;
    // child_to_read may be -1 in test cases
    if (!node->dirty() && child_to_read >= 0) {
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        needs_ancestors_messages = bn_needs_ancestors_messages(
            ft,
            node,
            child_to_read,
            bounds,
            ancestors,
            max_msn_in_path
            );
    }
    else {
        for (int i = 0; i < node->n_children; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            needs_ancestors_messages = bn_needs_ancestors_messages(
                ft,
                node,
                i,
                bounds,
                ancestors,
                max_msn_in_path
                );
            if (needs_ancestors_messages) {
                goto cleanup;
            }
        }
    }
cleanup:
    return needs_ancestors_messages;
}

void toku_ft_bn_update_max_msn(FTNODE node, MSN max_msn_applied, int child_to_read) {
    invariant(node->height == 0);
    if (!node->dirty() && child_to_read >= 0) {
        paranoid_invariant(BP_STATE(node, child_to_read) == PT_AVAIL);
        BASEMENTNODE bn = BLB(node, child_to_read);
        if (max_msn_applied.msn > bn->max_msn_applied.msn) {
            // see comment below
            (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
        }
    }
    else {
        for (int i = 0; i < node->n_children; ++i) {
            if (BP_STATE(node, i) != PT_AVAIL) { continue; }
            BASEMENTNODE bn = BLB(node, i);
            if (max_msn_applied.msn > bn->max_msn_applied.msn) {
                // This function runs in a shared access context, so to silence tools
                // like DRD, we use a CAS and ignore the result.
                // Any threads trying to update these basement nodes should be
                // updating them to the same thing (since they all have a read lock on
                // the same root-to-leaf path) so this is safe.
                (void) toku_sync_val_compare_and_swap(&bn->max_msn_applied.msn, bn->max_msn_applied.msn, max_msn_applied.msn);
            }
        }
    }
}

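// A brief sketch of the fresh/stale lifecycle implemented by the helpers
// below (a summary, not new behavior): messages are injected into a child
// buffer's fresh_message_tree; when the query path applies them to a
// basement node, their offsets are marked via iterate_and_mark_range.
// toku_ft_bnc_move_messages_to_stale then copies the marked offsets into
// stale_message_tree and drops them from the fresh tree, keeping them
// available for sibling basement nodes that have not applied them yet
// (for example, basements that were evicted before the messages arrived).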
struct copy_to_stale_extra {
    FT ft;
    NONLEAF_CHILDINFO bnc;
};

int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra) __attribute__((nonnull(3)));
int copy_to_stale(const int32_t &offset, const uint32_t UU(idx), struct copy_to_stale_extra *const extra)
{
    MSN msn;
    DBT key;
    extra->bnc->msg_buffer.get_message_key_msn(offset, &key, &msn);
    struct toku_msg_buffer_key_msn_heaviside_extra heaviside_extra(extra->ft->cmp, &extra->bnc->msg_buffer, &key, msn);
    int r = extra->bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, heaviside_extra, nullptr);
    invariant_zero(r);
    return 0;
}

void toku_ft_bnc_move_messages_to_stale(FT ft, NONLEAF_CHILDINFO bnc) {
    struct copy_to_stale_extra cts_extra = { .ft = ft, .bnc = bnc };
    int r = bnc->fresh_message_tree.iterate_over_marked<struct copy_to_stale_extra, copy_to_stale>(&cts_extra);
    invariant_zero(r);
    bnc->fresh_message_tree.delete_all_marked();
}

void toku_move_ftnode_messages_to_stale(FT ft, FTNODE node) {
    invariant(node->height > 0);
    for (int i = 0; i < node->n_children; ++i) {
        if (BP_STATE(node, i) != PT_AVAIL) {
            continue;
        }
        NONLEAF_CHILDINFO bnc = BNC(node, i);
        // We can't delete things out of the fresh tree inside the above
        // procedures because we're still looking at the fresh tree. Instead
        // we have to move messages after we're done looking at it.
        toku_ft_bnc_move_messages_to_stale(ft, bnc);
    }
}

//
// Balance // Availability // Size

struct rebalance_array_info {
    uint32_t offset;
    LEAFENTRY *le_array;
    uint32_t *key_sizes_array;
    const void **key_ptr_array;
    static int fn(const void* key, const uint32_t keylen, const LEAFENTRY &le,
                  const uint32_t idx, struct rebalance_array_info *const ai) {
        ai->le_array[idx+ai->offset] = le;
        ai->key_sizes_array[idx+ai->offset] = keylen;
        ai->key_ptr_array[idx+ai->offset] = key;
        return 0;
    }
};

// There must still be at least one child.
// Requires that all messages in buffers above have been applied.
// Because all messages above have been applied, setting msn of all new basements
// to max msn of existing basements is correct. (There cannot be any messages in
// buffers above that still need to be applied.)
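// Illustrative sketch of the repacking below (hypothetical numbers, for
// exposition only): with basementnodesize = 128KB and leaf entries whose
// (le size + 4-byte offset overhead + key size) sum to ~1KB each, a new
// pivot is cut roughly every 128 entries, so a leaf holding 1000 entries
// is rebuilt into 8 basement nodes of ~128 entries each, regardless of
// how the entries were distributed across the old basements.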
void toku_ftnode_leaf_rebalance(FTNODE node, unsigned int basementnodesize) {

    assert(node->height == 0);
    assert(node->dirty());

    uint32_t num_orig_basements = node->n_children;
    // Count number of leaf entries in this leaf (num_le).
    uint32_t num_le = 0;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        num_le += BLB_DATA(node, i)->num_klpairs();
    }

    uint32_t num_alloc = num_le ? num_le : 1;  // simplify logic below by always having at least one entry per array

    // Create an array of OMTVALUE's that store all the pointers to all the data.
    // Each element in leafpointers is a pointer to a leaf.
    toku::scoped_malloc leafpointers_buf(sizeof(LEAFENTRY) * num_alloc);
    LEAFENTRY *leafpointers = reinterpret_cast<LEAFENTRY *>(leafpointers_buf.get());
    leafpointers[0] = NULL;

    toku::scoped_malloc key_pointers_buf(sizeof(void *) * num_alloc);
    const void **key_pointers = reinterpret_cast<const void **>(key_pointers_buf.get());
    key_pointers[0] = NULL;

    toku::scoped_malloc key_sizes_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *key_sizes = reinterpret_cast<uint32_t *>(key_sizes_buf.get());

    // Capture pointers to old mempools' buffers (so they can be destroyed)
    toku::scoped_malloc old_bns_buf(sizeof(BASEMENTNODE) * num_orig_basements);
    BASEMENTNODE *old_bns = reinterpret_cast<BASEMENTNODE *>(old_bns_buf.get());
    old_bns[0] = NULL;

    uint32_t curr_le = 0;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        bn_data* bd = BLB_DATA(node, i);
        struct rebalance_array_info ai {.offset = curr_le, .le_array = leafpointers, .key_sizes_array = key_sizes, .key_ptr_array = key_pointers };
        bd->iterate<rebalance_array_info, rebalance_array_info::fn>(&ai);
        curr_le += bd->num_klpairs();
    }

    // Create an array that will store indexes of new pivots.
    // Each element in new_pivots is the index of a pivot key.
    // (Allocating num_le of them is overkill, but num_le is an upper bound.)
    toku::scoped_malloc new_pivots_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *new_pivots = reinterpret_cast<uint32_t *>(new_pivots_buf.get());
    new_pivots[0] = 0;

    // Each element in le_sizes is the size of the leafentry pointed to by leafpointers.
    toku::scoped_malloc le_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *le_sizes = reinterpret_cast<size_t *>(le_sizes_buf.get());
    le_sizes[0] = 0;

    // Create an array that will store the size of each basement.
    // This is the sum of the leaf sizes of all the leaves in that basement.
    // We don't know how many basements there will be, so we use num_le as the upper bound.

    // Sum of all le sizes in a single basement
    toku::scoped_calloc bn_le_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *bn_le_sizes = reinterpret_cast<size_t *>(bn_le_sizes_buf.get());

    // Sum of all key sizes in a single basement
    toku::scoped_calloc bn_key_sizes_buf(sizeof(size_t) * num_alloc);
    size_t *bn_key_sizes = reinterpret_cast<size_t *>(bn_key_sizes_buf.get());

    // TODO 4050: All these arrays should be combined into a single array of some bn_info struct (pivot, msize, num_les).
    // Each entry is the number of leafentries in this basement. (Again, num_le is an overkill upper bound.)
    toku::scoped_malloc num_les_this_bn_buf(sizeof(uint32_t) * num_alloc);
    uint32_t *num_les_this_bn = reinterpret_cast<uint32_t *>(num_les_this_bn_buf.get());
    num_les_this_bn[0] = 0;

    // Figure out the new pivots.
    // We need the index of each pivot, and for each basement we need
    // the number of leaves and the sum of the sizes of the leaves (memory requirement for basement).
    uint32_t curr_pivot = 0;
    uint32_t num_le_in_curr_bn = 0;
    uint32_t bn_size_so_far = 0;
    for (uint32_t i = 0; i < num_le; i++) {
        uint32_t curr_le_size = leafentry_disksize((LEAFENTRY) leafpointers[i]);
        le_sizes[i] = curr_le_size;
        if ((bn_size_so_far + curr_le_size + sizeof(uint32_t) + key_sizes[i] > basementnodesize) && (num_le_in_curr_bn != 0)) {
            // cap off the current basement node to end with the element before i
            new_pivots[curr_pivot] = i-1;
            curr_pivot++;
            num_le_in_curr_bn = 0;
            bn_size_so_far = 0;
        }
        num_le_in_curr_bn++;
        num_les_this_bn[curr_pivot] = num_le_in_curr_bn;
        bn_le_sizes[curr_pivot] += curr_le_size;
        bn_key_sizes[curr_pivot] += sizeof(uint32_t) + key_sizes[i];  // uint32_t le_offset
        bn_size_so_far += curr_le_size + sizeof(uint32_t) + key_sizes[i];
    }
    // curr_pivot is now the total number of pivot keys in the leaf node
    int num_pivots = curr_pivot;
    int num_children = num_pivots + 1;

    // now we need to fill in the new basement nodes and pivots

    // TODO: (Zardosht) this is an ugly thing right now
    // Need to figure out how to properly deal with seqinsert.
    // I am not happy with how this is being
    // handled with basement nodes
    uint32_t tmp_seqinsert = BLB_SEQINSERT(node, num_orig_basements - 1);

    // choose the max msn applied to any basement as the max msn applied to all new basements
    MSN max_msn = ZERO_MSN;
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        MSN curr_msn = BLB_MAX_MSN_APPLIED(node,i);
        max_msn = (curr_msn.msn > max_msn.msn) ? curr_msn : max_msn;
    }
    // remove the basement nodes from the node; we've saved a copy
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        // save a reference to the old basement nodes
        // we will need them to ensure that the memory
        // stays intact
        old_bns[i] = toku_detach_bn(node, i);
    }
    // Now destroy the old basements, but do not destroy leaves
    toku_destroy_ftnode_internals(node);

    // now reallocate pieces and start filling them in
    invariant(num_children > 0);

    node->n_children = num_children;
    XCALLOC_N(num_children, node->bp);  // allocate pointers to basements (bp)
    for (int i = 0; i < num_children; i++) {
        set_BLB(node, i, toku_create_empty_bn());  // allocate empty basements and set bp pointers
    }

    // now we start to fill in the data

    // first the pivots
    toku::scoped_malloc pivotkeys_buf(num_pivots * sizeof(DBT));
    DBT *pivotkeys = reinterpret_cast<DBT *>(pivotkeys_buf.get());
    for (int i = 0; i < num_pivots; i++) {
        uint32_t size = key_sizes[new_pivots[i]];
        const void *key = key_pointers[new_pivots[i]];
        toku_fill_dbt(&pivotkeys[i], key, size);
    }
    node->pivotkeys.create_from_dbts(pivotkeys, num_pivots);

    uint32_t baseindex_this_bn = 0;
    // now the basement nodes
    for (int i = 0; i < num_children; i++) {
        // put back seqinsert
        BLB_SEQINSERT(node, i) = tmp_seqinsert;

        // create start (inclusive) and end (exclusive) boundaries for data of basement node
        uint32_t curr_start = (i==0) ? 0 : new_pivots[i-1]+1;            // index of first leaf in basement
        uint32_t curr_end = (i==num_pivots) ? num_le : new_pivots[i]+1;  // index of first leaf in next basement
        uint32_t num_in_bn = curr_end - curr_start;                      // number of leaves in this basement

        // create indexes for new basement
        invariant(baseindex_this_bn == curr_start);
        uint32_t num_les_to_copy = num_les_this_bn[i];
        invariant(num_les_to_copy == num_in_bn);

        bn_data* bd = BLB_DATA(node, i);
        bd->set_contents_as_clone_of_sorted_array(
            num_les_to_copy,
            &key_pointers[baseindex_this_bn],
            &key_sizes[baseindex_this_bn],
            &leafpointers[baseindex_this_bn],
            &le_sizes[baseindex_this_bn],
            bn_key_sizes[i],  // total key sizes
            bn_le_sizes[i]    // total le sizes
            );

        BP_STATE(node,i) = PT_AVAIL;
        BP_TOUCH_CLOCK(node,i);
        BLB_MAX_MSN_APPLIED(node,i) = max_msn;
        baseindex_this_bn += num_les_to_copy;  // set to index of next bn
    }
    node->max_msn_applied_to_node_on_disk = max_msn;

    // destroy buffers of old mempools
    for (uint32_t i = 0; i < num_orig_basements; i++) {
        destroy_basement_node(old_bns[i]);
    }
}

bool toku_ftnode_fully_in_memory(FTNODE node) {
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) != PT_AVAIL) {
            return false;
        }
    }
    return true;
}

void toku_ftnode_assert_fully_in_memory(FTNODE UU(node)) {
    paranoid_invariant(toku_ftnode_fully_in_memory(node));
}

uint32_t toku_ftnode_leaf_num_entries(FTNODE node) {
    toku_ftnode_assert_fully_in_memory(node);
    uint32_t num_entries = 0;
    for (int i = 0; i < node->n_children; i++) {
        num_entries += BLB_DATA(node, i)->num_klpairs();
    }
    return num_entries;
}

enum reactivity toku_ftnode_get_leaf_reactivity(FTNODE node, uint32_t nodesize) {
    enum reactivity re = RE_STABLE;
    toku_ftnode_assert_fully_in_memory(node);
    paranoid_invariant(node->height==0);
    unsigned int size = toku_serialize_ftnode_size(node);
    if (size > nodesize && toku_ftnode_leaf_num_entries(node) > 1) {
        re = RE_FISSIBLE;
    } else if ((size*4) < nodesize && !BLB_SEQINSERT(node, node->n_children-1)) {
        re = RE_FUSIBLE;
    }
    return re;
}

enum reactivity toku_ftnode_get_nonleaf_reactivity(FTNODE node, unsigned int fanout) {
    paranoid_invariant(node->height > 0);
    int n_children = node->n_children;
    if (n_children > (int) fanout) {
        return RE_FISSIBLE;
    }
    if (n_children * 4 < (int) fanout) {
        return RE_FUSIBLE;
    }
    return RE_STABLE;
}
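
// For instance (hypothetical fanout, for exposition only): with
// fanout = 16, a nonleaf node with more than 16 children is RE_FISSIBLE
// (it should split), one with fewer than 4 children (16 / 4) is
// RE_FUSIBLE (it should merge), and anything in between is RE_STABLE.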

enum reactivity toku_ftnode_get_reactivity(FT ft, FTNODE node) {
    toku_ftnode_assert_fully_in_memory(node);
    if (node->height == 0) {
        return toku_ftnode_get_leaf_reactivity(node, ft->h->nodesize);
    } else {
        return toku_ftnode_get_nonleaf_reactivity(node, ft->h->fanout);
    }
}

unsigned int toku_bnc_nbytesinbuf(NONLEAF_CHILDINFO bnc) {
    return bnc->msg_buffer.buffer_size_in_use();
}

// Return true if the size of the buffers plus the amount of work done is large enough.
// Return false if there is nothing to be flushed (the buffers are empty).
bool toku_ftnode_nonleaf_is_gorged(FTNODE node, uint32_t nodesize) {
    uint64_t size = toku_serialize_ftnode_size(node);

    bool buffers_are_empty = true;
    toku_ftnode_assert_fully_in_memory(node);
    //
    // the nonleaf node is gorged if the following holds true:
    //  - the buffers are non-empty
    //  - the total workdone by the buffers PLUS the size of the buffers
    //    is greater than nodesize (which as of Maxwell should be
    //    4MB)
    //
    paranoid_invariant(node->height > 0);
    for (int child = 0; child < node->n_children; ++child) {
        size += BP_WORKDONE(node, child);
    }
    for (int child = 0; child < node->n_children; ++child) {
        if (toku_bnc_nbytesinbuf(BNC(node, child)) > 0) {
            buffers_are_empty = false;
            break;
        }
    }
    return ((size > nodesize)
            &&
            (!buffers_are_empty));
}
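
// A worked example of the gorged test above (illustrative numbers): with
// nodesize = 4MB, a nonleaf node whose serialized size is 3MB and whose
// buffers have accumulated 2MB of workdone has size + workdone = 5MB > 4MB,
// so it is gorged provided at least one buffer is non-empty.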

int toku_bnc_n_entries(NONLEAF_CHILDINFO bnc) {
    return bnc->msg_buffer.num_entries();
}

// how much memory does this child buffer consume?
long toku_bnc_memory_size(NONLEAF_CHILDINFO bnc) {
    return (sizeof(*bnc) +
            bnc->msg_buffer.memory_footprint() +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}

// how much memory in this child buffer holds useful data?
// originally created solely for use by test program(s).
long toku_bnc_memory_used(NONLEAF_CHILDINFO bnc) {
    return (sizeof(*bnc) +
            bnc->msg_buffer.memory_size_in_use() +
            bnc->fresh_message_tree.memory_size() +
            bnc->stale_message_tree.memory_size() +
            bnc->broadcast_list.memory_size());
}

//
// Garbage collection
// Message injection
// Message application
//

// Used only by test programs: append a child node to a parent node
void toku_ft_nonleaf_append_child(FTNODE node, FTNODE child, const DBT *pivotkey) {
    int childnum = node->n_children;
    node->n_children++;
    REALLOC_N(node->n_children, node->bp);
    BP_BLOCKNUM(node,childnum) = child->blocknum;
    BP_STATE(node,childnum) = PT_AVAIL;
    BP_WORKDONE(node, childnum) = 0;
    set_BNC(node, childnum, toku_create_empty_nl());
    if (pivotkey) {
        invariant(childnum > 0);
        node->pivotkeys.insert_at(pivotkey, childnum - 1);
    }
    node->set_dirty();
}
void
toku_ft_bn_apply_msg_once(
    BASEMENTNODE bn,
    const ft_msg &msg,
    uint32_t idx,
    uint32_t le_keylen,
    LEAFENTRY le,
    txn_gc_info *gc_info,
    uint64_t *workdone,
    STAT64INFO stats_to_update,
    int64_t *logical_rows_delta
    )
// Effect: Apply msg to leafentry (msn is ignored)
//   Calculate work done by message on leafentry and add it to caller's workdone counter.
//   idx is the location where it goes
//   le is old leafentry
{
    size_t newsize=0, oldsize=0, workdone_this_le=0;
    LEAFENTRY new_le=0;
    // how many bytes of user data (not including overhead) were added or
    // deleted from this row
    int64_t numbytes_delta = 0;
    // will be +1 or -1 or 0 (if row was added or deleted or not)
    int64_t numrows_delta = 0;
    // will be +1, -1 or 0 if a message that was accounted for logically has
    // changed in meaning such as an insert changed to an update or a delete
    // changed to a noop
    int64_t logical_rows_delta_le = 0;
    uint32_t key_storage_size = msg.kdbt()->size + sizeof(uint32_t);
    if (le) {
        oldsize = leafentry_memsize(le) + key_storage_size;
    }

    // toku_le_apply_msg() may call bn_data::mempool_malloc_and_update_dmt()
    // to allocate more space. That means le is guaranteed to not cause a
    // sigsegv but it may point to a mempool that is no longer in use.
    // We'll have to release the old mempool later.
    logical_rows_delta_le = toku_le_apply_msg(
        msg,
        le,
        &bn->data_buffer,
        idx,
        le_keylen,
        gc_info,
        &new_le,
        &numbytes_delta);

    // at this point, we cannot trust cmd->u.id.key to be valid.
    // The dmt may have realloced its mempool and freed the one containing key.

    newsize = new_le ? (leafentry_memsize(new_le) + key_storage_size) : 0;
    if (le && new_le) {
        workdone_this_le = (oldsize > newsize ? oldsize : newsize);  // work done is max of le size before and after message application

    } else {  // we did not just replace a row, so ...
        if (le) {
            // ... we just deleted a row ...
            workdone_this_le = oldsize;
            numrows_delta = -1;
        }
        if (new_le) {
            // ... or we just added a row
            workdone_this_le = newsize;
            numrows_delta = 1;
        }
    }
    if (FT_LIKELY(workdone != NULL)) {  // test programs may call with NULL
        *workdone += workdone_this_le;
    }

    if (FT_LIKELY(logical_rows_delta != NULL)) {
        *logical_rows_delta += logical_rows_delta_le;
    }
    // now update stat64 statistics
    bn->stat64_delta.numrows += numrows_delta;
    bn->stat64_delta.numbytes += numbytes_delta;
    // the only reason stats_to_update may be null is for tests
    if (FT_LIKELY(stats_to_update != NULL)) {
        stats_to_update->numrows += numrows_delta;
        stats_to_update->numbytes += numbytes_delta;
    }
}

static const uint32_t setval_tag = 0xee0ccb99;  // this was gotten by doing "cat /dev/random|head -c4|od -x" to get a random number. We want to make sure that the user actually passes us the setval_extra_s that we passed in.
struct setval_extra_s {
    uint32_t tag;
    bool did_set_val;
    // any error code that setval_fun wants to return goes here.
    int setval_r;
    // need arguments for toku_ft_bn_apply_msg_once
    BASEMENTNODE bn;
    // captured from original message, not currently used
    MSN msn;
    XIDS xids;
    const DBT* key;
    uint32_t idx;
    uint32_t le_keylen;
    LEAFENTRY le;
    txn_gc_info* gc_info;
    uint64_t* workdone;  // set by toku_ft_bn_apply_msg_once()
    STAT64INFO stats_to_update;
    int64_t* logical_rows_delta;
};

/*
 * If new_val == NULL, we send a delete message instead of an insert.
 * This happens here instead of in do_delete() for consistency.
 * setval_fun() is called from handlerton, passing in svextra_v
 * from setval_extra_s input arg to ft->update_fun().
 */
static void setval_fun(const DBT *new_val, void *svextra_v) {
    struct setval_extra_s *CAST_FROM_VOIDP(svextra, svextra_v);
    paranoid_invariant(svextra->tag==setval_tag);
    paranoid_invariant(!svextra->did_set_val);
    svextra->did_set_val = true;

    {
        // can't leave scope until toku_ft_bn_apply_msg_once if
        // this is a delete
        DBT val;
        ft_msg msg(
            svextra->key,
            new_val ? new_val : toku_init_dbt(&val),
            new_val ? FT_INSERT : FT_DELETE_ANY,
            svextra->msn,
            svextra->xids);
        toku_ft_bn_apply_msg_once(
            svextra->bn,
            msg,
            svextra->idx,
            svextra->le_keylen,
            svextra->le,
            svextra->gc_info,
            svextra->workdone,
            svextra->stats_to_update,
            svextra->logical_rows_delta);
        svextra->setval_r = 0;
    }
}

// We are already past the msn filter (in toku_ft_bn_apply_msg(), which calls
// do_update()), so capturing the msn in the setval_extra_s is not strictly
// required. The alternative would be to put a dummy msn in the messages
// created by setval_fun(), but preserving the original msn seems cleaner and
// it preserves accountability at a lower layer.
static int do_update(
    ft_update_func update_fun,
    const DESCRIPTOR_S* desc,
    BASEMENTNODE bn,
    const ft_msg &msg,
    uint32_t idx,
    LEAFENTRY le,
    void* keydata,
    uint32_t keylen,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    LEAFENTRY le_for_update;
    DBT key;
    const DBT *keyp;
    const DBT *update_function_extra;
    DBT vdbt;
    const DBT *vdbtp;

    // the location of data depends on whether this is a regular or
    // broadcast update
    if (msg.type() == FT_UPDATE) {
        // key is passed in with command (should be same as from le)
        // update function extra is passed in with command
        keyp = msg.kdbt();
        update_function_extra = msg.vdbt();
    } else {
        invariant(msg.type() == FT_UPDATE_BROADCAST_ALL);
        // key is not passed in with broadcast, it comes from le
        // update function extra is passed in with command
        paranoid_invariant(le);  // for broadcast updates, we just hit all leafentries
                                 // so this cannot be null
        paranoid_invariant(keydata);
        paranoid_invariant(keylen);
        paranoid_invariant(msg.kdbt()->size == 0);
        keyp = toku_fill_dbt(&key, keydata, keylen);
        update_function_extra = msg.vdbt();
    }
    toku_ft_status_note_update(msg.type() == FT_UPDATE_BROADCAST_ALL);

    if (le && !le_latest_is_del(le)) {
        // if the latest val exists, use it, and we'll use the leafentry later
        uint32_t vallen;
        void *valp = le_latest_val_and_len(le, &vallen);
        vdbtp = toku_fill_dbt(&vdbt, valp, vallen);
    } else {
        // otherwise, the val and leafentry are both going to be null
        vdbtp = NULL;
    }
    le_for_update = le;

    struct setval_extra_s setval_extra = {
        setval_tag,
        false,
        0,
        bn,
        msg.msn(),
        msg.xids(),
        keyp,
        idx,
        keylen,
        le_for_update,
        gc_info,
        workdone,
        stats_to_update,
        logical_rows_delta
    };
    // call handlerton's ft->update_fun(), which passes setval_extra
    // to setval_fun()
    FAKE_DB(db, desc);
    int r = update_fun(
        &db,
        keyp,
        vdbtp,
        update_function_extra,
        setval_fun,
        &setval_extra);

    if (r == 0) { r = setval_extra.setval_r; }
    return r;
}

// Should be renamed as something like "apply_msg_to_basement()."
void toku_ft_bn_apply_msg(
    const toku::comparator& cmp,
    ft_update_func update_fun,
    BASEMENTNODE bn,
    const ft_msg& msg,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {
    // Effect:
    //   Put a msg into a leaf.
    //   Calculate work done by message on leafnode and add it to caller's
    //   workdone counter.
    // The leaf could end up "too big" or "too small". The caller must fix that up.
    LEAFENTRY storeddata;
    void* key = NULL;
    uint32_t keylen = 0;

    uint32_t num_klpairs;
    int r;
    struct toku_msg_leafval_heaviside_extra be(cmp, msg.kdbt());

    unsigned int doing_seqinsert = bn->seqinsert;
    bn->seqinsert = 0;

    switch (msg.type()) {
    case FT_INSERT_NO_OVERWRITE:
    case FT_INSERT: {
        uint32_t idx;
        if (doing_seqinsert) {
            idx = bn->data_buffer.num_klpairs();
            DBT kdbt;
            r = bn->data_buffer.fetch_key_and_len(idx-1, &kdbt.size, &kdbt.data);
            if (r != 0) goto fz;
            int c = toku_msg_leafval_heaviside(kdbt, be);
            if (c >= 0) goto fz;
            r = DB_NOTFOUND;
        } else {
        fz:
            r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
                be,
                &storeddata,
                &key,
                &keylen,
                &idx
                );
        }
        if (r==DB_NOTFOUND) {
            storeddata = 0;
        } else {
            assert_zero(r);
        }
        toku_ft_bn_apply_msg_once(
            bn,
            msg,
            idx,
            keylen,
            storeddata,
            gc_info,
            workdone,
            stats_to_update,
            logical_rows_delta);

        // if the insertion point is within a window of the right edge of
        // the leaf then it is sequential
        // window = min(32, number of leaf entries/16)
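        // For example (illustrative numbers): with s = 160 leaf entries
        // the window is w = min(32, 160/16) = 10, so an insert landing at
        // idx >= 150 keeps the sequential-insert counter growing.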
1443 {
1444 uint32_t s = bn->data_buffer.num_klpairs();
1445 uint32_t w = s / 16;
1446 if (w == 0) w = 1;
1447 if (w > 32) w = 32;
1448
1449 // within the window?
1450 if (s - idx <= w)
1451 bn->seqinsert = doing_seqinsert + 1;
1452 }
1453 break;
1454 }
1455 case FT_DELETE_ANY:
1456 case FT_ABORT_ANY:
1457 case FT_COMMIT_ANY: {
1458 uint32_t idx;
1459 // Apply to all the matches
1460
1461 r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
1462 be,
1463 &storeddata,
1464 &key,
1465 &keylen,
1466 &idx);
1467 if (r == DB_NOTFOUND) break;
1468 assert_zero(r);
1469 toku_ft_bn_apply_msg_once(
1470 bn,
1471 msg,
1472 idx,
1473 keylen,
1474 storeddata,
1475 gc_info,
1476 workdone,
1477 stats_to_update,
1478 logical_rows_delta);
1479 break;
1480 }
1481 case FT_OPTIMIZE_FOR_UPGRADE:
1482 // fall through so that optimize_for_upgrade performs rest of the optimize logic
1483 case FT_COMMIT_BROADCAST_ALL:
1484 case FT_OPTIMIZE:
1485 // Apply to all leafentries
1486 num_klpairs = bn->data_buffer.num_klpairs();
        for (uint32_t idx = 0; idx < num_klpairs; ) {
            void* curr_keyp = NULL;
            uint32_t curr_keylen = 0;
            r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
            assert_zero(r);
            int deleted = 0;
            if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
                // message application code needs a key in order to determine
                // how much work was done by this message. since this is a
                // broadcast message, we have to create a new message whose
                // key is the current le's key.
                DBT curr_keydbt;
                ft_msg curr_msg(
                    toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
                    msg.vdbt(),
                    msg.type(),
                    msg.msn(),
                    msg.xids());
                toku_ft_bn_apply_msg_once(
                    bn,
                    curr_msg,
                    idx,
                    curr_keylen,
                    storeddata,
                    gc_info,
                    workdone,
                    stats_to_update,
                    logical_rows_delta);
                // at this point, we cannot trust msg.kdbt to be valid.
                uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
                if (new_dmt_size != num_klpairs) {
                    paranoid_invariant(new_dmt_size + 1 == num_klpairs);
                    //Item was deleted.
                    deleted = 1;
                }
            }
            if (deleted)
                num_klpairs--;
            else
                idx++;
        }
        paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);

        break;
    case FT_COMMIT_BROADCAST_TXN:
    case FT_ABORT_BROADCAST_TXN:
        // Apply to all leafentries if txn is represented
        num_klpairs = bn->data_buffer.num_klpairs();
        for (uint32_t idx = 0; idx < num_klpairs; ) {
            void* curr_keyp = NULL;
            uint32_t curr_keylen = 0;
            r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_keyp);
            assert_zero(r);
            int deleted = 0;
            if (le_has_xids(storeddata, msg.xids())) {
                // message application code needs a key in order to determine
                // how much work was done by this message. since this is a
                // broadcast message, we have to create a new message whose key
                // is the current le's key.
                DBT curr_keydbt;
                ft_msg curr_msg(
                    toku_fill_dbt(&curr_keydbt, curr_keyp, curr_keylen),
                    msg.vdbt(),
                    msg.type(),
                    msg.msn(),
                    msg.xids());
                toku_ft_bn_apply_msg_once(
                    bn,
                    curr_msg,
                    idx,
                    curr_keylen,
                    storeddata,
                    gc_info,
                    workdone,
                    stats_to_update,
                    logical_rows_delta);
                uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
                if (new_dmt_size != num_klpairs) {
                    paranoid_invariant(new_dmt_size + 1 == num_klpairs);
                    //Item was deleted.
                    deleted = 1;
                }
            }
            if (deleted)
                num_klpairs--;
            else
                idx++;
        }
        paranoid_invariant(bn->data_buffer.num_klpairs() == num_klpairs);

        break;
    case FT_UPDATE: {
        uint32_t idx;
        r = bn->data_buffer.find_zero<decltype(be), toku_msg_leafval_heaviside>(
            be,
            &storeddata,
            &key,
            &keylen,
            &idx
            );
        if (r==DB_NOTFOUND) {
            {
                //Point to msg's copy of the key so we don't worry about le being freed
                //TODO: 46 MAYBE Get rid of this when le_apply message memory is better handled
                key = msg.kdbt()->data;
                keylen = msg.kdbt()->size;
            }
            r = do_update(
                update_fun,
                cmp.get_descriptor(),
                bn,
                msg,
                idx,
                NULL,
                NULL,
                0,
                gc_info,
                workdone,
                stats_to_update,
                logical_rows_delta);
        } else if (r==0) {
            r = do_update(
                update_fun,
                cmp.get_descriptor(),
                bn,
                msg,
                idx,
                storeddata,
                key,
                keylen,
                gc_info,
                workdone,
                stats_to_update,
                logical_rows_delta);
        } // otherwise, a worse error, just return it
        break;
    }
    case FT_UPDATE_BROADCAST_ALL: {
        // apply to all leafentries.
        uint32_t idx = 0;
        uint32_t num_leafentries_before;
        // Use a scratch delta here to avoid changing the logical row count
        // when applying this message: otherwise each leaf entry visited
        // would contribute a negative delta and drive the ft header's row
        // count toward 0. This message does not change the number of rows,
        // so the scratch value is simply discarded.
        int64_t temp_logical_rows_delta = 0;
        while (idx < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
            void* curr_key = nullptr;
            uint32_t curr_keylen = 0;
            r = bn->data_buffer.fetch_klpair(idx, &storeddata, &curr_keylen, &curr_key);
            assert_zero(r);

            //TODO: 46 replace this with something better than cloning key
            // TODO: (Zardosht) This may be unnecessary now, due to how the key
            // is handled in the bndata. Investigate and determine
            char clone_mem[curr_keylen]; // VLA only lasts one loop iteration; alloca would accumulate until the end of the function and could overflow the stack
            memcpy((void*)clone_mem, curr_key, curr_keylen);
            curr_key = (void*)clone_mem;

            // This is broken below. Have a compilation error checked
            // in as a reminder
            r = do_update(
                update_fun,
                cmp.get_descriptor(),
                bn,
                msg,
                idx,
                storeddata,
                curr_key,
                curr_keylen,
                gc_info,
                workdone,
                stats_to_update,
                &temp_logical_rows_delta);
            assert_zero(r);

            if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
                // we didn't delete something, so increment the index.
                idx++;
            }
        }
        break;
    }
    case FT_NONE: break; // don't do anything
    }

    return;
}

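// Total order on (key, msn) pairs: primary order comes from the key
// comparator; ties on the key are broken by MSN, so messages for the same
// key sort oldest-first. This is the order used by the fresh and stale
// message trees below.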
static inline int
key_msn_cmp(const DBT *a, const DBT *b, const MSN amsn, const MSN bmsn, const toku::comparator &cmp) {
    int r = cmp(a, b);
    if (r == 0) {
        if (amsn.msn > bmsn.msn) {
            r = +1;
        } else if (amsn.msn < bmsn.msn) {
            r = -1;
        } else {
            r = 0;
        }
    }
    return r;
}

int toku_msg_buffer_key_msn_heaviside(const int32_t &offset, const struct toku_msg_buffer_key_msn_heaviside_extra &extra) {
    MSN query_msn;
    DBT query_key;
    extra.msg_buffer->get_message_key_msn(offset, &query_key, &query_msn);
    return key_msn_cmp(&query_key, extra.key, query_msn, extra.msn, extra.cmp);
}

int toku_msg_buffer_key_msn_cmp(const struct toku_msg_buffer_key_msn_cmp_extra &extra, const int32_t &ao, const int32_t &bo) {
    MSN amsn, bmsn;
    DBT akey, bkey;
    extra.msg_buffer->get_message_key_msn(ao, &akey, &amsn);
    extra.msg_buffer->get_message_key_msn(bo, &bkey, &bmsn);
    return key_msn_cmp(&akey, &bkey, amsn, bmsn, extra.cmp);
}

// Effect: Enqueue the message represented by the parameters into the
//   bnc's buffer, and put it in either the fresh or stale message tree,
//   or the broadcast list.
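// Messages that apply to a single key are indexed by (key, msn) in the
// fresh tree when is_fresh, otherwise in the stale tree; broadcast and
// do-nothing messages are appended to the broadcast list in arrival order.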
static void bnc_insert_msg(NONLEAF_CHILDINFO bnc, const ft_msg &msg, bool is_fresh, const toku::comparator &cmp) {
    int r = 0;
    int32_t offset;
    bnc->msg_buffer.enqueue(msg, is_fresh, &offset);
    enum ft_msg_type type = msg.type();
    if (ft_msg_type_applies_once(type)) {
        DBT key;
        toku_fill_dbt(&key, msg.kdbt()->data, msg.kdbt()->size);
        struct toku_msg_buffer_key_msn_heaviside_extra extra(cmp, &bnc->msg_buffer, &key, msg.msn());
        if (is_fresh) {
            r = bnc->fresh_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
            assert_zero(r);
        } else {
            r = bnc->stale_message_tree.insert<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(offset, extra, nullptr);
            assert_zero(r);
        }
    } else {
        invariant(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type));
        const uint32_t idx = bnc->broadcast_list.size();
        r = bnc->broadcast_list.insert_at(offset, idx);
        assert_zero(r);
    }
}

// This is only exported for tests.
void toku_bnc_insert_msg(NONLEAF_CHILDINFO bnc, const void *key, uint32_t keylen, const void *data, uint32_t datalen, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const toku::comparator &cmp)
{
    DBT k, v;
    ft_msg msg(toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), type, msn, xids);
    bnc_insert_msg(bnc, msg, is_fresh, cmp);
}

// append a msg to a nonleaf node's child buffer
static void ft_append_msg_to_child_buffer(const toku::comparator &cmp, FTNODE node,
                                          int childnum, const ft_msg &msg, bool is_fresh) {
    paranoid_invariant(BP_STATE(node,childnum) == PT_AVAIL);
    bnc_insert_msg(BNC(node, childnum), msg, is_fresh, cmp);
    node->set_dirty();
}

// This is only exported for tests.
void toku_ft_append_to_child_buffer(const toku::comparator &cmp, FTNODE node, int childnum, enum ft_msg_type type, MSN msn, XIDS xids, bool is_fresh, const DBT *key, const DBT *val) {
    ft_msg msg(key, val, type, msn, xids);
    ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
}

static void ft_nonleaf_msg_once_to_child(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Previously we had passive aggressive promotion, but that caused a lot of I/O at checkpoint time. So now we just put the message in the buffer here.
// Also we don't worry about the node getting overfull here. It's the caller's problem.
{
    unsigned int childnum = (target_childnum >= 0
                             ? target_childnum
                             : toku_ftnode_which_child(node, msg.kdbt(), cmp));
    ft_append_msg_to_child_buffer(cmp, node, childnum, msg, is_fresh);
    NONLEAF_CHILDINFO bnc = BNC(node, childnum);
    bnc->flow[0] += flow_deltas[0];
    bnc->flow[1] += flow_deltas[1];
}

// TODO: Remove me, I'm boring.
static int ft_compare_pivot(const toku::comparator &cmp, const DBT *key, const DBT *pivot) {
    return cmp(key, pivot);
}

/* Find the leftmost child that may contain the key.
 * If the key exists it will be in the child whose number
 * is the return value of this function.
 */
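// For example, with pivots {10, 20} and three children: keys <= 10 route
// to child 0, keys in (10, 20] to child 1, and keys > 20 to child 2. A key
// equal to a pivot routes to the child on the pivot's left, since that
// child may still contain the key.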
int toku_ftnode_which_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
    // a funny case of no pivots
    if (node->n_children <= 1) return 0;

    DBT pivot;

    // check the last key to optimize seq insertions
    int n = node->n_children-1;
    int c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(n - 1, &pivot));
    if (c > 0) return n;

    // binary search the pivots
    int lo = 0;
    int hi = n-1; // skip the last one, we checked it above
    int mi;
    while (lo < hi) {
        mi = (lo + hi) / 2;
        c = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
        if (c > 0) {
            lo = mi+1;
            continue;
        }
        if (c < 0) {
            hi = mi;
            continue;
        }
        return mi;
    }
    return lo;
}

// Used for HOT.
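// Unlike toku_ftnode_which_child, a key exactly equal to a pivot routes to
// the child on the pivot's right, presumably because HOT's left-to-right
// pass has already covered keys up to and including the pivot.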
int toku_ftnode_hot_next_child(FTNODE node, const DBT *k, const toku::comparator &cmp) {
    DBT pivot;
    int low = 0;
    int hi = node->n_children - 1;
    int mi;
    while (low < hi) {
        mi = (low + hi) / 2;
        int r = ft_compare_pivot(cmp, k, node->pivotkeys.fill_pivot(mi, &pivot));
        if (r > 0) {
            low = mi + 1;
        } else if (r < 0) {
            hi = mi;
        } else {
            // if they were exactly equal, then we want the sub-tree under
            // the next pivot.
            return mi + 1;
        }
    }
    invariant(low == hi);
    return low;
}

void toku_ftnode_save_ct_pair(CACHEKEY UU(key), void *value_data, PAIR p) {
    FTNODE CAST_FROM_VOIDP(node, value_data);
    node->ct_pair = p;
}

static void
ft_nonleaf_msg_all(const toku::comparator &cmp, FTNODE node, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Effect: Put the message into a nonleaf node. We put it into all children, possibly causing the children to become reactive.
//   We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
//   Any child we modify may become reactive; checking for that is also the caller's responsibility.
{
    for (int i = 0; i < node->n_children; i++) {
        ft_nonleaf_msg_once_to_child(cmp, node, i, msg, is_fresh, flow_deltas);
    }
}

static void
ft_nonleaf_put_msg(const toku::comparator &cmp, FTNODE node, int target_childnum, const ft_msg &msg, bool is_fresh, size_t flow_deltas[])
// Effect: Put the message into a nonleaf node. We may put it into a child, possibly causing the child to become reactive.
//   We don't do the splitting and merging. That's up to the caller after doing all the puts it wants to do.
//   Any child we modify may become reactive; checking for that is also the caller's responsibility.
//
{

    //
    // see comments in toku_ft_leaf_apply_msg
    // to understand why we handle setting
    // node->max_msn_applied_to_node_on_disk here,
    // and don't do it in toku_ftnode_put_msg
    //
    MSN msg_msn = msg.msn();
    invariant(msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
    node->max_msn_applied_to_node_on_disk = msg_msn;

    if (ft_msg_type_applies_once(msg.type())) {
        ft_nonleaf_msg_once_to_child(cmp, node, target_childnum, msg, is_fresh, flow_deltas);
    } else if (ft_msg_type_applies_all(msg.type())) {
        ft_nonleaf_msg_all(cmp, node, msg, is_fresh, flow_deltas);
    } else {
        paranoid_invariant(ft_msg_type_does_nothing(msg.type()));
    }
}

// Garbage collect one leaf entry.
static void
ft_basement_node_gc_once(BASEMENTNODE bn,
                         uint32_t index,
                         void* keyp,
                         uint32_t keylen,
                         LEAFENTRY leaf_entry,
                         txn_gc_info *gc_info,
                         STAT64INFO_S * delta)
{
    paranoid_invariant(leaf_entry);

    // Don't run garbage collection on non-mvcc leaf entries.
    if (leaf_entry->type != LE_MVCC) {
        goto exit;
    }

    // Don't run garbage collection if this leafentry decides it's not worth it.
    if (!toku_le_worth_running_garbage_collection(leaf_entry, gc_info)) {
        goto exit;
    }

    LEAFENTRY new_leaf_entry;
    new_leaf_entry = NULL;

    // The mempool doesn't free itself. When it allocates new memory,
    // this pointer will be set to the older memory that must now be
    // freed.
    void * maybe_free;
    maybe_free = NULL;
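    // Note: nothing below assigns maybe_free, so the toku_free() near the
    // end appears to be vestigial, presumably left over from an older
    // mempool contract matching the comment above.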

    // These will represent the number of bytes and rows changed as
    // part of the garbage collection.
    int64_t numbytes_delta;
    int64_t numrows_delta;
    toku_le_garbage_collect(leaf_entry,
                            &bn->data_buffer,
                            index,
                            keyp,
                            keylen,
                            gc_info,
                            &new_leaf_entry,
                            &numbytes_delta);

    if (new_leaf_entry) {
        numrows_delta = 0;
    } else {
        numrows_delta = -1;
    }

    // If we created a new mempool buffer we must free the
    // old/original buffer.
    if (maybe_free) {
        toku_free(maybe_free);
    }

    // Update stats.
    bn->stat64_delta.numrows += numrows_delta;
    bn->stat64_delta.numbytes += numbytes_delta;
    delta->numrows += numrows_delta;
    delta->numbytes += numbytes_delta;

exit:
    return;
}

// Garbage collect all leaf entries for a given basement node.
static void
basement_node_gc_all_les(BASEMENTNODE bn,
                         txn_gc_info *gc_info,
                         STAT64INFO_S * delta)
{
    int r = 0;
    uint32_t index = 0;
    uint32_t num_leafentries_before;
    while (index < (num_leafentries_before = bn->data_buffer.num_klpairs())) {
        void* keyp = NULL;
        uint32_t keylen = 0;
        LEAFENTRY leaf_entry;
        r = bn->data_buffer.fetch_klpair(index, &leaf_entry, &keylen, &keyp);
        assert_zero(r);
        ft_basement_node_gc_once(
            bn,
            index,
            keyp,
            keylen,
            leaf_entry,
            gc_info,
            delta
            );
        // Check if the leaf entry was deleted or not.
        if (num_leafentries_before == bn->data_buffer.num_klpairs()) {
            ++index;
        }
    }
}

// Garbage collect all leaf entries in all basement nodes.
static void
ft_leaf_gc_all_les(FT ft, FTNODE node, txn_gc_info *gc_info)
{
    toku_ftnode_assert_fully_in_memory(node);
    paranoid_invariant_zero(node->height);
    // Loop through each leaf entry, garbage collecting as we go.
    for (int i = 0; i < node->n_children; ++i) {
        // Perform the garbage collection.
        BASEMENTNODE bn = BLB(node, i);
        STAT64INFO_S delta;
        delta.numrows = 0;
        delta.numbytes = 0;
        basement_node_gc_all_les(bn, gc_info, &delta);
        toku_ft_update_stats(&ft->in_memory_stats, delta);
    }
}

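// Note: if the cachefile has no logger there is no transaction manager to
// consult, so no snapshot of the transaction system can be built and
// garbage collection is skipped entirely.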
void toku_ftnode_leaf_run_gc(FT ft, FTNODE node) {
    TOKULOGGER logger = toku_cachefile_logger(ft->cf);
    if (logger) {
        TXN_MANAGER txn_manager = toku_logger_get_txn_manager(logger);
        txn_manager_state txn_state_for_gc(txn_manager);
        txn_state_for_gc.init();
        TXNID oldest_referenced_xid_for_simple_gc = toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager);

        // Perform full garbage collection.
        //
        // - txn_state_for_gc
        //     a fresh snapshot of the transaction system.
        // - oldest_referenced_xid_for_simple_gc
        //     the oldest xid in any live list as of right now - suitable for simple gc
        // - node->oldest_referenced_xid_known
        //     the last known oldest referenced xid for this node and any unapplied messages.
        //     it is a lower bound on the actual oldest referenced xid - but because there
        //     may be abort messages above us, we need to be careful to only use this value
        //     for implicit promotion (as opposed to the oldest referenced xid for simple gc)
        //
        // The node has its own oldest referenced xid because it must be careful not to implicitly promote
        // provisional entries for transactions that are no longer live, but may have abort messages
        // somewhere above us in the tree.
        txn_gc_info gc_info(&txn_state_for_gc,
                            oldest_referenced_xid_for_simple_gc,
                            node->oldest_referenced_xid_known,
                            true);
        ft_leaf_gc_all_les(ft, node, &gc_info);
    }
}

void toku_ftnode_put_msg(
    const toku::comparator &cmp,
    ft_update_func update_fun,
    FTNODE node,
    int target_childnum,
    const ft_msg &msg,
    bool is_fresh,
    txn_gc_info* gc_info,
    size_t flow_deltas[],
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {
    // Effect: Push message into the subtree rooted at NODE.
    //   If NODE is a leaf, then
    //     put message into leaf, applying it to the leafentries
    //   If NODE is a nonleaf, then push the message into the message buffer(s) of the relevant child(ren).
    //   The node may become overfull. That's not our problem.
    toku_ftnode_assert_fully_in_memory(node);
    //
    // see comments in toku_ft_leaf_apply_msg
    // to understand why we don't handle setting
    // node->max_msn_applied_to_node_on_disk here,
    // and instead defer to these functions
    //
    if (node->height==0) {
        toku_ft_leaf_apply_msg(
            cmp,
            update_fun,
            node,
            target_childnum, msg,
            gc_info,
            nullptr,
            stats_to_update,
            logical_rows_delta);
    } else {
        ft_nonleaf_put_msg(
            cmp,
            node,
            target_childnum,
            msg,
            is_fresh,
            flow_deltas);
    }
}

// Effect: applies the message to the leaf if the appropriate basement node is
//   in memory. This function is called during message injection and/or
//   flushing, so the entire node MUST be in memory.
void toku_ft_leaf_apply_msg(
    const toku::comparator& cmp,
    ft_update_func update_fun,
    FTNODE node,
    int target_childnum,  // which child to inject to, or -1 if unknown
    const ft_msg& msg,
    txn_gc_info* gc_info,
    uint64_t* workdone,
    STAT64INFO stats_to_update,
    int64_t* logical_rows_delta) {

    VERIFY_NODE(t, node);
    toku_ftnode_assert_fully_in_memory(node);

    //
    // Because toku_ft_leaf_apply_msg is called with the intent of permanently
    // applying a message to a leaf node (meaning the message is permanently applied
    // and will be purged from the system after this call, as opposed to
    // toku_apply_ancestors_messages_to_node, which applies a message
    // for a query, but the message may still reside in the system and
    // be reapplied later), we mark the node as dirty and
    // take the opportunity to update node->max_msn_applied_to_node_on_disk.
    //
    node->set_dirty();

    //
    // we cannot blindly update node->max_msn_applied_to_node_on_disk,
    // we must check to see if the msn is greater than the one already stored,
    // because the message may have already been applied earlier (via
    // toku_apply_ancestors_messages_to_node) to answer a query
    //
    // This is why we handle node->max_msn_applied_to_node_on_disk both here
    // and in ft_nonleaf_put_msg, as opposed to in one location, toku_ftnode_put_msg.
    //
    MSN msg_msn = msg.msn();
    if (msg_msn.msn > node->max_msn_applied_to_node_on_disk.msn) {
        node->max_msn_applied_to_node_on_disk = msg_msn;
    }

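    // Each basement tracks the max MSN it has applied; replaying a message
    // with an MSN at or below that watermark would double-apply it, so such
    // messages are counted as discards instead.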
    if (ft_msg_type_applies_once(msg.type())) {
        unsigned int childnum = (target_childnum >= 0
                                 ? target_childnum
                                 : toku_ftnode_which_child(node, msg.kdbt(), cmp));
        BASEMENTNODE bn = BLB(node, childnum);
        if (msg.msn().msn > bn->max_msn_applied.msn) {
            bn->max_msn_applied = msg.msn();
            toku_ft_bn_apply_msg(
                cmp,
                update_fun,
                bn,
                msg,
                gc_info,
                workdone,
                stats_to_update,
                logical_rows_delta);
        } else {
            toku_ft_status_note_msn_discard();
        }
    } else if (ft_msg_type_applies_all(msg.type())) {
        for (int childnum=0; childnum<node->n_children; childnum++) {
            if (msg.msn().msn > BLB(node, childnum)->max_msn_applied.msn) {
                BLB(node, childnum)->max_msn_applied = msg.msn();
                toku_ft_bn_apply_msg(
                    cmp,
                    update_fun,
                    BLB(node, childnum),
                    msg,
                    gc_info,
                    workdone,
                    stats_to_update,
                    logical_rows_delta);
            } else {
                toku_ft_status_note_msn_discard();
            }
        }
    } else if (!ft_msg_type_does_nothing(msg.type())) {
        invariant(ft_msg_type_does_nothing(msg.type()));
    }
    VERIFY_NODE(t, node);
}
