1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 /*
40 
41 Managing the tree shape:  How insertion, deletion, and querying work
42 
43 When we insert a message into the FT_HANDLE, here's what happens.
44 
45 to insert a message at the root
46 
47     - find the root node
48     - capture the next msn of the root node and assign it to the message
49     - split the root if it needs to be split
50     - insert the message into the root buffer
51     - if the root is too full, then toku_ft_flush_some_child() of the root on a flusher thread
52 
flusher functions use an advice struct which provides some functions to
call that tell it what to do based on the context of the flush. see ft-flusher.h
55 
56 to flush some child, given a parent and some advice
57     - pick the child using advice->pick_child()
    - remove that child's buffer from the parent
59     - flush the buffer to the child
60     - if the child has stable reactivity and
61       advice->should_recursively_flush() is true, then
62       toku_ft_flush_some_child() of the child
63     - otherwise split the child if it needs to be split
64     - otherwise maybe merge the child if it needs to be merged
65 
66 flusher threads:
67 
68     flusher threads are created on demand as the result of internal nodes
69     becoming gorged by insertions. this allows flushing to be done somewhere
70     other than the client thread. these work items are enqueued onto
71     the cachetable kibbutz and are done in a first in first out order.
72 
73 cleaner threads:
74 
75     the cleaner thread wakes up every so often (say, 1 second) and chooses
76     a small number (say, 5) of nodes as candidates for a flush. the one
77     with the largest cache pressure is chosen to be flushed. cache pressure
78     is a function of the size of the node in the cachetable plus the work done.
79     the cleaner thread need not actually do a flush when awoken, so only
80     nodes that have sufficient cache pressure are flushed.
81 
82 checkpointing:
83 
84     the checkpoint thread wakes up every minute to checkpoint dirty nodes
85     to disk. at the time of this writing, nodes during checkpoint are
86     locked and cannot be queried or flushed to. a design in which nodes
87     are copied before checkpoint is being considered as a way to reduce
88     the performance variability caused by a checkpoint locking too
89     many nodes and preventing other threads from traversing down the tree,
90     for a query or otherwise.
91 
92 To shrink a file: Let X be the size of the reachable data.
93     We define an acceptable bloat constant of C.  For example we set C=2 if we are willing to allow the file to be as much as 2X in size.
94     The goal is to find the smallest amount of stuff we can move to get the file down to size CX.
95     That seems like a difficult problem, so we use the following heuristics:
       If we can relocate the last block to a lower location, then do so immediately.        (The file gets smaller right away, so even though the new location
         may not even be in the first CX bytes, we are making the file smaller.)
98        Otherwise all of the earlier blocks are smaller than the last block (of size L).         So find the smallest region that has L free bytes in it.
99          (This can be computed in one pass)
100          Move the first allocated block in that region to some location not in the interior of the region.
101                (Outside of the region is OK, and reallocating the block at the edge of the region is OK).
102             This has the effect of creating a smaller region with at least L free bytes in it.
103          Go back to the top (because by now some other block may have been allocated or freed).
104     Claim: if there are no other allocations going on concurrently, then this algorithm will shrink the file reasonably efficiently.  By this I mean that
105        each block of shrinkage does the smallest amount of work possible.  That doesn't mean that the work overall is minimized.
106     Note: If there are other allocations and deallocations going on concurrently, we might never get enough space to move the last block.  But it takes a lot
107       of allocations and deallocations to make that happen, and it's probably reasonable for the file not to shrink in this case.
108 
109 To split or merge a child of a node:
110 Split_or_merge (node, childnum) {
111   If the child needs to be split (it's a leaf with too much stuff or a nonleaf with too much fanout)
112     fetch the node and the child into main memory.
113     split the child, producing two nodes A and B, and also a pivot.   Don't worry if the resulting child is still too big or too small.         Fix it on the next pass.
114     fixup node to point at the two new children.  Don't worry about the node getting too much fanout.
115     return;
  If the child needs to be merged (it's a leaf with too little stuff (less than 1/4 full) or a nonleaf with too little fanout (less than 1/4))
117     fetch node, the child  and a sibling of the child into main memory.
118     move all messages from the node to the two children (so that the message buffers are empty)
119     If the two siblings together fit into one node then
120       merge the two siblings.
121       fixup the node to point at one child
122     Otherwise
123       load balance the content of the two nodes
124     Don't worry about the resulting children having too many messages or otherwise being too big or too small.        Fix it on the next pass.
125   }
126 }
127 
128 Here's how querying works:
129 
130 lookups:
131     - As of Dr. No, we don't do any tree shaping on lookup.
132     - We don't promote eagerly or use aggressive promotion or passive-aggressive
133     promotion.        We just push messages down according to the traditional FT_HANDLE
134     algorithm on insertions.
135     - when a node is brought into memory, we apply ancestor messages above it.
136 
137 basement nodes, bulk fetch,  and partial fetch:
    - leaf nodes are comprised of N basement nodes, each of nominal size. when
    a query hits a leaf node, it may require one or more basement nodes to be in memory.
140     - for point queries, we do not read the entire node into memory. instead,
141       we only read in the required basement node
142     - for range queries, cursors may return cursor continue in their callback
      to take the shortcut path until the end of the basement node.
144     - for range queries, cursors may prelock a range of keys (with or without a txn).
145       the fractal tree will prefetch nodes aggressively until the end of the range.
146     - without a prelocked range, range queries behave like successive point queries.
147 
148 */
149 
150 #include <my_global.h>
151 #include "ft/cachetable/checkpoint.h"
152 #include "ft/cursor.h"
153 #include "ft/ft-cachetable-wrappers.h"
154 #include "ft/ft-flusher.h"
155 #include "ft/ft-internal.h"
156 #include "ft/ft.h"
157 #include "ft/leafentry.h"
158 #include "ft/logger/log-internal.h"
159 #include "ft/msg.h"
160 #include "ft/node.h"
161 #include "ft/serialize/block_table.h"
162 #include "ft/serialize/ft-serialize.h"
163 #include "ft/serialize/ft_layout_version.h"
164 #include "ft/serialize/ft_node-serialize.h"
165 #include "ft/serialize/sub_block.h"
166 #include "ft/txn/txn_manager.h"
167 #include "ft/txn/xids.h"
168 #include "ft/ule.h"
169 #include "src/ydb-internal.h"
170 
171 #include <toku_race_tools.h>
172 
173 #include <portability/toku_atomic.h>
174 
175 #include <util/context.h>
176 #include <util/mempool.h>
177 #include <util/status.h>
178 #include <util/rwlock.h>
179 #include <util/sort.h>
180 #include <util/scoped_malloc.h>
181 
182 #include <stdint.h>
183 
184 #include <memory>
/* Status is intended for display to humans to help understand system behavior.
 * It does not need to be perfectly thread-safe.
 */

// Mutex plus its instrumentation key for the ft open/close path.
// NOTE(review): the lock/unlock sites are outside this chunk — confirm there.
static toku_mutex_t ft_open_close_lock;
static toku_instr_key *ft_open_close_lock_mutex_key;
// FIXME: the instrumentation keys below are defined here even though they
// belong to other modules, because they are registered here. If desired, they
// can be moved to their proper modules and registration done there in a
// one-time init function
// locktree
toku_instr_key *treenode_mutex_key;
toku_instr_key *manager_mutex_key;
toku_instr_key *manager_escalation_mutex_key;
toku_instr_key *manager_escalator_mutex_key;
// src
toku_instr_key *db_txn_struct_i_txn_mutex_key;
toku_instr_key *indexer_i_indexer_lock_mutex_key;
toku_instr_key *indexer_i_indexer_estimate_lock_mutex_key;
toku_instr_key *result_i_open_dbs_rwlock_key;
// locktree
toku_instr_key *lock_request_m_wait_cond_key;
toku_instr_key *manager_m_escalator_done_key;
toku_instr_key *locktree_request_info_mutex_key;
toku_instr_key *locktree_request_info_retry_mutex_key;
toku_instr_key *locktree_request_info_retry_cv_key;

// this is a sample probe for custom instrumentation
static toku_instr_key *fti_probe_1_key;

// This is a sample probe for custom instrumentation
toku_instr_probe *toku_instr_probe_1;
217 
toku_ft_get_status(FT_STATUS s)218 void toku_ft_get_status(FT_STATUS s) {
219     ft_status.init();
220     *s = ft_status;
221 
222     // Calculate compression ratios for leaf and nonleaf nodes
223     const double compressed_leaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_BYTES) +
224                                          FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_BYTES_FOR_CHECKPOINT);
225     const double uncompressed_leaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES) +
226                                            FT_STATUS_VAL(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT);
227     const double compressed_nonleaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_BYTES) +
228                                             FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_BYTES_FOR_CHECKPOINT);
229     const double uncompressed_nonleaf_bytes = FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES) +
230                                               FT_STATUS_VAL(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT);
231 
232     if (compressed_leaf_bytes > 0) {
233         s->status[FT_STATUS_S::FT_DISK_FLUSH_LEAF_COMPRESSION_RATIO].value.dnum
234             = uncompressed_leaf_bytes / compressed_leaf_bytes;
235     }
236     if (compressed_nonleaf_bytes > 0) {
237         s->status[FT_STATUS_S::FT_DISK_FLUSH_NONLEAF_COMPRESSION_RATIO].value.dnum
238             = uncompressed_nonleaf_bytes / compressed_nonleaf_bytes;
239     }
240     if (compressed_leaf_bytes > 0 || compressed_nonleaf_bytes > 0) {
241         s->status[FT_STATUS_S::FT_DISK_FLUSH_OVERALL_COMPRESSION_RATIO].value.dnum
242             = (uncompressed_leaf_bytes + uncompressed_nonleaf_bytes) /
243               (compressed_leaf_bytes + compressed_nonleaf_bytes);
244     }
245 }
246 
toku_note_deserialized_basement_node(bool fixed_key_size)247 void toku_note_deserialized_basement_node(bool fixed_key_size) {
248     if (fixed_key_size) {
249         FT_STATUS_INC(FT_BASEMENT_DESERIALIZE_FIXED_KEYSIZE, 1);
250     } else {
251         FT_STATUS_INC(FT_BASEMENT_DESERIALIZE_VARIABLE_KEYSIZE, 1);
252     }
253 }
254 
// Invariant check: a node's flags must agree with its tree header's flags.
// Parameters are wrapped in UU() because the body compiles away when
// paranoid invariants are disabled.
static void ft_verify_flags(FT UU(ft), FTNODE UU(node)) {
    paranoid_invariant(ft->h->flags == node->flags);
}
258 
// Nonzero enables extra ft-layer debugging behavior (off by default).
int toku_ft_debug_mode = 0;
260 
compute_child_fullhash(CACHEFILE cf,FTNODE node,int childnum)261 uint32_t compute_child_fullhash (CACHEFILE cf, FTNODE node, int childnum) {
262     paranoid_invariant(node->height>0);
263     paranoid_invariant(childnum<node->n_children);
264     return toku_cachetable_hash(cf, BP_BLOCKNUM(node, childnum));
265 }
266 
267 //
268 // pivot bounds
269 // TODO: move me to ft/node.cc?
270 //
271 
// Construct bounds from an exclusive lower key and an inclusive upper key.
// The DBT structs themselves are copied; the key memory they point to is not.
pivot_bounds::pivot_bounds(const DBT &lbe_dbt, const DBT &ubi_dbt) :
    _lower_bound_exclusive(lbe_dbt), _upper_bound_inclusive(ubi_dbt) {
}
275 
infinite_bounds()276 pivot_bounds pivot_bounds::infinite_bounds() {
277     DBT dbt;
278     toku_init_dbt(&dbt);
279 
280     // infinity is represented by an empty dbt
281     invariant(toku_dbt_is_empty(&dbt));
282     return pivot_bounds(dbt, dbt);
283 }
284 
// Accessor: the exclusive lower bound ("lbe" = lower bound, exclusive).
const DBT *pivot_bounds::lbe() const {
    return &_lower_bound_exclusive;
}
288 
// Accessor: the inclusive upper bound ("ubi" = upper bound, inclusive).
const DBT *pivot_bounds::ubi() const {
    return &_upper_bound_inclusive;
}
292 
_prepivotkey(FTNODE node,int childnum,const DBT & lbe_dbt) const293 DBT pivot_bounds::_prepivotkey(FTNODE node, int childnum, const DBT &lbe_dbt) const {
294     if (childnum == 0) {
295         return lbe_dbt;
296     } else {
297         return node->pivotkeys.get_pivot(childnum - 1);
298     }
299 }
300 
_postpivotkey(FTNODE node,int childnum,const DBT & ubi_dbt) const301 DBT pivot_bounds::_postpivotkey(FTNODE node, int childnum, const DBT &ubi_dbt) const {
302     if (childnum + 1 == node->n_children) {
303         return ubi_dbt;
304     } else {
305         return node->pivotkeys.get_pivot(childnum);
306     }
307 }
308 
// Bounds for descending into the childnum'th child of 'node': the parent's
// outer bounds on the outermost children, interior pivot keys otherwise.
pivot_bounds pivot_bounds::next_bounds(FTNODE node, int childnum) const {
    return pivot_bounds(_prepivotkey(node, childnum, _lower_bound_exclusive),
                        _postpivotkey(node, childnum, _upper_bound_inclusive));
}
313 
314 ////////////////////////////////////////////////////////////////////////////////
315 
// Memory used by the in-memory (PT_AVAIL) message buffer of child i of an
// internal node. Only valid for nonleaf nodes.
static long get_avail_internal_node_partition_size(FTNODE node, int i) {
    paranoid_invariant(node->height > 0);
    return toku_bnc_memory_size(BNC(node, i));
}
320 
ftnode_cachepressure_size(FTNODE node)321 static long ftnode_cachepressure_size(FTNODE node) {
322     long retval = 0;
323     bool totally_empty = true;
324     if (node->height == 0) {
325         goto exit;
326     }
327     else {
328         for (int i = 0; i < node->n_children; i++) {
329             if (BP_STATE(node,i) == PT_INVALID || BP_STATE(node,i) == PT_ON_DISK) {
330                 continue;
331             }
332             else if (BP_STATE(node,i) == PT_COMPRESSED) {
333                 SUB_BLOCK sb = BSB(node, i);
334                 totally_empty = false;
335                 retval += sb->compressed_size;
336             }
337             else if (BP_STATE(node,i) == PT_AVAIL) {
338                 totally_empty = totally_empty && (toku_bnc_n_entries(BNC(node, i)) == 0);
339                 retval += get_avail_internal_node_partition_size(node, i);
340                 retval += BP_WORKDONE(node, i);
341             }
342             else {
343                 abort();
344             }
345         }
346     }
347 exit:
348     if (totally_empty) {
349         return 0;
350     }
351     return retval;
352 }
353 
354 static long
ftnode_memory_size(FTNODE node)355 ftnode_memory_size (FTNODE node)
356 // Effect: Estimate how much main memory a node requires.
357 {
358     long retval = 0;
359     int n_children = node->n_children;
360     retval += sizeof(*node);
361     retval += (n_children)*(sizeof(node->bp[0]));
362     retval += node->pivotkeys.total_size();
363 
364     // now calculate the sizes of the partitions
365     for (int i = 0; i < n_children; i++) {
366         if (BP_STATE(node,i) == PT_INVALID || BP_STATE(node,i) == PT_ON_DISK) {
367             continue;
368         }
369         else if (BP_STATE(node,i) == PT_COMPRESSED) {
370             SUB_BLOCK sb = BSB(node, i);
371             retval += sizeof(*sb);
372             retval += sb->compressed_size;
373         }
374         else if (BP_STATE(node,i) == PT_AVAIL) {
375             if (node->height > 0) {
376                 retval += get_avail_internal_node_partition_size(node, i);
377             }
378             else {
379                 BASEMENTNODE bn = BLB(node, i);
380                 retval += sizeof(*bn);
381                 retval += BLB_DATA(node, i)->get_memory_size();
382             }
383         }
384         else {
385             abort();
386         }
387     }
388     return retval;
389 }
390 
make_ftnode_pair_attr(FTNODE node)391 PAIR_ATTR make_ftnode_pair_attr(FTNODE node) {
392     long size = ftnode_memory_size(node);
393     long cachepressure_size = ftnode_cachepressure_size(node);
394     PAIR_ATTR result={
395         .size = size,
396         .nonleaf_size = (node->height > 0) ? size : 0,
397         .leaf_size = (node->height > 0) ? 0 : size,
398         .rollback_size = 0,
399         .cache_pressure_size = cachepressure_size,
400         .is_valid = true
401     };
402     return result;
403 }
404 
make_invalid_pair_attr(void)405 PAIR_ATTR make_invalid_pair_attr(void) {
406     PAIR_ATTR result={
407         .size = 0,
408         .nonleaf_size = 0,
409         .leaf_size = 0,
410         .rollback_size = 0,
411         .cache_pressure_size = 0,
412         .is_valid = false
413     };
414     return result;
415 }
416 
417 
418 // assign unique dictionary id
419 static uint64_t dict_id_serial = 1;
420 static DICTIONARY_ID
next_dict_id(void)421 next_dict_id(void) {
422     uint64_t i = toku_sync_fetch_and_add(&dict_id_serial, 1);
423     assert(i);        // guarantee unique dictionary id by asserting 64-bit counter never wraps
424     DICTIONARY_ID d = {.dictid = i};
425     return d;
426 }
427 
428 // TODO: This isn't so pretty
// Shared initializer for every create_for_* variant: remember the tree,
// default to fetching nothing, clear search/range state, and zero the
// I/O and deserialization statistics. The create_for_* methods then
// override the fields specific to their fetch type.
void ftnode_fetch_extra::_create_internal(FT ft_) {
    ft = ft_;
    type = ftnode_fetch_none;
    search = nullptr;

    // No range bounds yet; both keys start out empty.
    toku_init_dbt(&range_lock_left_key);
    toku_init_dbt(&range_lock_right_key);
    left_is_neg_infty = false;
    right_is_pos_infty = false;

    // -1 means 'unknown', which is the correct default state
    child_to_read = -1;
    disable_prefetching = false;
    read_all_partitions = false;

    bytes_read = 0;
    io_time = 0;
    deserialize_time = 0;
    decompress_time = 0;
}
449 
// Set up to read an entire node: every partition will be fetched.
void ftnode_fetch_extra::create_for_full_read(FT ft_) {
    _create_internal(ft_);

    type = ftnode_fetch_all;
}
455 
create_for_keymatch(FT ft_,const DBT * left,const DBT * right,bool disable_prefetching_,bool read_all_partitions_)456 void ftnode_fetch_extra::create_for_keymatch(FT ft_, const DBT *left, const DBT *right,
457                                              bool disable_prefetching_, bool read_all_partitions_) {
458     _create_internal(ft_);
459     invariant(ft->h->type == FT_CURRENT);
460 
461     type = ftnode_fetch_keymatch;
462     if (left != nullptr) {
463         toku_copyref_dbt(&range_lock_left_key, *left);
464     }
465     if (right != nullptr) {
466         toku_copyref_dbt(&range_lock_right_key, *right);
467     }
468     left_is_neg_infty = left == nullptr;
469     right_is_pos_infty = right == nullptr;
470     disable_prefetching = disable_prefetching_;
471     read_all_partitions = read_all_partitions_;
472 }
473 
create_for_subset_read(FT ft_,ft_search * search_,const DBT * left,const DBT * right,bool left_is_neg_infty_,bool right_is_pos_infty_,bool disable_prefetching_,bool read_all_partitions_)474 void ftnode_fetch_extra::create_for_subset_read(FT ft_, ft_search *search_,
475                                                 const DBT *left, const DBT *right,
476                                                 bool left_is_neg_infty_, bool right_is_pos_infty_,
477                                                 bool disable_prefetching_, bool read_all_partitions_) {
478     _create_internal(ft_);
479     invariant(ft->h->type == FT_CURRENT);
480 
481     type = ftnode_fetch_subset;
482     search = search_;
483     if (left != nullptr) {
484         toku_copyref_dbt(&range_lock_left_key, *left);
485     }
486     if (right != nullptr) {
487         toku_copyref_dbt(&range_lock_right_key, *right);
488     }
489     left_is_neg_infty = left_is_neg_infty_;
490     right_is_pos_infty = right_is_pos_infty_;
491     disable_prefetching = disable_prefetching_;
492     read_all_partitions = read_all_partitions_;
493 }
494 
// Set up a minimal read: fetch the node but no partitions
// (type remains ftnode_fetch_none).
void ftnode_fetch_extra::create_for_min_read(FT ft_) {
    _create_internal(ft_);
    invariant(ft->h->type == FT_CURRENT);

    type = ftnode_fetch_none;
}
501 
create_for_prefetch(FT ft_,struct ft_cursor * cursor)502 void ftnode_fetch_extra::create_for_prefetch(FT ft_, struct ft_cursor *cursor) {
503     _create_internal(ft_);
504     invariant(ft->h->type == FT_CURRENT);
505 
506     type = ftnode_fetch_prefetch;
507     const DBT *left = &cursor->range_lock_left_key;
508     if (left->data) {
509         toku_clone_dbt(&range_lock_left_key, *left);
510     }
511     const DBT *right = &cursor->range_lock_right_key;
512     if (right->data) {
513         toku_clone_dbt(&range_lock_right_key, *right);
514     }
515     left_is_neg_infty = cursor->left_is_neg_infty;
516     right_is_pos_infty = cursor->right_is_pos_infty;
517     disable_prefetching = cursor->disable_prefetching;
518 }
519 
// Tear down the fetch extra: destroy the range key DBTs (freeing any
// memory they own, e.g. keys cloned by create_for_prefetch()).
void ftnode_fetch_extra::destroy(void) {
    toku_destroy_dbt(&range_lock_left_key);
    toku_destroy_dbt(&range_lock_right_key);
}
524 
525 // Requires: child_to_read to have been set
wants_child_available(int childnum) const526 bool ftnode_fetch_extra::wants_child_available(int childnum) const {
527     return type == ftnode_fetch_all ||
528         (child_to_read == childnum &&
529          (type == ftnode_fetch_subset || type == ftnode_fetch_keymatch));
530 }
531 
leftmost_child_wanted(FTNODE node) const532 int ftnode_fetch_extra::leftmost_child_wanted(FTNODE node) const {
533     paranoid_invariant(type == ftnode_fetch_subset ||
534                        type == ftnode_fetch_prefetch ||
535                        type == ftnode_fetch_keymatch);
536     if (left_is_neg_infty) {
537         return 0;
538     } else if (range_lock_left_key.data == nullptr) {
539         return -1;
540     } else {
541         return toku_ftnode_which_child(node, &range_lock_left_key, ft->cmp);
542     }
543 }
544 
rightmost_child_wanted(FTNODE node) const545 int ftnode_fetch_extra::rightmost_child_wanted(FTNODE node) const {
546     paranoid_invariant(type == ftnode_fetch_subset ||
547                        type == ftnode_fetch_prefetch ||
548                        type == ftnode_fetch_keymatch);
549     if (right_is_pos_infty) {
550         return node->n_children - 1;
551     } else if (range_lock_right_key.data == nullptr) {
552         return -1;
553     } else {
554         return toku_ftnode_which_child(node, &range_lock_right_key, ft->cmp);
555     }
556 }
557 
558 static int
ft_cursor_rightmost_child_wanted(FT_CURSOR cursor,FT_HANDLE ft_handle,FTNODE node)559 ft_cursor_rightmost_child_wanted(FT_CURSOR cursor, FT_HANDLE ft_handle, FTNODE node)
560 {
561     if (cursor->right_is_pos_infty) {
562         return node->n_children - 1;
563     } else if (cursor->range_lock_right_key.data == nullptr) {
564         return -1;
565     } else {
566         return toku_ftnode_which_child(node, &cursor->range_lock_right_key, ft_handle->ft->cmp);
567     }
568 }
569 
570 STAT64INFO_S
toku_get_and_clear_basement_stats(FTNODE leafnode)571 toku_get_and_clear_basement_stats(FTNODE leafnode) {
572     invariant(leafnode->height == 0);
573     STAT64INFO_S deltas = ZEROSTATS;
574     for (int i = 0; i < leafnode->n_children; i++) {
575         BASEMENTNODE bn = BLB(leafnode, i);
576         invariant(BP_STATE(leafnode,i) == PT_AVAIL);
577         deltas.numrows  += bn->stat64_delta.numrows;
578         deltas.numbytes += bn->stat64_delta.numbytes;
579         bn->stat64_delta = ZEROSTATS;
580     }
581     return deltas;
582 }
583 
toku_ft_status_update_flush_reason(FTNODE node,uint64_t uncompressed_bytes_flushed,uint64_t bytes_written,tokutime_t write_time,bool for_checkpoint)584 void toku_ft_status_update_flush_reason(FTNODE node,
585         uint64_t uncompressed_bytes_flushed, uint64_t bytes_written,
586         tokutime_t write_time, bool for_checkpoint) {
587     if (node->height == 0) {
588         if (for_checkpoint) {
589             FT_STATUS_INC(FT_DISK_FLUSH_LEAF_FOR_CHECKPOINT, 1);
590             FT_STATUS_INC(FT_DISK_FLUSH_LEAF_BYTES_FOR_CHECKPOINT, bytes_written);
591             FT_STATUS_INC(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, uncompressed_bytes_flushed);
592             FT_STATUS_INC(FT_DISK_FLUSH_LEAF_TOKUTIME_FOR_CHECKPOINT, write_time);
593         }
594         else {
595             FT_STATUS_INC(FT_DISK_FLUSH_LEAF, 1);
596             FT_STATUS_INC(FT_DISK_FLUSH_LEAF_BYTES, bytes_written);
597             FT_STATUS_INC(FT_DISK_FLUSH_LEAF_UNCOMPRESSED_BYTES, uncompressed_bytes_flushed);
598             FT_STATUS_INC(FT_DISK_FLUSH_LEAF_TOKUTIME, write_time);
599         }
600     }
601     else {
602         if (for_checkpoint) {
603             FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_FOR_CHECKPOINT, 1);
604             FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_BYTES_FOR_CHECKPOINT, bytes_written);
605             FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES_FOR_CHECKPOINT, uncompressed_bytes_flushed);
606             FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_TOKUTIME_FOR_CHECKPOINT, write_time);
607         }
608         else {
609             FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF, 1);
610             FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_BYTES, bytes_written);
611             FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_UNCOMPRESSED_BYTES, uncompressed_bytes_flushed);
612             FT_STATUS_INC(FT_DISK_FLUSH_NONLEAF_TOKUTIME, write_time);
613         }
614     }
615 }
616 
toku_ftnode_checkpoint_complete_callback(void * value_data)617 void toku_ftnode_checkpoint_complete_callback(void *value_data) {
618     FTNODE node = static_cast<FTNODE>(value_data);
619     if (node->height > 0) {
620         for (int i = 0; i < node->n_children; ++i) {
621             if (BP_STATE(node, i) == PT_AVAIL) {
622                 NONLEAF_CHILDINFO bnc = BNC(node, i);
623                 bnc->flow[1] = bnc->flow[0];
624                 bnc->flow[0] = 0;
625             }
626         }
627     }
628 }
629 
// Cachetable clone callback: build an in-memory copy of 'value_data' (an
// FTNODE) so a checkpoint can serialize the clone while the original node
// keeps accepting writes. Requires the node to be fully in memory.
// On return: *cloned_value_data is the clone, *clone_size is its memory
// footprint, and *new_attr is the original node's refreshed pair attr —
// only marked valid for leaf nodes.
void toku_ftnode_clone_callback(void *value_data,
                                void **cloned_value_data,
                                long *clone_size,
                                PAIR_ATTR *new_attr,
                                bool for_checkpoint,
                                void *write_extraargs) {
    FTNODE node = static_cast<FTNODE>(value_data);
    toku_ftnode_assert_fully_in_memory(node);
    // The FT is threaded through the cachetable's write_extraargs.
    FT ft = static_cast<FT>(write_extraargs);
    FTNODE XCALLOC(cloned_node);
    if (node->height == 0) {
        // set header stats, must be done before rebalancing
        toku_ftnode_update_disk_stats(node, ft, for_checkpoint);
        // rebalance the leaf node
        toku_ftnode_leaf_rebalance(node, ft->h->basementnodesize);
    }

    // Copy scalar metadata from the original into the clone.
    cloned_node->oldest_referenced_xid_known =
        node->oldest_referenced_xid_known;
    cloned_node->max_msn_applied_to_node_on_disk =
        node->max_msn_applied_to_node_on_disk;
    cloned_node->flags = node->flags;
    cloned_node->blocknum = node->blocknum;
    cloned_node->layout_version = node->layout_version;
    cloned_node->layout_version_original = node->layout_version_original;
    cloned_node->layout_version_read_from_disk =
        node->layout_version_read_from_disk;
    cloned_node->build_id = node->build_id;
    cloned_node->height = node->height;
    cloned_node->dirty_ = node->dirty_;
    cloned_node->fullhash = node->fullhash;
    cloned_node->n_children = node->n_children;

    XMALLOC_N(node->n_children, cloned_node->bp);
    // clone pivots
    cloned_node->pivotkeys.create_from_pivot_keys(node->pivotkeys);
    if (node->height > 0) {
        // need to move messages here so that we don't serialize stale
        // messages to the fresh tree - ft verify code complains otherwise.
        toku_move_ftnode_messages_to_stale(ft, node);
    }
    // clone partition
    toku_ftnode_clone_partitions(node, cloned_node);

    // clear dirty bit
    node->clear_dirty();
    cloned_node->clear_dirty();
    node->layout_version_read_from_disk = FT_LAYOUT_VERSION;
    // set new pair attr if necessary
    if (node->height == 0) {
        *new_attr = make_ftnode_pair_attr(node);
        // NOTE(review): the logical-row deltas (LRD) are zeroed on both
        // copies here — presumably so they are not double-counted after
        // the checkpoint; confirm against the LRD accounting code.
        for (int i = 0; i < node->n_children; i++) {
            if (BP_STATE(node, i) == PT_AVAIL) {
                BLB_LRD(node, i) = 0;
                BLB_LRD(cloned_node, i) = 0;
            }
        }
    } else {
        new_attr->is_valid = false;
    }
    *clone_size = ftnode_memory_size(cloned_node);
    *cloned_value_data = cloned_node;
}
693 
toku_ftnode_flush_callback(CACHEFILE UU (cachefile),int fd,BLOCKNUM blocknum,void * ftnode_v,void ** disk_data,void * extraargs,PAIR_ATTR size,PAIR_ATTR * new_size,bool write_me,bool keep_me,bool for_checkpoint,bool is_clone)694 void toku_ftnode_flush_callback(CACHEFILE UU(cachefile),
695                                 int fd,
696                                 BLOCKNUM blocknum,
697                                 void *ftnode_v,
698                                 void **disk_data,
699                                 void *extraargs,
700                                 PAIR_ATTR size __attribute__((unused)),
701                                 PAIR_ATTR *new_size,
702                                 bool write_me,
703                                 bool keep_me,
704                                 bool for_checkpoint,
705                                 bool is_clone) {
706     FT ft = (FT)extraargs;
707     FTNODE ftnode = (FTNODE)ftnode_v;
708     FTNODE_DISK_DATA *ndd = (FTNODE_DISK_DATA *)disk_data;
709     assert(ftnode->blocknum.b == blocknum.b);
710     int height = ftnode->height;
711     if (write_me) {
712         toku_ftnode_assert_fully_in_memory(ftnode);
713         if (height > 0 && !is_clone) {
714             // cloned nodes already had their stale messages moved, see
715             // toku_ftnode_clone_callback()
716             toku_move_ftnode_messages_to_stale(ft, ftnode);
717         } else if (height == 0) {
718             toku_ftnode_leaf_run_gc(ft, ftnode);
719             if (!is_clone) {
720                 toku_ftnode_update_disk_stats(ftnode, ft, for_checkpoint);
721             }
722         }
723         int r = toku_serialize_ftnode_to(
724             fd, ftnode->blocknum, ftnode, ndd, !is_clone, ft, for_checkpoint);
725         assert_zero(r);
726         ftnode->layout_version_read_from_disk = FT_LAYOUT_VERSION;
727     }
728     if (!keep_me) {
729         if (!is_clone) {
730             long node_size = ftnode_memory_size(ftnode);
731             if (ftnode->height == 0) {
732                 FT_STATUS_INC(FT_FULL_EVICTIONS_LEAF, 1);
733                 FT_STATUS_INC(FT_FULL_EVICTIONS_LEAF_BYTES, node_size);
734 
735                 // A leaf node (height == 0) is being evicted (!keep_me) and is
736                 // not a checkpoint clone (!is_clone). This leaf node may have
737                 // had messages applied to satisfy a query, but was never
738                 // actually dirtied (!ftnode->dirty && !write_me). **Note that
739                 // if (write_me) would persist the node and clear the dirty
740                 // flag **. This message application may have updated the trees
741                 // logical row count. Since these message applications are not
742                 // persisted, we need undo the logical row count adjustments as
743                 // they may occur again in the future if/when the node is
744                 // re-read from disk for another query or change.
745                 if (!ftnode->dirty() && !write_me) {
746                     int64_t lrc_delta = 0;
747                     for (int i = 0; i < ftnode->n_children; i++) {
748                         if (BP_STATE(ftnode, i) == PT_AVAIL) {
749                             lrc_delta -= BLB_LRD(ftnode, i);
750                             BLB_LRD(ftnode, i) = 0;
751                         }
752                     }
753                     toku_ft_adjust_logical_row_count(ft, lrc_delta);
754                 }
755             } else {
756                 FT_STATUS_INC(FT_FULL_EVICTIONS_NONLEAF, 1);
757                 FT_STATUS_INC(FT_FULL_EVICTIONS_NONLEAF_BYTES, node_size);
758             }
759             toku_free(*disk_data);
760         } else {
761             if (ftnode->height == 0) {
762                 // No need to adjust logical row counts when flushing a clone
763                 // as they should have been zeroed out anyway when cloned.
764                 // Clones are 'copies' of work already done so doing it again
765                 // (adjusting row counts) would be redundant and leads to
766                 // inaccurate counts.
767                 for (int i = 0; i < ftnode->n_children; i++) {
768                     if (BP_STATE(ftnode, i) == PT_AVAIL) {
769                         BASEMENTNODE bn = BLB(ftnode, i);
770                         toku_ft_decrease_stats(&ft->in_memory_stats,
771                                                bn->stat64_delta);
772                     }
773                 }
774             }
775         }
776         toku_ftnode_free(&ftnode);
777     } else {
778         *new_size = make_ftnode_pair_attr(ftnode);
779     }
780 }
781 
782 void
toku_ft_status_update_pivot_fetch_reason(ftnode_fetch_extra * bfe)783 toku_ft_status_update_pivot_fetch_reason(ftnode_fetch_extra *bfe)
784 {
785     if (bfe->type == ftnode_fetch_prefetch) {
786         FT_STATUS_INC(FT_NUM_PIVOTS_FETCHED_PREFETCH, 1);
787         FT_STATUS_INC(FT_BYTES_PIVOTS_FETCHED_PREFETCH, bfe->bytes_read);
788         FT_STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_PREFETCH, bfe->io_time);
789     } else if (bfe->type == ftnode_fetch_all) {
790         FT_STATUS_INC(FT_NUM_PIVOTS_FETCHED_WRITE, 1);
791         FT_STATUS_INC(FT_BYTES_PIVOTS_FETCHED_WRITE, bfe->bytes_read);
792         FT_STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_WRITE, bfe->io_time);
793     } else if (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_keymatch) {
794         FT_STATUS_INC(FT_NUM_PIVOTS_FETCHED_QUERY, 1);
795         FT_STATUS_INC(FT_BYTES_PIVOTS_FETCHED_QUERY, bfe->bytes_read);
796         FT_STATUS_INC(FT_TOKUTIME_PIVOTS_FETCHED_QUERY, bfe->io_time);
797     }
798 }
799 
toku_ftnode_fetch_callback(CACHEFILE UU (cachefile),PAIR p,int fd,BLOCKNUM blocknum,uint32_t fullhash,void ** ftnode_pv,void ** disk_data,PAIR_ATTR * sizep,int * dirtyp,void * extraargs)800 int toku_ftnode_fetch_callback(CACHEFILE UU(cachefile),
801                                PAIR p,
802                                int fd,
803                                BLOCKNUM blocknum,
804                                uint32_t fullhash,
805                                void **ftnode_pv,
806                                void **disk_data,
807                                PAIR_ATTR *sizep,
808                                int *dirtyp,
809                                void *extraargs) {
810     assert(extraargs);
811     assert(*ftnode_pv == nullptr);
812     FTNODE_DISK_DATA *ndd = (FTNODE_DISK_DATA *)disk_data;
813     ftnode_fetch_extra *bfe = (ftnode_fetch_extra *)extraargs;
814     FTNODE *node = (FTNODE *)ftnode_pv;
815     // deserialize the node, must pass the bfe in because we cannot
816     // evaluate what piece of the the node is necessary until we get it at
817     // least partially into memory
818     int r =
819         toku_deserialize_ftnode_from(fd, blocknum, fullhash, node, ndd, bfe);
820     if (r != 0) {
821         if (r == TOKUDB_BAD_CHECKSUM) {
822             fprintf(
823                 stderr,
824                 "%s:%d:toku_ftnode_fetch_callback - "
825                 "file[%s], blocknum[%lld], toku_deserialize_ftnode_from "
826                 "failed with a checksum error.\n",
827                 __FILE__,
828                 __LINE__,
829                 toku_cachefile_fname_in_env(cachefile),
830                 (longlong)blocknum.b);
831         } else {
832             fprintf(
833                 stderr,
834                 "%s:%d:toku_ftnode_fetch_callback - "
835                 "file[%s], blocknum[%lld], toku_deserialize_ftnode_from "
836                 "failed with %d.\n",
837                 __FILE__,
838                 __LINE__,
839                 toku_cachefile_fname_in_env(cachefile),
840                 (longlong)blocknum.b,
841                 r);
842         }
843         // make absolutely sure we crash before doing anything else.
844         abort();
845     }
846 
847     if (r == 0) {
848         *sizep = make_ftnode_pair_attr(*node);
849         (*node)->ct_pair = p;
850         *dirtyp = (*node)->dirty();  // deserialize could mark the node as dirty
851                                      // (presumably for upgrade)
852     }
853     return r;
854 }
855 
// When true, partial eviction of a clean nonleaf partition compresses its
// message buffer in memory (PT_COMPRESSED) instead of discarding it to
// PT_ON_DISK; toggled via toku_ft_set_compress_buffers_before_eviction().
static bool ft_compress_buffers_before_eviction = true;
857 
toku_ft_set_compress_buffers_before_eviction(bool compress_buffers)858 void toku_ft_set_compress_buffers_before_eviction(bool compress_buffers) {
859     ft_compress_buffers_before_eviction = compress_buffers;
860 }
861 
toku_ftnode_pe_est_callback(void * ftnode_pv,void * disk_data,long * bytes_freed_estimate,enum partial_eviction_cost * cost,void * UU (write_extraargs))862 void toku_ftnode_pe_est_callback(
863     void* ftnode_pv,
864     void* disk_data,
865     long* bytes_freed_estimate,
866     enum partial_eviction_cost *cost,
867     void* UU(write_extraargs)
868     )
869 {
870     paranoid_invariant(ftnode_pv != NULL);
871     long bytes_to_free = 0;
872     FTNODE node = static_cast<FTNODE>(ftnode_pv);
873     if (node->dirty() || node->height == 0 ||
874         node->layout_version_read_from_disk < FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
875         *bytes_freed_estimate = 0;
876         *cost = PE_CHEAP;
877         goto exit;
878     }
879 
880     //
881     // we are dealing with a clean internal node
882     //
883     *cost = PE_EXPENSIVE;
884     // now lets get an estimate for how much data we can free up
885     // we estimate the compressed size of data to be how large
886     // the compressed data is on disk
887     for (int i = 0; i < node->n_children; i++) {
888         if (BP_STATE(node,i) == PT_AVAIL && BP_SHOULD_EVICT(node,i)) {
889             // calculate how much data would be freed if
890             // we compress this node and add it to
891             // bytes_to_free
892 
893             if (ft_compress_buffers_before_eviction) {
894                 // first get an estimate for how much space will be taken
895                 // after compression, it is simply the size of compressed
896                 // data on disk plus the size of the struct that holds it
897                 FTNODE_DISK_DATA ndd = (FTNODE_DISK_DATA) disk_data;
898                 uint32_t compressed_data_size = BP_SIZE(ndd, i);
899                 compressed_data_size += sizeof(struct sub_block);
900 
901                 // now get the space taken now
902                 uint32_t decompressed_data_size = get_avail_internal_node_partition_size(node,i);
903                 bytes_to_free += (decompressed_data_size - compressed_data_size);
904             } else {
905                 bytes_to_free += get_avail_internal_node_partition_size(node, i);
906             }
907         }
908     }
909 
910     *bytes_freed_estimate = bytes_to_free;
911 exit:
912     return;
913 }
914 
915 // replace the child buffer with a compressed version of itself.
compress_internal_node_partition(FTNODE node,int i,enum toku_compression_method compression_method)916 static void compress_internal_node_partition(FTNODE node, int i, enum toku_compression_method compression_method) {
917     // if we should evict, compress the
918     // message buffer into a sub_block
919     assert(BP_STATE(node, i) == PT_AVAIL);
920     assert(node->height > 0);
921     SUB_BLOCK XMALLOC(sb);
922     sub_block_init(sb);
923     toku_create_compressed_partition_from_available(node, i, compression_method, sb);
924 
925     // now set the state to compressed
926     set_BSB(node, i, sb);
927     BP_STATE(node,i) = PT_COMPRESSED;
928 }
929 
930 // callback for partially evicting a node
toku_ftnode_pe_callback(void * ftnode_pv,PAIR_ATTR old_attr,void * write_extraargs,void (* finalize)(PAIR_ATTR new_attr,void * extra),void * finalize_extra)931 int toku_ftnode_pe_callback(void *ftnode_pv,
932                             PAIR_ATTR old_attr,
933                             void *write_extraargs,
934                             void (*finalize)(PAIR_ATTR new_attr, void *extra),
935                             void *finalize_extra) {
936     FTNODE node = (FTNODE)ftnode_pv;
937     FT ft = (FT)write_extraargs;
938     int num_partial_evictions = 0;
939 
940     // Hold things we intend to destroy here.
941     // They will be taken care of after finalize().
942     int num_basements_to_destroy = 0;
943     int num_buffers_to_destroy = 0;
944     int num_pointers_to_free = 0;
945     BASEMENTNODE basements_to_destroy[node->n_children];
946     NONLEAF_CHILDINFO buffers_to_destroy[node->n_children];
947     void *pointers_to_free[node->n_children * 2];
948 
949     // Don't partially evict dirty nodes
950     if (node->dirty()) {
951         goto exit;
952     }
953     // Don't partially evict nodes whose partitions can't be read back
954     // from disk individually
955     if (node->layout_version_read_from_disk <
956         FT_FIRST_LAYOUT_VERSION_WITH_BASEMENT_NODES) {
957         goto exit;
958     }
959     //
960     // partial eviction for nonleaf nodes
961     //
962     if (node->height > 0) {
963         for (int i = 0; i < node->n_children; i++) {
964             if (BP_STATE(node, i) == PT_AVAIL) {
965                 if (BP_SHOULD_EVICT(node, i)) {
966                     NONLEAF_CHILDINFO bnc = BNC(node, i);
967                     if (ft_compress_buffers_before_eviction &&
968                         // We may not serialize and compress a partition in
969                         // memory if its in memory layout version is different
970                         // than what's on disk (and therefore requires upgrade).
971                         //
972                         // Auto-upgrade code assumes that if a node's layout
973                         // version read from disk is not current, it MUST
974                         // require upgrade.
975                         // Breaking this rule would cause upgrade code to
976                         // upgrade this partition again after we serialize it as
977                         // the current version, which is bad.
978                         node->layout_version ==
979                             node->layout_version_read_from_disk) {
980                         toku_ft_bnc_move_messages_to_stale(ft, bnc);
981                         compress_internal_node_partition(
982                             node,
983                             i,
984                             // Always compress with quicklz
985                             TOKU_QUICKLZ_METHOD);
986                     } else {
987                         // We're not compressing buffers before eviction. Simply
988                         // detach the buffer and set the child's state to
989                         // on-disk.
990                         set_BNULL(node, i);
991                         BP_STATE(node, i) = PT_ON_DISK;
992                     }
993                     buffers_to_destroy[num_buffers_to_destroy++] = bnc;
994                     num_partial_evictions++;
995                 } else {
996                     BP_SWEEP_CLOCK(node, i);
997                 }
998             } else {
999                 continue;
1000             }
1001         }
1002     } else {
1003         //
1004         // partial eviction strategy for basement nodes:
1005         //  if the bn is compressed, evict it
1006         //  else: check if it requires eviction, if it does, evict it, if not,
1007         //  sweep the clock count
1008         //
1009         for (int i = 0; i < node->n_children; i++) {
1010             // Get rid of compressed stuff no matter what.
1011             if (BP_STATE(node, i) == PT_COMPRESSED) {
1012                 SUB_BLOCK sb = BSB(node, i);
1013                 pointers_to_free[num_pointers_to_free++] = sb->compressed_ptr;
1014                 pointers_to_free[num_pointers_to_free++] = sb;
1015                 set_BNULL(node, i);
1016                 BP_STATE(node, i) = PT_ON_DISK;
1017                 num_partial_evictions++;
1018             } else if (BP_STATE(node, i) == PT_AVAIL) {
1019                 if (BP_SHOULD_EVICT(node, i)) {
1020                     BASEMENTNODE bn = BLB(node, i);
1021                     basements_to_destroy[num_basements_to_destroy++] = bn;
1022                     toku_ft_decrease_stats(&ft->in_memory_stats,
1023                                            bn->stat64_delta);
1024                     // A basement node is being partially evicted.
1025                     // This masement node may have had messages applied to it to
1026                     // satisfy a query, but was never actually dirtied.
1027                     // This message application may have updated the trees
1028                     // logical row count. Since these message applications are
1029                     // not being persisted, we need undo the logical row count
1030                     // adjustments as they may occur again in the future if/when
1031                     // the node is re-read from disk for another query or change.
1032                     toku_ft_adjust_logical_row_count(ft,
1033                                                      -bn->logical_rows_delta);
1034                     set_BNULL(node, i);
1035                     BP_STATE(node, i) = PT_ON_DISK;
1036                     num_partial_evictions++;
1037                 } else {
1038                     BP_SWEEP_CLOCK(node, i);
1039                 }
1040             } else if (BP_STATE(node, i) == PT_ON_DISK) {
1041                 continue;
1042             } else {
1043                 abort();
1044             }
1045         }
1046     }
1047 
1048 exit:
1049     // call the finalize callback with a new pair attr
1050     int height = node->height;
1051     PAIR_ATTR new_attr = make_ftnode_pair_attr(node);
1052     finalize(new_attr, finalize_extra);
1053 
1054     // destroy everything now that we've called finalize(),
1055     // and, by contract, and it's safe to do expensive work.
1056     for (int i = 0; i < num_basements_to_destroy; i++) {
1057         destroy_basement_node(basements_to_destroy[i]);
1058     }
1059     for (int i = 0; i < num_buffers_to_destroy; i++) {
1060         destroy_nonleaf_childinfo(buffers_to_destroy[i]);
1061     }
1062     for (int i = 0; i < num_pointers_to_free; i++) {
1063         toku_free(pointers_to_free[i]);
1064     }
1065     // stats
1066     if (num_partial_evictions > 0) {
1067         if (height == 0) {
1068             long delta = old_attr.leaf_size - new_attr.leaf_size;
1069             FT_STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF, num_partial_evictions);
1070             FT_STATUS_INC(FT_PARTIAL_EVICTIONS_LEAF_BYTES, delta);
1071         } else {
1072             long delta = old_attr.nonleaf_size - new_attr.nonleaf_size;
1073             FT_STATUS_INC(FT_PARTIAL_EVICTIONS_NONLEAF, num_partial_evictions);
1074             FT_STATUS_INC(FT_PARTIAL_EVICTIONS_NONLEAF_BYTES, delta);
1075         }
1076     }
1077     return 0;
1078 }
1079 
1080 // We touch the clock while holding a read lock.
1081 // DRD reports a race but we want to ignore it.
1082 // Using a valgrind suppressions file is better than the DRD_IGNORE_VAR macro because it's more targeted.
1083 // We need a function to have something a drd suppression can reference
1084 // see src/tests/drd.suppressions (unsafe_touch_clock)
unsafe_touch_clock(FTNODE node,int i)1085 static void unsafe_touch_clock(FTNODE node, int i) {
1086     toku_unsafe_set(&node->bp[i].clock_count, static_cast<unsigned char>(1));
1087 }
1088 
1089 // Callback that states if a partial fetch of the node is necessary
1090 // Currently, this function is responsible for the following things:
1091 //  - reporting to the cachetable whether a partial fetch is required (as required by the contract of the callback)
1092 //  - A couple of things that are NOT required by the callback, but we do for efficiency and simplicity reasons:
1093 //   - for queries, set the value of bfe->child_to_read so that the query that called this can proceed with the query
1094 //      as opposed to having to evaluate toku_ft_search_which_child again. This is done to make the in-memory query faster
1095 //   - touch the necessary partition's clock. The reason we do it here is so that there is one central place it is done, and not done
1096 //      by all the various callers
1097 //
toku_ftnode_pf_req_callback(void * ftnode_pv,void * read_extraargs)1098 bool toku_ftnode_pf_req_callback(void* ftnode_pv, void* read_extraargs) {
1099     // placeholder for now
1100     bool retval = false;
1101     FTNODE node = (FTNODE) ftnode_pv;
1102     ftnode_fetch_extra *bfe = (ftnode_fetch_extra *) read_extraargs;
1103     //
1104     // The three types of fetches that the ft layer may request are:
1105     //  - ftnode_fetch_none: no partitions are necessary (example use: stat64)
1106     //  - ftnode_fetch_subset: some subset is necessary (example use: toku_ft_search)
1107     //  - ftnode_fetch_all: entire node is necessary (example use: flush, split, merge)
1108     // The code below checks if the necessary partitions are already in memory,
1109     // and if they are, return false, and if not, return true
1110     //
1111     if (bfe->type == ftnode_fetch_none) {
1112         retval = false;
1113     }
1114     else if (bfe->type == ftnode_fetch_all) {
1115         retval = false;
1116         for (int i = 0; i < node->n_children; i++) {
1117             unsafe_touch_clock(node,i);
1118             // if we find a partition that is not available,
1119             // then a partial fetch is required because
1120             // the entire node must be made available
1121             if (BP_STATE(node,i) != PT_AVAIL) {
1122                 retval = true;
1123             }
1124         }
1125     }
1126     else if (bfe->type == ftnode_fetch_subset) {
1127         // we do not take into account prefetching yet
1128         // as of now, if we need a subset, the only thing
1129         // we can possibly require is a single basement node
1130         // we find out what basement node the query cares about
1131         // and check if it is available
1132         paranoid_invariant(bfe->search);
1133         bfe->child_to_read = toku_ft_search_which_child(
1134             bfe->ft->cmp,
1135             node,
1136             bfe->search
1137             );
1138         unsafe_touch_clock(node,bfe->child_to_read);
1139         // child we want to read is not available, must set retval to true
1140         retval = (BP_STATE(node, bfe->child_to_read) != PT_AVAIL);
1141     }
1142     else if (bfe->type == ftnode_fetch_prefetch) {
1143         // makes no sense to have prefetching disabled
1144         // and still call this function
1145         paranoid_invariant(!bfe->disable_prefetching);
1146         int lc = bfe->leftmost_child_wanted(node);
1147         int rc = bfe->rightmost_child_wanted(node);
1148         for (int i = lc; i <= rc; ++i) {
1149             if (BP_STATE(node, i) != PT_AVAIL) {
1150                 retval = true;
1151             }
1152         }
1153     } else if (bfe->type == ftnode_fetch_keymatch) {
1154         // we do not take into account prefetching yet
1155         // as of now, if we need a subset, the only thing
1156         // we can possibly require is a single basement node
1157         // we find out what basement node the query cares about
1158         // and check if it is available
1159         if (node->height == 0) {
1160             int left_child = bfe->leftmost_child_wanted(node);
1161             int right_child = bfe->rightmost_child_wanted(node);
1162             if (left_child == right_child) {
1163                 bfe->child_to_read = left_child;
1164                 unsafe_touch_clock(node,bfe->child_to_read);
1165                 // child we want to read is not available, must set retval to true
1166                 retval = (BP_STATE(node, bfe->child_to_read) != PT_AVAIL);
1167             }
1168         }
1169     } else {
1170         // we have a bug. The type should be known
1171         abort();
1172     }
1173     return retval;
1174 }
1175 
1176 static void
ft_status_update_partial_fetch_reason(ftnode_fetch_extra * bfe,int childnum,enum pt_state state,bool is_leaf)1177 ft_status_update_partial_fetch_reason(
1178     ftnode_fetch_extra *bfe,
1179     int childnum,
1180     enum pt_state state,
1181     bool is_leaf
1182     )
1183 {
1184     invariant(state == PT_COMPRESSED || state == PT_ON_DISK);
1185     if (is_leaf) {
1186         if (bfe->type == ftnode_fetch_prefetch) {
1187             if (state == PT_COMPRESSED) {
1188                 FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_PREFETCH, 1);
1189             } else {
1190                 FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_PREFETCH, 1);
1191                 FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_PREFETCH, bfe->bytes_read);
1192                 FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_PREFETCH, bfe->io_time);
1193             }
1194         } else if (bfe->type == ftnode_fetch_all) {
1195             if (state == PT_COMPRESSED) {
1196                 FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_WRITE, 1);
1197             } else {
1198                 FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_WRITE, 1);
1199                 FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_WRITE, bfe->bytes_read);
1200                 FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_WRITE, bfe->io_time);
1201             }
1202         } else if (childnum == bfe->child_to_read) {
1203             if (state == PT_COMPRESSED) {
1204                 FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_NORMAL, 1);
1205             } else {
1206                 FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_NORMAL, 1);
1207                 FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_NORMAL, bfe->bytes_read);
1208                 FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_NORMAL, bfe->io_time);
1209             }
1210         } else {
1211             if (state == PT_COMPRESSED) {
1212                 FT_STATUS_INC(FT_NUM_BASEMENTS_DECOMPRESSED_AGGRESSIVE, 1);
1213             } else {
1214                 FT_STATUS_INC(FT_NUM_BASEMENTS_FETCHED_AGGRESSIVE, 1);
1215                 FT_STATUS_INC(FT_BYTES_BASEMENTS_FETCHED_AGGRESSIVE, bfe->bytes_read);
1216                 FT_STATUS_INC(FT_TOKUTIME_BASEMENTS_FETCHED_AGGRESSIVE, bfe->io_time);
1217             }
1218         }
1219     }
1220     else {
1221         if (bfe->type == ftnode_fetch_prefetch) {
1222             if (state == PT_COMPRESSED) {
1223                 FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_PREFETCH, 1);
1224             } else {
1225                 FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_PREFETCH, 1);
1226                 FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_PREFETCH, bfe->bytes_read);
1227                 FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_PREFETCH, bfe->io_time);
1228             }
1229         } else if (bfe->type == ftnode_fetch_all) {
1230             if (state == PT_COMPRESSED) {
1231                 FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_WRITE, 1);
1232             } else {
1233                 FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_WRITE, 1);
1234                 FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_WRITE, bfe->bytes_read);
1235                 FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_WRITE, bfe->io_time);
1236             }
1237         } else if (childnum == bfe->child_to_read) {
1238             if (state == PT_COMPRESSED) {
1239                 FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_NORMAL, 1);
1240             } else {
1241                 FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_NORMAL, 1);
1242                 FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_NORMAL, bfe->bytes_read);
1243                 FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_NORMAL, bfe->io_time);
1244             }
1245         } else {
1246             if (state == PT_COMPRESSED) {
1247                 FT_STATUS_INC(FT_NUM_MSG_BUFFER_DECOMPRESSED_AGGRESSIVE, 1);
1248             } else {
1249                 FT_STATUS_INC(FT_NUM_MSG_BUFFER_FETCHED_AGGRESSIVE, 1);
1250                 FT_STATUS_INC(FT_BYTES_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->bytes_read);
1251                 FT_STATUS_INC(FT_TOKUTIME_MSG_BUFFER_FETCHED_AGGRESSIVE, bfe->io_time);
1252             }
1253         }
1254     }
1255 }
1256 
toku_ft_status_update_serialize_times(FTNODE node,tokutime_t serialize_time,tokutime_t compress_time)1257 void toku_ft_status_update_serialize_times(FTNODE node, tokutime_t serialize_time, tokutime_t compress_time) {
1258     if (node->height == 0) {
1259         FT_STATUS_INC(FT_LEAF_SERIALIZE_TOKUTIME, serialize_time);
1260         FT_STATUS_INC(FT_LEAF_COMPRESS_TOKUTIME, compress_time);
1261     } else {
1262         FT_STATUS_INC(FT_NONLEAF_SERIALIZE_TOKUTIME, serialize_time);
1263         FT_STATUS_INC(FT_NONLEAF_COMPRESS_TOKUTIME, compress_time);
1264     }
1265 }
1266 
toku_ft_status_update_deserialize_times(FTNODE node,tokutime_t deserialize_time,tokutime_t decompress_time)1267 void toku_ft_status_update_deserialize_times(FTNODE node, tokutime_t deserialize_time, tokutime_t decompress_time) {
1268     if (node->height == 0) {
1269         FT_STATUS_INC(FT_LEAF_DESERIALIZE_TOKUTIME, deserialize_time);
1270         FT_STATUS_INC(FT_LEAF_DECOMPRESS_TOKUTIME, decompress_time);
1271     } else {
1272         FT_STATUS_INC(FT_NONLEAF_DESERIALIZE_TOKUTIME, deserialize_time);
1273         FT_STATUS_INC(FT_NONLEAF_DECOMPRESS_TOKUTIME, decompress_time);
1274     }
1275 }
1276 
toku_ft_status_note_msn_discard(void)1277 void toku_ft_status_note_msn_discard(void) {
1278     FT_STATUS_INC(FT_MSN_DISCARDS, 1);
1279 }
1280 
toku_ft_status_note_update(bool broadcast)1281 void toku_ft_status_note_update(bool broadcast) {
1282     if (broadcast) {
1283         FT_STATUS_INC(FT_UPDATES_BROADCAST, 1);
1284     } else {
1285         FT_STATUS_INC(FT_UPDATES, 1);
1286     }
1287 }
1288 
toku_ft_status_note_msg_bytes_out(size_t buffsize)1289 void toku_ft_status_note_msg_bytes_out(size_t buffsize) {
1290     FT_STATUS_INC(FT_MSG_BYTES_OUT, buffsize);
1291     FT_STATUS_INC(FT_MSG_BYTES_CURR, -buffsize);
1292 }
toku_ft_status_note_ftnode(int height,bool created)1293 void toku_ft_status_note_ftnode(int height, bool created) {
1294     if (created) {
1295         if (height == 0) {
1296             FT_STATUS_INC(FT_CREATE_LEAF, 1);
1297         } else {
1298             FT_STATUS_INC(FT_CREATE_NONLEAF, 1);
1299         }
1300     } else {
1301         // created = false means destroyed
1302     }
1303 }
1304 
1305 // callback for partially reading a node
1306 // could have just used toku_ftnode_fetch_callback, but wanted to separate the two cases to separate functions
// Cachetable partial-fetch callback: bring the node partitions requested by
// the fetch extra (plus any prefetch range) into memory, decompressing or
// reading from disk as needed.  On any read error the process aborts, since
// a partially-fetched node cannot be safely used or rolled back here.
// Returns 0 and stores the node's new pair attr in *sizep on success.
int toku_ftnode_pf_callback(void* ftnode_pv, void* disk_data, void* read_extraargs, int fd, PAIR_ATTR* sizep) {
    int r = 0;
    FTNODE node = (FTNODE) ftnode_pv;
    FTNODE_DISK_DATA ndd = (FTNODE_DISK_DATA) disk_data;
    ftnode_fetch_extra *bfe = (ftnode_fetch_extra *) read_extraargs;
    // there must be a reason this is being called. If we get a garbage type or the type is ftnode_fetch_none,
    // then something went wrong
    assert((bfe->type == ftnode_fetch_subset) || (bfe->type == ftnode_fetch_all) || (bfe->type == ftnode_fetch_prefetch) || (bfe->type == ftnode_fetch_keymatch));
    // determine the range to prefetch
    int lc, rc;
    if (!bfe->disable_prefetching &&
        (bfe->type == ftnode_fetch_subset || bfe->type == ftnode_fetch_prefetch)
        )
    {
        lc = bfe->leftmost_child_wanted(node);
        rc = bfe->rightmost_child_wanted(node);
    } else {
        // No prefetch range: -1/-1 makes the (lc <= i && i <= rc) test below
        // always false, so only explicitly wanted children are fetched.
        lc = -1;
        rc = -1;
    }
    for (int i = 0; i < node->n_children; i++) {
        if (BP_STATE(node,i) == PT_AVAIL) {
            // Partition already in memory; nothing to do.
            continue;
        }
        if ((lc <= i && i <= rc) || bfe->wants_child_available(i)) {
            enum pt_state state = BP_STATE(node, i);
            if (state == PT_COMPRESSED) {
                // In memory but compressed: just decompress it.
                r = toku_deserialize_bp_from_compressed(node, i, bfe);
            } else {
                invariant(state == PT_ON_DISK);
                r = toku_deserialize_bp_from_disk(node, ndd, i, fd, bfe);
            }
            // Record why this partial fetch happened, for status counters.
            ft_status_update_partial_fetch_reason(bfe, i, state, (node->height == 0));
        }

        if (r != 0) {
            if (r == TOKUDB_BAD_CHECKSUM) {
                fprintf(stderr,
                        "Checksum failure while reading node partition in file %s.\n",
                        toku_cachefile_fname_in_env(bfe->ft->cf));
            } else {
                fprintf(stderr,
                        "Error while reading node partition %d\n",
                        get_maybe_error_errno());
            }
            // A failed partition read leaves the node unusable; fail hard.
            abort();
        }
    }

    // Report the node's (possibly larger) in-memory footprint to the cachetable.
    *sizep = make_ftnode_pair_attr(node);

    return 0;
}
1360 
toku_msg_leafval_heaviside(DBT const & kdbt,const struct toku_msg_leafval_heaviside_extra & be)1361 int toku_msg_leafval_heaviside(DBT const &kdbt, const struct toku_msg_leafval_heaviside_extra &be) {
1362     return be.cmp(&kdbt, be.key);
1363 }
1364 
static void
ft_init_new_root(FT ft, FTNODE oldroot, FTNODE *newrootp)
// Effect:  Create a new root node whose two children are the split of oldroot.
//  oldroot is unpinned in the process.
//  Leave the new root pinned.
// Note: the root's blocknum/fullhash stay constant; the *new* node takes a
//  fresh blocknum and then swaps pair values with oldroot so that the node at
//  the root blocknum is the new (taller) root.
{
    FTNODE newroot;

    // Remember the (constant) root identity so we can repin it at the end.
    BLOCKNUM old_blocknum = oldroot->blocknum;
    uint32_t old_fullhash = oldroot->fullhash;

    int new_height = oldroot->height+1;
    uint32_t new_fullhash;
    BLOCKNUM new_blocknum;

    // Allocate an empty cachetable pair for the new node, depending on
    // oldroot so checkpointing stays consistent.
    cachetable_put_empty_node_with_dep_nodes(
        ft,
        1,
        &oldroot,
        &new_blocknum,
        &new_fullhash,
        &newroot
        );

    assert(newroot);
    assert(new_height > 0);
    // Initialize the new root as a one-child nonleaf one level above oldroot.
    toku_initialize_empty_ftnode (
        newroot,
        new_blocknum,
        new_height,
        1,
        ft->h->layout_version,
        ft->h->flags
        );
    newroot->fullhash = new_fullhash;
    // The new root inherits oldroot's max applied MSN so MSN monotonicity
    // along root-to-leaf paths is preserved.
    MSN msna = oldroot->max_msn_applied_to_node_on_disk;
    newroot->max_msn_applied_to_node_on_disk = msna;
    BP_STATE(newroot,0) = PT_AVAIL;
    newroot->set_dirty();

    // Set the first child to have the new blocknum,
    // and then swap newroot with oldroot. The new root
    // will inherit the hash/blocknum/pair from oldroot,
    // keeping the root blocknum constant.
    BP_BLOCKNUM(newroot, 0) = new_blocknum;
    toku_ftnode_swap_pair_values(newroot, oldroot);

    // Split the (old root) child of the new root into the new root's two children.
    toku_ft_split_child(
        ft,
        newroot,
        0, // childnum to split
        oldroot,
        SPLIT_EVENLY
        );

    // ft_split_child released locks on newroot
    // and oldroot, so now we repin and
    // return to caller
    ftnode_fetch_extra bfe;
    bfe.create_for_full_read(ft);
    toku_pin_ftnode(
        ft,
        old_blocknum,
        old_fullhash,
        &bfe,
        PL_WRITE_EXPENSIVE, // may_modify_node
        newrootp,
        true
        );
}
1435 
// Inject 'msg' into 'node', which the caller holds write-locked and fully in
// memory.  Assigns the message a fresh MSN from the header, applies it to the
// node (or buffers it, for nonleaves), updates stats, and unpins the node
// (possibly via a background flush if the node became gorged).
// 'childnum' of -1 means the put path chooses the child itself.
static void inject_message_in_locked_node(
    FT ft,
    FTNODE node,
    int childnum,
    const ft_msg &msg,
    size_t flow_deltas[],
    txn_gc_info *gc_info
    )
{
    // No guarantee that we're the writer, but oh well.
    // TODO(leif): Implement "do I have the lock or is it someone else?"
    // check in frwlock.  Should be possible with TOKU_PTHREAD_DEBUG, nop
    // otherwise.
    invariant(toku_ctpair_is_write_locked(node->ct_pair));
    toku_ftnode_assert_fully_in_memory(node);

    // Take the newer of the two oldest referenced xid values from the node and gc_info.
    // The gc_info usually has a newer value, because we got it at the top of this call
    // stack from the txn manager. But sometimes the node has a newer value, if some
    // other thread sees a newer value and writes to this node before we got the lock.
    if (gc_info->oldest_referenced_xid_for_implicit_promotion > node->oldest_referenced_xid_known) {
        node->oldest_referenced_xid_known = gc_info->oldest_referenced_xid_for_implicit_promotion;
    } else if (gc_info->oldest_referenced_xid_for_implicit_promotion < node->oldest_referenced_xid_known) {
        gc_info->oldest_referenced_xid_for_implicit_promotion = node->oldest_referenced_xid_known;
    }

    // Get the MSN from the header.  Now that we have a write lock on the
    // node we're injecting into, we know no other thread will get an MSN
    // after us and get that message into our subtree before us.
    MSN msg_msn = { .msn = toku_sync_add_and_fetch(&ft->h->max_msn_in_ft.msn, 1) };
    ft_msg msg_with_msn(msg.kdbt(), msg.vdbt(), msg.type(), msg_msn, msg.xids());
    paranoid_invariant(msg_with_msn.msn().msn > node->max_msn_applied_to_node_on_disk.msn);

    STAT64INFO_S stats_delta = { 0,0 };
    int64_t logical_rows_delta = 0;
    // Apply (leaf) or buffer (nonleaf) the message; collects size/row deltas.
    toku_ftnode_put_msg(
        ft->cmp,
        ft->update_fun,
        node,
        childnum,
        msg_with_msn,
        true,
        gc_info,
        flow_deltas,
        &stats_delta,
        &logical_rows_delta);
    if (stats_delta.numbytes || stats_delta.numrows) {
        toku_ft_update_stats(&ft->in_memory_stats, stats_delta);
    }
    toku_ft_adjust_logical_row_count(ft, logical_rows_delta);
    //
    // assumption is that toku_ftnode_put_msg will
    // mark the node as dirty.
    // enforcing invariant here.
    //
    paranoid_invariant(node->dirty() != 0);

    // update some status variables
    if (node->height != 0) {
        size_t msgsize = msg.total_size();
        FT_STATUS_INC(FT_MSG_BYTES_IN, msgsize);
        FT_STATUS_INC(FT_MSG_BYTES_CURR, msgsize);
        FT_STATUS_INC(FT_MSG_NUM, 1);
        if (ft_msg_type_applies_all(msg.type())) {
            FT_STATUS_INC(FT_MSG_NUM_BROADCAST, 1);
        }
    }

    // verify that msn of latest message was captured in root node
    paranoid_invariant(msg_with_msn.msn().msn == node->max_msn_applied_to_node_on_disk.msn);

    // Track sequential-insert behavior: injections into the rightmost leaf
    // raise the score (until saturation); any other target resets it.
    if (node->blocknum.b == ft->rightmost_blocknum.b) {
        if (toku_unsafe_fetch(&ft->seqinsert_score) < FT_SEQINSERT_SCORE_THRESHOLD) {
            // we promoted to the rightmost leaf node and the seqinsert score has not yet saturated.
            toku_sync_fetch_and_add(&ft->seqinsert_score, 1);
        }
    } else if (toku_unsafe_fetch(&ft->seqinsert_score) != 0) {
        // we promoted to something other than the rightmost leaf node and the score should reset
        toku_unsafe_set(&ft->seqinsert_score, static_cast<uint32_t>(0));
    }

    // if we call toku_ft_flush_some_child, then that function unpins the root
    // otherwise, we unpin ourselves
    if (node->height > 0 && toku_ftnode_nonleaf_is_gorged(node, ft->h->nodesize)) {
        toku_ft_flush_node_on_background_thread(ft, node);
    }
    else {
        toku_unpin_ftnode(ft, node);
    }
}
1526 
// seqinsert_loc is a bitmask.
// The root counts as being both on the "left extreme" and on the "right extreme".
// Therefore, at the root, you're at LEFT_EXTREME | RIGHT_EXTREME.
typedef char seqinsert_loc;
static const seqinsert_loc NEITHER_EXTREME = 0;  // interior of the tree
static const seqinsert_loc LEFT_EXTREME = 1;     // on the leftmost root-to-leaf path
static const seqinsert_loc RIGHT_EXTREME = 2;    // on the rightmost root-to-leaf path
1534 
static bool process_maybe_reactive_child(FT ft, FTNODE parent, FTNODE child, int childnum, seqinsert_loc loc)
// Effect:
//  If child needs to be split or merged, do that.
//  parent and child will be unlocked if this happens
// Requires: parent and child are read locked
// Returns:
//  true if relocking is needed
//  false otherwise
{
    enum reactivity re = toku_ftnode_get_reactivity(ft, child);
    enum reactivity newre;
    BLOCKNUM child_blocknum;
    uint32_t child_fullhash;
    switch (re) {
    case RE_STABLE:
        // Nothing reactive; caller's pins remain valid.
        return false;
    case RE_FISSIBLE:
        {
            // We only have a read lock on the parent.  We need to drop both locks, and get write locks.
            BLOCKNUM parent_blocknum = parent->blocknum;
            uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum);
            // Snapshot parent shape so we can detect concurrent restructuring
            // after we drop the locks.
            int parent_height = parent->height;
            int parent_n_children = parent->n_children;
            toku_unpin_ftnode_read_only(ft, child);
            toku_unpin_ftnode_read_only(ft, parent);
            ftnode_fetch_extra bfe;
            bfe.create_for_full_read(ft);
            FTNODE newparent, newchild;
            toku_pin_ftnode(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, &newparent, true);
            if (newparent->height != parent_height || newparent->n_children != parent_n_children ||
                childnum >= newparent->n_children || toku_bnc_n_entries(BNC(newparent, childnum))) {
                // If the height changed or childnum is now off the end, something clearly got split or merged out from under us.
                // If something got injected in this node, then it got split or merged and we shouldn't be splitting it.
                // But we already unpinned the child so we need to have the caller re-try the pins.
                toku_unpin_ftnode_read_only(ft, newparent);
                return true;
            }
            // It's ok to reuse the same childnum because if we get something
            // else we need to split, well, that's crazy, but let's go ahead
            // and split it.
            child_blocknum = BP_BLOCKNUM(newparent, childnum);
            child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum);
            toku_pin_ftnode_with_dep_nodes(ft, child_blocknum, child_fullhash, &bfe, PL_WRITE_CHEAP, 1, &newparent, &newchild, true);
            newre = toku_ftnode_get_reactivity(ft, newchild);
            if (newre == RE_FISSIBLE) {
                // Choose the split mode: near the tree's edges at height 1,
                // split unevenly to favor continued sequential insertion.
                enum split_mode split_mode;
                if (newparent->height == 1 && (loc & LEFT_EXTREME) && childnum == 0) {
                    split_mode = SPLIT_RIGHT_HEAVY;
                } else if (newparent->height == 1 && (loc & RIGHT_EXTREME) && childnum == newparent->n_children - 1) {
                    split_mode = SPLIT_LEFT_HEAVY;
                } else {
                    split_mode = SPLIT_EVENLY;
                }
                toku_ft_split_child(ft, newparent, childnum, newchild, split_mode);
            } else {
                // some other thread already got it, just unpin and tell the
                // caller to retry
                toku_unpin_ftnode_read_only(ft, newchild);
                toku_unpin_ftnode_read_only(ft, newparent);
            }
            return true;
        }
    case RE_FUSIBLE:
        {
            if (parent->height == 1) {
                // prevent re-merging of recently unevenly-split nodes
                if (((loc & LEFT_EXTREME) && childnum <= 1) ||
                    ((loc & RIGHT_EXTREME) && childnum >= parent->n_children - 2)) {
                    return false;
                }
            }

            int parent_height = parent->height;
            BLOCKNUM parent_blocknum = parent->blocknum;
            uint32_t parent_fullhash = toku_cachetable_hash(ft->cf, parent_blocknum);
            // Drop the read locks and reacquire: a write lock on the parent,
            // then a read lock on the child (merge takes its own child locks).
            toku_unpin_ftnode_read_only(ft, child);
            toku_unpin_ftnode_read_only(ft, parent);
            ftnode_fetch_extra bfe;
            bfe.create_for_full_read(ft);
            FTNODE newparent, newchild;
            toku_pin_ftnode(ft, parent_blocknum, parent_fullhash, &bfe, PL_WRITE_CHEAP, &newparent, true);
            if (newparent->height != parent_height || childnum >= newparent->n_children) {
                // looks like this is the root and it got merged, let's just start over (like in the split case above)
                toku_unpin_ftnode_read_only(ft, newparent);
                return true;
            }
            child_blocknum = BP_BLOCKNUM(newparent, childnum);
            child_fullhash = compute_child_fullhash(ft->cf, newparent, childnum);
            toku_pin_ftnode_with_dep_nodes(ft, child_blocknum, child_fullhash, &bfe, PL_READ, 1, &newparent, &newchild, true);
            newre = toku_ftnode_get_reactivity(ft, newchild);
            if (newre == RE_FUSIBLE && newparent->n_children >= 2) {
                toku_unpin_ftnode_read_only(ft, newchild);
                toku_ft_merge_child(ft, newparent, childnum);
            } else {
                // Could be a weird case where newparent has only one
                // child.  In this case, we want to inject here but we've
                // already unpinned the caller's copy of parent so we have
                // to ask them to re-pin, or they could (very rarely)
                // dereferenced memory in a freed node.  TODO: we could
                // give them back the copy of the parent we pinned.
                //
                // Otherwise, some other thread already got it, just unpin
                // and tell the caller to retry
                toku_unpin_ftnode_read_only(ft, newchild);
                toku_unpin_ftnode_read_only(ft, newparent);
            }
            return true;
        }
    }
    // All reactivity values are handled above; reaching here is a logic error.
    abort();
}
1646 
inject_message_at_this_blocknum(FT ft,CACHEKEY cachekey,uint32_t fullhash,const ft_msg & msg,size_t flow_deltas[],txn_gc_info * gc_info)1647 static void inject_message_at_this_blocknum(FT ft, CACHEKEY cachekey, uint32_t fullhash, const ft_msg &msg, size_t flow_deltas[], txn_gc_info *gc_info)
1648 // Effect:
1649 //  Inject message into the node at this blocknum (cachekey).
1650 //  Gets a write lock on the node for you.
1651 {
1652     toku::context inject_ctx(CTX_MESSAGE_INJECTION);
1653     FTNODE node;
1654     ftnode_fetch_extra bfe;
1655     bfe.create_for_full_read(ft);
1656     toku_pin_ftnode(ft, cachekey, fullhash, &bfe, PL_WRITE_CHEAP, &node, true);
1657     toku_ftnode_assert_fully_in_memory(node);
1658     paranoid_invariant(node->fullhash==fullhash);
1659     ft_verify_flags(ft, node);
1660     inject_message_in_locked_node(ft, node, -1, msg, flow_deltas, gc_info);
1661 }
1662 
1663 __attribute__((const))
should_inject_in_node(seqinsert_loc loc,int height,int depth)1664 static inline bool should_inject_in_node(seqinsert_loc loc, int height, int depth)
1665 // We should inject directly in a node if:
1666 //  - it's a leaf, or
1667 //  - it's a height 1 node not at either extreme, or
1668 //  - it's a depth 2 node not at either extreme
1669 {
1670     return (height == 0 || (loc == NEITHER_EXTREME && (height <= 1 || depth >= 2)));
1671 }
1672 
static void ft_verify_or_set_rightmost_blocknum(FT ft, BLOCKNUM b)
// Given: 'b', the _definitive_ and constant rightmost blocknum of 'ft'
// Sets ft->rightmost_blocknum to 'b' if it is still unset, rechecking under
// the ft lock so concurrent callers set it exactly once.
{
    if (toku_unsafe_fetch(&ft->rightmost_blocknum.b) == RESERVED_BLOCKNUM_NULL) {
        toku_ft_lock(ft);
        // Re-check under the lock: another thread may have set it between
        // our unlocked read and acquiring the lock.
        if (ft->rightmost_blocknum.b == RESERVED_BLOCKNUM_NULL) {
            toku_unsafe_set(&ft->rightmost_blocknum, b);
        }
        toku_ft_unlock(ft);
    }
    // The rightmost blocknum only transitions from RESERVED_BLOCKNUM_NULL to non-null.
    // If it's already set, verify that the stored value is consistent with 'b'
    invariant(toku_unsafe_fetch(&ft->rightmost_blocknum.b) == b.b);
}
1687 
toku_bnc_should_promote(FT ft,NONLEAF_CHILDINFO bnc)1688 bool toku_bnc_should_promote(FT ft, NONLEAF_CHILDINFO bnc) {
1689     static const double factor = 0.125;
1690     const uint64_t flow_threshold = ft->h->nodesize * factor;
1691     return bnc->flow[0] >= flow_threshold || bnc->flow[1] >= flow_threshold;
1692 }
1693 
static void push_something_in_subtree(
    FT ft,
    FTNODE subtree_root,
    int target_childnum,
    const ft_msg &msg,
    size_t flow_deltas[],
    txn_gc_info *gc_info,
    int depth,
    seqinsert_loc loc,
    bool just_did_split_or_merge
    )
// Effects:
//  Assign message an MSN from ft->h.
//  Put message in the subtree rooted at node.  Due to promotion the message may not be injected directly in this node.
//  Unlock node or schedule it to be unlocked (after a background flush).
//   Either way, the caller is not responsible for unlocking node.
// Requires:
//  subtree_root is read locked and fully in memory.
// Notes:
//  In Ming, the basic rules of promotion are as follows:
//   Don't promote broadcast messages.
//   Don't promote past non-empty buffers.
//   Otherwise, promote at most to height 1 or depth 2 (whichever is highest), as far as the birdie asks you to promote.
//    We don't promote to leaves because injecting into leaves is expensive, mostly because of #5605 and some of #5552.
//    We don't promote past depth 2 because we found that gives us enough parallelism without costing us too much pinning work.
//
//    This is true with the following caveats:
//     We always promote all the way to the leaves on the rightmost and leftmost edges of the tree, for sequential insertions.
//      (That means we can promote past depth 2 near the edges of the tree.)
//
//   When the birdie is still saying we should promote, we use get_and_pin so that we wait to get the node.
//   If the birdie doesn't say to promote, we try maybe_get_and_pin.  If we get the node cheaply, and it's dirty, we promote anyway.
{
    toku_ftnode_assert_fully_in_memory(subtree_root);
    if (should_inject_in_node(loc, subtree_root->height, depth)) {
        // Base case: inject directly into this node.
        switch (depth) {
        case 0:
            FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break;
        case 1:
            FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break;
        case 2:
            FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break;
        case 3:
            FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break;
        default:
            FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
        }
        // If the target node is a non-root leaf node on the right extreme,
        // set the rightmost blocknum. We know there are no messages above us
        // because promotion would not chose to inject directly into this leaf
        // otherwise. We explicitly skip the root node because then we don't have
        // to worry about changing the rightmost blocknum when the root splits.
        if (subtree_root->height == 0 && loc == RIGHT_EXTREME && subtree_root->blocknum.b != ft->h->root_blocknum.b) {
            ft_verify_or_set_rightmost_blocknum(ft, subtree_root->blocknum);
        }
        inject_message_in_locked_node(ft, subtree_root, target_childnum, msg, flow_deltas, gc_info);
    } else {
        int r;
        int childnum;
        NONLEAF_CHILDINFO bnc;

        // toku_ft_root_put_msg should not have called us otherwise.
        paranoid_invariant(ft_msg_type_applies_once(msg.type()));

        childnum = (target_childnum >= 0 ? target_childnum
                    : toku_ftnode_which_child(subtree_root, msg.kdbt(), ft->cmp));
        bnc = BNC(subtree_root, childnum);

        if (toku_bnc_n_entries(bnc) > 0) {
            // The buffer is non-empty, give up on promoting.
            FT_STATUS_INC(FT_PRO_NUM_STOP_NONEMPTY_BUF, 1);
            goto relock_and_push_here;
        }

        // Compute where the child sits relative to the tree's edges.
        seqinsert_loc next_loc;
        if ((loc & LEFT_EXTREME) && childnum == 0) {
            next_loc = LEFT_EXTREME;
        } else if ((loc & RIGHT_EXTREME) && childnum == subtree_root->n_children - 1) {
            next_loc = RIGHT_EXTREME;
        } else {
            next_loc = NEITHER_EXTREME;
        }

        if (next_loc == NEITHER_EXTREME && subtree_root->height <= 1) {
            // Never promote to leaf nodes except on the edges
            FT_STATUS_INC(FT_PRO_NUM_STOP_H1, 1);
            goto relock_and_push_here;
        }

        {
            const BLOCKNUM child_blocknum = BP_BLOCKNUM(subtree_root, childnum);
            ft->blocktable.verify_blocknum_allocated(child_blocknum);
            const uint32_t child_fullhash = toku_cachetable_hash(ft->cf, child_blocknum);

            FTNODE child;
            {
                const int child_height = subtree_root->height - 1;
                const int child_depth = depth + 1;
                // If we're locking a leaf, or a height 1 node or depth 2
                // node in the middle, we know we won't promote further
                // than that, so just get a write lock now.
                const pair_lock_type lock_type = (should_inject_in_node(next_loc, child_height, child_depth)
                                                  ? PL_WRITE_CHEAP
                                                  : PL_READ);
                if (next_loc != NEITHER_EXTREME || (toku_bnc_should_promote(ft, bnc) && depth <= 1)) {
                    // If we're on either extreme, or the birdie wants to
                    // promote and we're in the top two levels of the
                    // tree, don't stop just because someone else has the
                    // node locked.
                    ftnode_fetch_extra bfe;
                    bfe.create_for_full_read(ft);
                    if (lock_type == PL_WRITE_CHEAP) {
                        // We intend to take the write lock for message injection
                        toku::context inject_ctx(CTX_MESSAGE_INJECTION);
                        toku_pin_ftnode(ft, child_blocknum, child_fullhash, &bfe, lock_type, &child, true);
                    } else {
                        // We're going to keep promoting
                        toku::context promo_ctx(CTX_PROMO);
                        toku_pin_ftnode(ft, child_blocknum, child_fullhash, &bfe, lock_type, &child, true);
                    }
                } else {
                    // Opportunistic pin: only promote if the child is cheap
                    // to grab and already clean in memory.
                    r = toku_maybe_pin_ftnode_clean(ft, child_blocknum, child_fullhash, lock_type, &child);
                    if (r != 0) {
                        // We couldn't get the child cheaply, so give up on promoting.
                        FT_STATUS_INC(FT_PRO_NUM_STOP_LOCK_CHILD, 1);
                        goto relock_and_push_here;
                    }
                    if (toku_ftnode_fully_in_memory(child)) {
                        // toku_pin_ftnode... touches the clock but toku_maybe_pin_ftnode... doesn't.
                        // This prevents partial eviction.
                        for (int i = 0; i < child->n_children; ++i) {
                            BP_TOUCH_CLOCK(child, i);
                        }
                    } else {
                        // We got the child, but it's not fully in memory.  Give up on promoting.
                        FT_STATUS_INC(FT_PRO_NUM_STOP_CHILD_INMEM, 1);
                        goto unlock_child_and_push_here;
                    }
                }
            }
            paranoid_invariant_notnull(child);

            if (!just_did_split_or_merge) {
                BLOCKNUM subtree_root_blocknum = subtree_root->blocknum;
                uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum);
                const bool did_split_or_merge = process_maybe_reactive_child(ft, subtree_root, child, childnum, loc);
                if (did_split_or_merge) {
                    // Need to re-pin this node and try at this level again.
                    // 'just_did_split_or_merge = true' prevents an immediate
                    // second split/merge attempt on the retry.
                    FTNODE newparent;
                    ftnode_fetch_extra bfe;
                    bfe.create_for_full_read(ft); // should be fully in memory, we just split it
                    toku_pin_ftnode(ft, subtree_root_blocknum, subtree_root_fullhash, &bfe, PL_READ, &newparent, true);
                    push_something_in_subtree(ft, newparent, -1, msg, flow_deltas, gc_info, depth, loc, true);
                    return;
                }
            }

            if (next_loc != NEITHER_EXTREME || child->dirty() || toku_bnc_should_promote(ft, bnc)) {
                // Keep promoting into the child.
                push_something_in_subtree(ft, child, -1, msg, flow_deltas, gc_info, depth + 1, next_loc, false);
                // Account the message's flow against this buffer even though
                // the message bypassed it.
                toku_sync_fetch_and_add(&bnc->flow[0], flow_deltas[0]);
                // The recursive call unpinned the child, but
                // we're responsible for unpinning subtree_root.
                toku_unpin_ftnode_read_only(ft, subtree_root);
                return;
            }

            FT_STATUS_INC(FT_PRO_NUM_DIDNT_WANT_PROMOTE, 1);
        unlock_child_and_push_here:
            // We locked the child, but we decided not to promote.
            // Unlock the child, and fall through to the next case.
            toku_unpin_ftnode_read_only(ft, child);
        }
    relock_and_push_here:
        // Give up on promoting.
        // We have subtree_root read-locked and we don't have a child locked.
        // Drop the read lock, grab a write lock, and inject here.
        {
            // Right now we have a read lock on subtree_root, but we want
            // to inject into it so we get a write lock instead.
            BLOCKNUM subtree_root_blocknum = subtree_root->blocknum;
            uint32_t subtree_root_fullhash = toku_cachetable_hash(ft->cf, subtree_root_blocknum);
            toku_unpin_ftnode_read_only(ft, subtree_root);
            switch (depth) {
            case 0:
                FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_0, 1); break;
            case 1:
                FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_1, 1); break;
            case 2:
                FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_2, 1); break;
            case 3:
                FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_3, 1); break;
            default:
                FT_STATUS_INC(FT_PRO_NUM_INJECT_DEPTH_GT3, 1); break;
            }
            inject_message_at_this_blocknum(ft, subtree_root_blocknum, subtree_root_fullhash, msg, flow_deltas, gc_info);
        }
    }
}
1892 
toku_ft_root_put_msg(FT ft,const ft_msg & msg,txn_gc_info * gc_info)1893 void toku_ft_root_put_msg(
1894     FT ft,
1895     const ft_msg &msg,
1896     txn_gc_info *gc_info
1897     )
1898 // Effect:
1899 //  - assign msn to message and update msn in the header
1900 //  - push the message into the ft
1901 
1902 // As of Clayface, the root blocknum is a constant, so preventing a
1903 // race between message injection and the split of a root is the job
1904 // of the cachetable's locking rules.
1905 //
1906 // We also hold the MO lock for a number of reasons, but an important
1907 // one is to make sure that a begin_checkpoint may not start while
1908 // this code is executing. A begin_checkpoint does (at least) two things
1909 // that can interfere with the operations here:
1910 //  - Copies the header to a checkpoint header. Because we may change
1911 //    the max_msn_in_ft below, we don't want the header to be copied in
1912 //    the middle of these operations.
1913 //  - Takes note of the log's LSN. Because this put operation has
1914 //    already been logged, this message injection must be included
1915 //    in any checkpoint that contains this put's logentry.
1916 //    Holding the mo lock throughout this function ensures that fact.
1917 {
1918     toku::context promo_ctx(CTX_PROMO);
1919 
1920     // blackhole fractal trees drop all messages, so do nothing.
1921     if (ft->blackhole) {
1922         return;
1923     }
1924 
1925     FTNODE node;
1926 
1927     uint32_t fullhash;
1928     CACHEKEY root_key;
1929     toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
1930     ftnode_fetch_extra bfe;
1931     bfe.create_for_full_read(ft);
1932 
1933     size_t flow_deltas[] = { message_buffer::msg_memsize_in_buffer(msg), 0 };
1934 
1935     pair_lock_type lock_type;
1936     lock_type = PL_READ; // try first for a read lock
1937     // If we need to split the root, we'll have to change from a read lock
1938     // to a write lock and check again.  We change the variable lock_type
1939     // and jump back to here.
1940  change_lock_type:
1941     // get the root node
1942     toku_pin_ftnode(ft, root_key, fullhash, &bfe, lock_type, &node, true);
1943     toku_ftnode_assert_fully_in_memory(node);
1944     paranoid_invariant(node->fullhash==fullhash);
1945     ft_verify_flags(ft, node);
1946 
1947     // First handle a reactive root.
1948     // This relocking for split algorithm will cause every message
1949     // injection thread to change lock type back and forth, when only one
1950     // of them needs to in order to handle the split.  That's not great,
1951     // but root splits are incredibly rare.
1952     enum reactivity re = toku_ftnode_get_reactivity(ft, node);
1953     switch (re) {
1954     case RE_STABLE:
1955     case RE_FUSIBLE: // cannot merge anything at the root
1956         if (lock_type != PL_READ) {
1957             // We thought we needed to split, but someone else got to
1958             // it before us.  Downgrade to a read lock.
1959             toku_unpin_ftnode_read_only(ft, node);
1960             lock_type = PL_READ;
1961             goto change_lock_type;
1962         }
1963         break;
1964     case RE_FISSIBLE:
1965         if (lock_type == PL_READ) {
1966             // Here, we only have a read lock on the root.  In order
1967             // to split it, we need a write lock, but in the course of
1968             // gaining the write lock, someone else may have gotten in
1969             // before us and split it.  So we upgrade to a write lock
1970             // and check again.
1971             toku_unpin_ftnode_read_only(ft, node);
1972             lock_type = PL_WRITE_CHEAP;
1973             goto change_lock_type;
1974         } else {
1975             // We have a write lock, now we can split.
1976             ft_init_new_root(ft, node, &node);
1977             // Then downgrade back to a read lock, and we can finally
1978             // do the injection.
1979             toku_unpin_ftnode(ft, node);
1980             lock_type = PL_READ;
1981             FT_STATUS_INC(FT_PRO_NUM_ROOT_SPLIT, 1);
1982             goto change_lock_type;
1983         }
1984         break;
1985     }
1986     // If we get to here, we have a read lock and the root doesn't
1987     // need to be split.  It's safe to inject the message.
1988     paranoid_invariant(lock_type == PL_READ);
1989     // We cannot assert that we have the read lock because frwlock asserts
1990     // that its mutex is locked when we check if there are any readers.
1991     // That wouldn't give us a strong guarantee that we have the read lock
1992     // anyway.
1993 
1994     // Now, either inject here or promote.  We decide based on a heuristic:
1995     if (node->height == 0 || !ft_msg_type_applies_once(msg.type())) {
1996         // If the root's a leaf or we're injecting a broadcast, drop the read lock and inject here.
1997         toku_unpin_ftnode_read_only(ft, node);
1998         FT_STATUS_INC(FT_PRO_NUM_ROOT_H0_INJECT, 1);
1999         inject_message_at_this_blocknum(ft, root_key, fullhash, msg, flow_deltas, gc_info);
2000     } else if (node->height > 1) {
2001         // If the root's above height 1, we are definitely eligible for promotion.
2002         push_something_in_subtree(ft, node, -1, msg, flow_deltas, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
2003     } else {
2004         // The root's height 1.  We may be eligible for promotion here.
2005         // On the extremes, we want to promote, in the middle, we don't.
2006         int childnum = toku_ftnode_which_child(node, msg.kdbt(), ft->cmp);
2007         if (childnum == 0 || childnum == node->n_children - 1) {
2008             // On the extremes, promote.  We know which childnum we're going to, so pass that down too.
2009             push_something_in_subtree(ft, node, childnum, msg, flow_deltas, gc_info, 0, LEFT_EXTREME | RIGHT_EXTREME, false);
2010         } else {
2011             // At height 1 in the middle, don't promote, drop the read lock and inject here.
2012             toku_unpin_ftnode_read_only(ft, node);
2013             FT_STATUS_INC(FT_PRO_NUM_ROOT_H1_INJECT, 1);
2014             inject_message_at_this_blocknum(ft, root_key, fullhash, msg, flow_deltas, gc_info);
2015         }
2016     }
2017 }
2018 
2019 // TODO: Remove me, I'm boring.
static int ft_compare_keys(FT ft, const DBT *a, const DBT *b)
// Effect: Compare two keys using the given fractal tree's comparator/descriptor
// Return: the comparator's result for (a, b); by the usual convention this is
//         negative/zero/positive when a sorts before/equal-to/after b.
{
    // Trivial forwarding wrapper around the ft's comparator object.
    return ft->cmp(a, b);
}
2025 
bn_get_le_and_key(BASEMENTNODE bn,int idx,DBT * key)2026 static LEAFENTRY bn_get_le_and_key(BASEMENTNODE bn, int idx, DBT *key)
2027 // Effect: Gets the i'th leafentry from the given basement node and
2028 //         fill its key in *key
2029 // Requires: The i'th leafentry exists.
2030 {
2031     LEAFENTRY le;
2032     uint32_t le_len;
2033     void *le_key;
2034     int r = bn->data_buffer.fetch_klpair(idx, &le, &le_len, &le_key);
2035     invariant_zero(r);
2036     toku_fill_dbt(key, le_key, le_len);
2037     return le;
2038 }
2039 
ft_leaf_leftmost_le_and_key(FTNODE leaf,DBT * leftmost_key)2040 static LEAFENTRY ft_leaf_leftmost_le_and_key(FTNODE leaf, DBT *leftmost_key)
2041 // Effect: If a leftmost key exists in the given leaf, toku_fill_dbt()
2042 //         the key into *leftmost_key
2043 // Requires: Leaf is fully in memory and pinned for read or write.
2044 // Return: leafentry if it exists, nullptr otherwise
2045 {
2046     for (int i = 0; i < leaf->n_children; i++) {
2047         BASEMENTNODE bn = BLB(leaf, i);
2048         if (bn->data_buffer.num_klpairs() > 0) {
2049             // Get the first (leftmost) leafentry and its key
2050             return bn_get_le_and_key(bn, 0, leftmost_key);
2051         }
2052     }
2053     return nullptr;
2054 }
2055 
ft_leaf_rightmost_le_and_key(FTNODE leaf,DBT * rightmost_key)2056 static LEAFENTRY ft_leaf_rightmost_le_and_key(FTNODE leaf, DBT *rightmost_key)
2057 // Effect: If a rightmost key exists in the given leaf, toku_fill_dbt()
2058 //         the key into *rightmost_key
2059 // Requires: Leaf is fully in memory and pinned for read or write.
2060 // Return: leafentry if it exists, nullptr otherwise
2061 {
2062     for (int i = leaf->n_children - 1; i >= 0; i--) {
2063         BASEMENTNODE bn = BLB(leaf, i);
2064         size_t num_les = bn->data_buffer.num_klpairs();
2065         if (num_les > 0) {
2066             // Get the last (rightmost) leafentry and its key
2067             return bn_get_le_and_key(bn, num_les - 1, rightmost_key);
2068         }
2069     }
2070     return nullptr;
2071 }
2072 
static int ft_leaf_get_relative_key_pos(FT ft, FTNODE leaf, const DBT *key, bool *nondeleted_key_found, int *target_childnum)
// Effect: Determines what the relative position of the given key is with
//         respect to a leaf node, and if it exists.
// Requires: Leaf is fully in memory and pinned for read or write.
// Requires: target_childnum is non-null
// Return: < 0 if key is less than the leftmost key in the leaf OR the relative position is unknown, for any reason.
//         0 if key is in the bounds [leftmost_key, rightmost_key] for this leaf or the leaf is empty
//         > 0 if key is greater than the rightmost key in the leaf
//         *nondeleted_key_found is set (if non-null) if the target key was found and is not deleted, unmodified otherwise
//         *target_childnum is set to the child that (does or would) contain the key, if calculated, unmodified otherwise
{
    DBT rightmost_key;
    LEAFENTRY rightmost_le = ft_leaf_rightmost_le_and_key(leaf, &rightmost_key);
    if (rightmost_le == nullptr) {
        // If we can't get a rightmost key then the leaf is empty.
        // In such a case, we don't have any information about what keys would be in this leaf.
        // We have to assume the leaf node that would contain this key is to the left.
        return -1;
    }
    // We have a rightmost leafentry, so it must exist in some child node
    invariant(leaf->n_children > 0);

    int relative_pos = 0;
    // Compare against the rightmost key first: the common caller is the
    // sequential-insert shortcut, whose keys tend to land at or past it.
    int c = ft_compare_keys(ft, key, &rightmost_key);
    if (c > 0) {
        // Strictly greater than everything in this leaf.
        relative_pos = 1;
        *target_childnum = leaf->n_children - 1;
    } else if (c == 0) {
        // Exact match with the rightmost key.
        if (nondeleted_key_found != nullptr && !le_latest_is_del(rightmost_le)) {
            *nondeleted_key_found = true;
        }
        relative_pos = 0;
        *target_childnum = leaf->n_children - 1;
    } else {
        // The key is less than the rightmost. It may still be in bounds if it's >= the leftmost.
        DBT leftmost_key;
        LEAFENTRY leftmost_le = ft_leaf_leftmost_le_and_key(leaf, &leftmost_key);
        invariant_notnull(leftmost_le); // Must exist because a rightmost exists
        c = ft_compare_keys(ft, key, &leftmost_key);
        if (c > 0) {
            // Strictly inside (leftmost, rightmost): the key belongs in this
            // leaf, but may or may not be present.
            if (nondeleted_key_found != nullptr) {
                // The caller wants to know if a nondeleted key can be found.
                LEAFENTRY target_le;
                int childnum = toku_ftnode_which_child(leaf, key, ft->cmp);
                BASEMENTNODE bn = BLB(leaf, childnum);
                struct toku_msg_leafval_heaviside_extra extra(ft->cmp, key);
                // find_zero returns 0 iff an exact-match leafentry exists.
                int r = bn->data_buffer.find_zero<decltype(extra), toku_msg_leafval_heaviside>(
                    extra,
                    &target_le,
                    nullptr, nullptr, nullptr
                    );
                *target_childnum = childnum;
                if (r == 0 && !le_latest_is_del(target_le)) {
                    *nondeleted_key_found = true;
                }
            }
            relative_pos = 0;
        } else if (c == 0) {
            // Exact match with the leftmost key.
            if (nondeleted_key_found != nullptr && !le_latest_is_del(leftmost_le)) {
                *nondeleted_key_found = true;
            }
            relative_pos = 0;
            *target_childnum = 0;
        } else {
            // Strictly less than the leftmost key: the key lies to the left
            // of this leaf.  Note: *target_childnum is deliberately left
            // unmodified here, per the contract above.
            relative_pos = -1;
        }
    }

    return relative_pos;
}
2143 
2144 static void ft_insert_directly_into_leaf(FT ft, FTNODE leaf, int target_childnum, DBT *key, DBT *val,
2145                                          XIDS message_xids, enum ft_msg_type type, txn_gc_info *gc_info);
2146 static int getf_nothing(uint32_t, const void *, uint32_t, const void *, void *, bool);
2147 
static int ft_maybe_insert_into_rightmost_leaf(FT ft, DBT *key, DBT *val, XIDS message_xids, enum ft_msg_type type,
                                               txn_gc_info *gc_info, bool unique)
// Effect: Pins the rightmost leaf node and attempts to do an insert.
//         There are three reasons why we may not succeed.
//         - The rightmost leaf is too full and needs a split.
//         - The key to insert is not within the provable bounds of this leaf node.
//         - The key is within bounds, but it already exists.
// Return: 0 if this function did insert, DB_KEYEXIST if a unique key constraint exists and
//         some nondeleted leafentry with the same key exists
//         < 0 if this function did not insert, for a reason other than DB_KEYEXIST.
// Note: Treat this function as a possible, but not necessary, optimization for insert.
// Rationale: We want O(1) insertions down the rightmost path of the tree.
{
    int r = -1;

    uint32_t rightmost_fullhash;
    BLOCKNUM rightmost_blocknum;
    FTNODE rightmost_leaf = nullptr;

    // Don't do the optimization if our heuristic suggests that
    // the insertion pattern is not sequential.
    if (toku_unsafe_fetch(&ft->seqinsert_score) < FT_SEQINSERT_SCORE_THRESHOLD) {
        goto cleanup;
    }

    // We know the seqinsert score is high enough that we should
    // attempt to directly insert into the rightmost leaf. Because
    // the score is non-zero, the rightmost blocknum must have been
    // set. See inject_message_in_locked_node(), which only increases
    // the score if the target node blocknum == rightmost_blocknum
    rightmost_blocknum = ft->rightmost_blocknum;
    invariant(rightmost_blocknum.b != RESERVED_BLOCKNUM_NULL);

    // Pin the rightmost leaf with a write lock.
    rightmost_fullhash = toku_cachetable_hash(ft->cf, rightmost_blocknum);
    ftnode_fetch_extra bfe;
    bfe.create_for_full_read(ft);
    toku_pin_ftnode(ft, rightmost_blocknum, rightmost_fullhash, &bfe, PL_WRITE_CHEAP, &rightmost_leaf, true);

    // The rightmost blocknum never changes once it is initialized to something
    // other than null. Verify that the pinned node has the correct blocknum.
    invariant(rightmost_leaf->blocknum.b == rightmost_blocknum.b);

    // If the rightmost leaf is reactive, bail out and let the normal promotion pass
    // take care of it. This also ensures that if any of our ancestors are reactive,
    // they'll be taken care of too.
    if (toku_ftnode_get_leaf_reactivity(rightmost_leaf, ft->h->nodesize) != RE_STABLE) {
        FT_STATUS_INC(FT_PRO_RIGHTMOST_LEAF_SHORTCUT_FAIL_REACTIVE, 1);
        goto cleanup;
    }

    // The groundwork has been laid for an insertion directly into the rightmost
    // leaf node. We know that it is pinned for write, fully in memory, has
    // no messages above it, and is not reactive.
    //
    // Now, two more things must be true for this insertion to actually happen:
    // 1. The key to insert is within the bounds of this leafnode, or to the right.
    // 2. If there is a uniqueness constraint, it passes.
    bool nondeleted_key_found;
    int relative_pos;
    int target_childnum;

    nondeleted_key_found = false;
    target_childnum = -1;
    relative_pos = ft_leaf_get_relative_key_pos(ft, rightmost_leaf, key,
                                                unique ? &nondeleted_key_found : nullptr,
                                                &target_childnum);
    if (relative_pos >= 0) {
        // The key is within this leaf's bounds (or to its right).
        FT_STATUS_INC(FT_PRO_RIGHTMOST_LEAF_SHORTCUT_SUCCESS, 1);
        if (unique && nondeleted_key_found) {
            r = DB_KEYEXIST;
        } else {
            ft_insert_directly_into_leaf(ft, rightmost_leaf, target_childnum,
                                         key, val, message_xids, type, gc_info);
            r = 0;
        }
    } else {
        // The key provably belongs somewhere to the left of this leaf.
        FT_STATUS_INC(FT_PRO_RIGHTMOST_LEAF_SHORTCUT_FAIL_POS, 1);
        r = -1;
    }

cleanup:
    // If we did the insert, the rightmost leaf was unpinned for us.
    if (r != 0 && rightmost_leaf != nullptr) {
        toku_unpin_ftnode(ft, rightmost_leaf);
    }

    return r;
}
2237 
2238 static void ft_txn_log_insert(FT ft, DBT *key, DBT *val, TOKUTXN txn, bool do_logging, enum ft_msg_type type);
2239 
int toku_ft_insert_unique(FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool do_logging) {
// Effect: Insert a unique key-val pair into the fractal tree.
// Return: 0 on success, DB_KEYEXIST if the overwrite constraint failed
    XIDS message_xids = txn != nullptr ? toku_txn_get_xids(txn) : toku_xids_get_root_xids();

    TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
    txn_manager_state txn_state_for_gc(txn_manager);

    TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
    txn_gc_info gc_info(&txn_state_for_gc,
                        oldest_referenced_xid_estimate,
                        // no messages above us, we can implicitly promote uxrs based on this xid
                        oldest_referenced_xid_estimate,
                        true);
    // Fast path: try to insert directly into the rightmost leaf, which can
    // also verify uniqueness cheaply when the key provably lies within (or to
    // the right of) that leaf's bounds.
    int r = ft_maybe_insert_into_rightmost_leaf(ft_h->ft, key, val, message_xids, FT_INSERT, &gc_info, true);
    if (r != 0 && r != DB_KEYEXIST) {
        // Default to a regular unique check + insert algorithm if we couldn't
        // do it based on the rightmost leaf alone.
        int lookup_r = toku_ft_lookup(ft_h, key, getf_nothing, nullptr);
        if (lookup_r == DB_NOTFOUND) {
            toku_ft_send_insert(ft_h, key, val, message_xids, FT_INSERT, &gc_info);
            r = 0;
        } else {
            r = DB_KEYEXIST;
        }
    }

    if (r == 0) {
        // Only log the insert and bump the row count once we know it happened.
        ft_txn_log_insert(ft_h->ft, key, val, txn, do_logging, FT_INSERT);
        toku_ft_adjust_logical_row_count(ft_h->ft, 1);
    }
    return r;
}
2273 
// Effect: Insert the key-val pair into an ft.
void toku_ft_insert (FT_HANDLE ft_handle, DBT *key, DBT *val, TOKUTXN txn) {
    // Non-recovery path: oplsn_valid=false (no log record being replayed),
    // do_logging=true (write to the recovery log).
    toku_ft_maybe_insert(ft_handle, key, val, txn, false, ZERO_LSN, true, FT_INSERT);
}
2278 
toku_ft_load_recovery(TOKUTXN txn,FILENUM old_filenum,char const * new_iname,int do_fsync,int do_log,LSN * load_lsn)2279 void toku_ft_load_recovery(TOKUTXN txn, FILENUM old_filenum, char const * new_iname, int do_fsync, int do_log, LSN *load_lsn) {
2280     paranoid_invariant(txn);
2281     toku_txn_force_fsync_on_commit(txn);  //If the txn commits, the commit MUST be in the log
2282                                           //before the (old) file is actually unlinked
2283     TOKULOGGER logger = toku_txn_logger(txn);
2284 
2285     BYTESTRING new_iname_bs = {.len=(uint32_t) strlen(new_iname), .data=(char*)new_iname};
2286     toku_logger_save_rollback_load(txn, old_filenum, &new_iname_bs);
2287     if (do_log && logger) {
2288         TXNID_PAIR xid = toku_txn_get_txnid(txn);
2289         toku_log_load(logger, load_lsn, do_fsync, txn, xid, old_filenum, new_iname_bs);
2290     }
2291 }
2292 
2293 // 2954
2294 // this function handles the tasks needed to be recoverable
2295 //  - write to rollback log
2296 //  - write to recovery log
toku_ft_hot_index_recovery(TOKUTXN txn,FILENUMS filenums,int do_fsync,int do_log,LSN * hot_index_lsn)2297 void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn)
2298 {
2299     paranoid_invariant(txn);
2300     TOKULOGGER logger = toku_txn_logger(txn);
2301 
2302     // write to the rollback log
2303     toku_logger_save_rollback_hot_index(txn, &filenums);
2304     if (do_log && logger) {
2305         TXNID_PAIR xid = toku_txn_get_txnid(txn);
2306         // write to the recovery log
2307         toku_log_hot_index(logger, hot_index_lsn, do_fsync, txn, xid, filenums);
2308     }
2309 }
2310 
// Effect: Optimize the ft.
void toku_ft_optimize (FT_HANDLE ft_h) {
    TOKULOGGER logger = toku_cachefile_logger(ft_h->ft->cf);
    if (logger) {
        TXNID oldest = toku_txn_manager_get_oldest_living_xid(logger->txn_manager);

        XIDS root_xids = toku_xids_get_root_xids();
        XIDS message_xids;
        if (oldest == TXNID_NONE_LIVING) {
            // No live transactions: stamp the message with the committed
            // (root) xids.
            message_xids = root_xids;
        }
        else {
            // Stamp the message with a child of the root xids containing the
            // oldest living xid — presumably so the optimize pass respects
            // entries that xid may still see; TODO confirm.
            int r = toku_xids_create_child(root_xids, &message_xids, oldest);
            invariant(r == 0);
        }

        // FT_OPTIMIZE is a broadcast-style message carrying an empty key
        // and value.
        DBT key;
        DBT val;
        toku_init_dbt(&key);
        toku_init_dbt(&val);
        ft_msg msg(&key, &val, FT_OPTIMIZE, ZERO_MSN, message_xids);

        TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
        txn_manager_state txn_state_for_gc(txn_manager);

        TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
        txn_gc_info gc_info(&txn_state_for_gc,
                            oldest_referenced_xid_estimate,
                            // no messages above us, we can implicitly promote uxrs based on this xid
                            oldest_referenced_xid_estimate,
                            true);
        toku_ft_root_put_msg(ft_h->ft, msg, &gc_info);
        // NOTE(review): when oldest == TXNID_NONE_LIVING, message_xids aliases
        // root_xids; this relies on toku_xids_destroy being safe (a no-op) on
        // the root xids — confirm against the xids implementation.
        toku_xids_destroy(&message_xids);
    }
}
2346 
toku_ft_load(FT_HANDLE ft_handle,TOKUTXN txn,char const * new_iname,int do_fsync,LSN * load_lsn)2347 void toku_ft_load(FT_HANDLE ft_handle, TOKUTXN txn, char const * new_iname, int do_fsync, LSN *load_lsn) {
2348     FILENUM old_filenum = toku_cachefile_filenum(ft_handle->ft->cf);
2349     int do_log = 1;
2350     toku_ft_load_recovery(txn, old_filenum, new_iname, do_fsync, do_log, load_lsn);
2351 }
2352 
2353 // ft actions for logging hot index filenums
toku_ft_hot_index(FT_HANDLE ft_handle,TOKUTXN txn,FILENUMS filenums,int do_fsync,LSN * lsn)2354 void toku_ft_hot_index(FT_HANDLE ft_handle __attribute__ ((unused)), TOKUTXN txn, FILENUMS filenums, int do_fsync, LSN *lsn) {
2355     int do_log = 1;
2356     toku_ft_hot_index_recovery(txn, filenums, do_fsync, do_log, lsn);
2357 }
2358 
2359 void
toku_ft_log_put(TOKUTXN txn,FT_HANDLE ft_handle,const DBT * key,const DBT * val)2360 toku_ft_log_put (TOKUTXN txn, FT_HANDLE ft_handle, const DBT *key, const DBT *val) {
2361     TOKULOGGER logger = toku_txn_logger(txn);
2362     if (logger) {
2363         BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2364         BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
2365         TXNID_PAIR xid = toku_txn_get_txnid(txn);
2366         toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_handle->ft->cf), xid, keybs, valbs);
2367     }
2368 }
2369 
2370 void
toku_ft_log_put_multiple(TOKUTXN txn,FT_HANDLE src_ft,FT_HANDLE * fts,uint32_t num_fts,const DBT * key,const DBT * val)2371 toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *fts, uint32_t num_fts, const DBT *key, const DBT *val) {
2372     assert(txn);
2373     assert(num_fts > 0);
2374     TOKULOGGER logger = toku_txn_logger(txn);
2375     if (logger) {
2376         FILENUM         fnums[num_fts];
2377         uint32_t i;
2378         for (i = 0; i < num_fts; i++) {
2379             fnums[i] = toku_cachefile_filenum(fts[i]->ft->cf);
2380         }
2381         FILENUMS filenums = {.num = num_fts, .filenums = fnums};
2382         BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2383         BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
2384         TXNID_PAIR xid = toku_txn_get_txnid(txn);
2385         FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE;
2386         toku_log_enq_insert_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs);
2387     }
2388 }
2389 
toku_ft_get_txn_manager(FT_HANDLE ft_h)2390 TXN_MANAGER toku_ft_get_txn_manager(FT_HANDLE ft_h) {
2391     TOKULOGGER logger = toku_cachefile_logger(ft_h->ft->cf);
2392     return logger != nullptr ? toku_logger_get_txn_manager(logger) : nullptr;
2393 }
2394 
toku_ft_get_oldest_referenced_xid_estimate(FT_HANDLE ft_h)2395 TXNID toku_ft_get_oldest_referenced_xid_estimate(FT_HANDLE ft_h) {
2396     TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
2397     return txn_manager != nullptr ? toku_txn_manager_get_oldest_referenced_xid_estimate(txn_manager) : TXNID_NONE;
2398 }
2399 
static void ft_txn_log_insert(FT ft, DBT *key, DBT *val, TOKUTXN txn, bool do_logging, enum ft_msg_type type) {
// Effect: Write the rollback-log and (optionally) recovery-log records for
//         an insert or insert-no-overwrite.
    paranoid_invariant(type == FT_INSERT || type == FT_INSERT_NO_OVERWRITE);

    //By default use committed messages
    // NOTE(review): xid is fetched before the NULL check on txn; this assumes
    // toku_txn_get_txnid tolerates a NULL txn — confirm. xid is only consumed
    // below when logger is non-null.
    TXNID_PAIR xid = toku_txn_get_txnid(txn);
    if (txn) {
        // Save the undo record (keyed by the inserted key) and remember that
        // this txn touched this ft.
        BYTESTRING keybs = {key->size, (char *) key->data};
        toku_logger_save_rollback_cmdinsert(txn, toku_cachefile_filenum(ft->cf), &keybs);
        toku_txn_maybe_note_ft(txn, ft);
    }
    TOKULOGGER logger = toku_txn_logger(txn);
    if (do_logging && logger) {
        BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
        BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
        // Record type in the recovery log matches the message type.
        if (type == FT_INSERT) {
            toku_log_enq_insert(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft->cf), xid, keybs, valbs);
        }
        else {
            toku_log_enq_insert_no_overwrite(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft->cf), xid, keybs, valbs);
        }
    }
}
2422 
void toku_ft_maybe_insert (FT_HANDLE ft_h, DBT *key, DBT *val, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging, enum ft_msg_type type) {
// Effect: Insert (key,val), writing rollback/recovery log records as
//         requested.  When replaying (oplsn_valid), the insert is skipped if
//         the tree's checkpoint LSN already covers oplsn.
    ft_txn_log_insert(ft_h->ft, key, val, txn, do_logging, type);

    LSN treelsn;
    if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
        // do nothing
    } else {
        XIDS message_xids = txn ? toku_txn_get_xids(txn) : toku_xids_get_root_xids();

        TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
        txn_manager_state txn_state_for_gc(txn_manager);

        TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
        txn_gc_info gc_info(&txn_state_for_gc,
                            oldest_referenced_xid_estimate,
                            // no messages above us, we can implicitly promote uxrs based on this xid
                            oldest_referenced_xid_estimate,
                            txn != nullptr ? !txn->for_recovery : false);
        // Try the O(1) rightmost-leaf shortcut first; fall back to a normal
        // root injection if it doesn't apply.
        // NOTE(review): the shortcut is always attempted with FT_INSERT even
        // when type is FT_INSERT_NO_OVERWRITE (the fallback uses `type`) —
        // confirm callers don't rely on no-overwrite through this path.
        int r = ft_maybe_insert_into_rightmost_leaf(ft_h->ft, key, val, message_xids, FT_INSERT, &gc_info, false);
        if (r != 0) {
            toku_ft_send_insert(ft_h, key, val, message_xids, type, &gc_info);
        }
        toku_ft_adjust_logical_row_count(ft_h->ft, 1);
    }
}
2448 
ft_insert_directly_into_leaf(FT ft,FTNODE leaf,int target_childnum,DBT * key,DBT * val,XIDS message_xids,enum ft_msg_type type,txn_gc_info * gc_info)2449 static void ft_insert_directly_into_leaf(FT ft, FTNODE leaf, int target_childnum, DBT *key, DBT *val,
2450                                          XIDS message_xids, enum ft_msg_type type, txn_gc_info *gc_info)
2451 // Effect: Insert directly into a leaf node a fractal tree. Does not do any logging.
2452 // Requires: Leaf is fully in memory and pinned for write.
2453 // Requires: If this insertion were to happen through the root node, the promotion
2454 //           algorithm would have selected the given leaf node as the point of injection.
2455 //           That means this function relies on the current implementation of promotion.
2456 {
2457     ft_msg msg(key, val, type, ZERO_MSN, message_xids);
2458     size_t flow_deltas[] = { 0, 0 };
2459     inject_message_in_locked_node(ft, leaf, target_childnum, msg, flow_deltas, gc_info);
2460 }
2461 
2462 static void
ft_send_update_msg(FT_HANDLE ft_h,const ft_msg & msg,TOKUTXN txn)2463 ft_send_update_msg(FT_HANDLE ft_h, const ft_msg &msg, TOKUTXN txn) {
2464     TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
2465     txn_manager_state txn_state_for_gc(txn_manager);
2466 
2467     TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
2468     txn_gc_info gc_info(&txn_state_for_gc,
2469                         oldest_referenced_xid_estimate,
2470                         // no messages above us, we can implicitly promote uxrs based on this xid
2471                         oldest_referenced_xid_estimate,
2472                         txn != nullptr ? !txn->for_recovery : false);
2473     toku_ft_root_put_msg(ft_h->ft, msg, &gc_info);
2474 }
2475 
void toku_ft_maybe_update(FT_HANDLE ft_h,
                          const DBT *key,
                          const DBT *update_function_extra,
                          TOKUTXN txn,
                          bool oplsn_valid,
                          LSN oplsn,
                          bool do_logging) {
// Effect: Send an FT_UPDATE message for the given key, writing rollback and
//         recovery log records as requested.  When replaying (oplsn_valid),
//         the message is skipped if the tree's checkpoint LSN already covers
//         oplsn.
    TXNID_PAIR xid = toku_txn_get_txnid(txn);
    if (txn) {
        // Save the undo record (keyed by the updated key) and remember that
        // this txn touched this ft.
        BYTESTRING keybs = {key->size, (char *)key->data};
        toku_logger_save_rollback_cmdupdate(
            txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
        toku_txn_maybe_note_ft(txn, ft_h->ft);
    }

    TOKULOGGER logger;
    logger = toku_txn_logger(txn);
    if (do_logging && logger) {
        // Recovery log record carries the key and the opaque extra that the
        // update callback will receive.
        BYTESTRING keybs = {.len = key->size, .data = (char *)key->data};
        BYTESTRING extrabs = {.len = update_function_extra->size,
                              .data = (char *)update_function_extra->data};
        toku_log_enq_update(logger,
                            NULL,
                            0,
                            txn,
                            toku_cachefile_filenum(ft_h->ft->cf),
                            xid,
                            keybs,
                            extrabs);
    }

    LSN treelsn;
    if (oplsn_valid &&
        oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
        // do nothing
    } else {
        XIDS message_xids =
            txn ? toku_txn_get_xids(txn) : toku_xids_get_root_xids();
        ft_msg msg(
            key, update_function_extra, FT_UPDATE, ZERO_MSN, message_xids);
        ft_send_update_msg(ft_h, msg, txn);
    }
    // updates get converted to insert messages, which should do a -1 on the
    // logical row count when the messages are permanently applied
    toku_ft_adjust_logical_row_count(ft_h->ft, 1);
}
2522 
void toku_ft_maybe_update_broadcast(FT_HANDLE ft_h, const DBT *update_function_extra,
                                TOKUTXN txn, bool oplsn_valid, LSN oplsn,
                                bool do_logging, bool is_resetting_op) {
// Effect: Send an FT_UPDATE_BROADCAST_ALL message (applies the update
//         callback to every row), writing rollback and recovery log records
//         as requested.  When replaying (oplsn_valid), the message is skipped
//         if the tree's checkpoint LSN already covers oplsn.
    TXNID_PAIR xid = toku_txn_get_txnid(txn);
    // Encode the resetting flag as 0/1 for the log records.
    uint8_t  resetting = is_resetting_op ? 1 : 0;
    if (txn) {
        toku_logger_save_rollback_cmdupdatebroadcast(txn, toku_cachefile_filenum(ft_h->ft->cf), resetting);
        toku_txn_maybe_note_ft(txn, ft_h->ft);
    }

    TOKULOGGER logger;
    logger = toku_txn_logger(txn);
    if (do_logging && logger) {
        BYTESTRING extrabs = {.len=update_function_extra->size,
                              .data = (char *) update_function_extra->data};
        toku_log_enq_updatebroadcast(logger, NULL, 0, txn,
                                         toku_cachefile_filenum(ft_h->ft->cf),
                                         xid, extrabs, resetting);
    }

    //TODO(yoni): remove treelsn here and similar calls (no longer being used)
    LSN treelsn;
    if (oplsn_valid &&
        oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
        // do nothing: the tree already reflects this operation
    } else {
        // Broadcast messages carry an empty key.
        DBT empty_dbt;
        XIDS message_xids = txn ? toku_txn_get_xids(txn) : toku_xids_get_root_xids();
        ft_msg msg(toku_init_dbt(&empty_dbt), update_function_extra, FT_UPDATE_BROADCAST_ALL, ZERO_MSN, message_xids);
        ft_send_update_msg(ft_h, msg, txn);
    }
}
2555 
void toku_ft_send_insert(FT_HANDLE ft_handle, DBT *key, DBT *val, XIDS xids, enum ft_msg_type type, txn_gc_info *gc_info) {
// Effect: Wrap (key,val) in a message of the given type and inject it at the
//         root.  The message is built with ZERO_MSN; the root injection path
//         assigns the real MSN (see the tree-shape notes at the top of this
//         file).
    ft_msg msg(key, val, type, ZERO_MSN, xids);
    toku_ft_root_put_msg(ft_handle->ft, msg, gc_info);
}
2560 
void toku_ft_send_commit_any(FT_HANDLE ft_handle, DBT *key, XIDS xids, txn_gc_info *gc_info) {
// Effect: Inject an FT_COMMIT_ANY message for the given key at the root.
//         The message carries only the key; the value is an empty DBT.
    DBT val;
    ft_msg msg(key, toku_init_dbt(&val), FT_COMMIT_ANY, ZERO_MSN, xids);
    toku_ft_root_put_msg(ft_handle->ft, msg, gc_info);
}
2566 
void toku_ft_delete(FT_HANDLE ft_handle, DBT *key, TOKUTXN txn) {
// Effect: Delete the given key from the ft.
    // Non-recovery path: oplsn_valid=false (no log record being replayed),
    // do_logging=true (write to the recovery log).
    toku_ft_maybe_delete(ft_handle, key, txn, false, ZERO_LSN, true);
}
2570 
2571 void
toku_ft_log_del(TOKUTXN txn,FT_HANDLE ft_handle,const DBT * key)2572 toku_ft_log_del(TOKUTXN txn, FT_HANDLE ft_handle, const DBT *key) {
2573     TOKULOGGER logger = toku_txn_logger(txn);
2574     if (logger) {
2575         BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2576         TXNID_PAIR xid = toku_txn_get_txnid(txn);
2577         toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_handle->ft->cf), xid, keybs);
2578     }
2579 }
2580 
2581 void
toku_ft_log_del_multiple(TOKUTXN txn,FT_HANDLE src_ft,FT_HANDLE * fts,uint32_t num_fts,const DBT * key,const DBT * val)2582 toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *fts, uint32_t num_fts, const DBT *key, const DBT *val) {
2583     assert(txn);
2584     assert(num_fts > 0);
2585     TOKULOGGER logger = toku_txn_logger(txn);
2586     if (logger) {
2587         FILENUM         fnums[num_fts];
2588         uint32_t i;
2589         for (i = 0; i < num_fts; i++) {
2590             fnums[i] = toku_cachefile_filenum(fts[i]->ft->cf);
2591         }
2592         FILENUMS filenums = {.num = num_fts, .filenums = fnums};
2593         BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2594         BYTESTRING valbs = {.len=val->size, .data=(char *) val->data};
2595         TXNID_PAIR xid = toku_txn_get_txnid(txn);
2596         FILENUM src_filenum = src_ft ? toku_cachefile_filenum(src_ft->ft->cf) : FILENUM_NONE;
2597         toku_log_enq_delete_multiple(logger, (LSN*)0, 0, txn, src_filenum, filenums, xid, keybs, valbs);
2598     }
2599 }
2600 
toku_ft_maybe_delete(FT_HANDLE ft_h,DBT * key,TOKUTXN txn,bool oplsn_valid,LSN oplsn,bool do_logging)2601 void toku_ft_maybe_delete(FT_HANDLE ft_h, DBT *key, TOKUTXN txn, bool oplsn_valid, LSN oplsn, bool do_logging) {
2602     XIDS message_xids = toku_xids_get_root_xids(); //By default use committed messages
2603     TXNID_PAIR xid = toku_txn_get_txnid(txn);
2604     if (txn) {
2605         BYTESTRING keybs = {key->size, (char *) key->data};
2606         toku_logger_save_rollback_cmddelete(txn, toku_cachefile_filenum(ft_h->ft->cf), &keybs);
2607         toku_txn_maybe_note_ft(txn, ft_h->ft);
2608         message_xids = toku_txn_get_xids(txn);
2609     }
2610     TOKULOGGER logger = toku_txn_logger(txn);
2611     if (do_logging && logger) {
2612         BYTESTRING keybs = {.len=key->size, .data=(char *) key->data};
2613         toku_log_enq_delete_any(logger, (LSN*)0, 0, txn, toku_cachefile_filenum(ft_h->ft->cf), xid, keybs);
2614     }
2615 
2616     LSN treelsn;
2617     if (oplsn_valid && oplsn.lsn <= (treelsn = toku_ft_checkpoint_lsn(ft_h->ft)).lsn) {
2618         // do nothing
2619     } else {
2620         TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
2621         txn_manager_state txn_state_for_gc(txn_manager);
2622 
2623         TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
2624         txn_gc_info gc_info(&txn_state_for_gc,
2625                             oldest_referenced_xid_estimate,
2626                             // no messages above us, we can implicitly promote uxrs based on this xid
2627                             oldest_referenced_xid_estimate,
2628                             txn != nullptr ? !txn->for_recovery : false);
2629         toku_ft_send_delete(ft_h, key, message_xids, &gc_info);
2630         toku_ft_adjust_logical_row_count(ft_h->ft, -1);
2631     }
2632 }
2633 
toku_ft_send_delete(FT_HANDLE ft_handle,DBT * key,XIDS xids,txn_gc_info * gc_info)2634 void toku_ft_send_delete(FT_HANDLE ft_handle, DBT *key, XIDS xids, txn_gc_info *gc_info) {
2635     DBT val; toku_init_dbt(&val);
2636     ft_msg msg(key, toku_init_dbt(&val), FT_DELETE_ANY, ZERO_MSN, xids);
2637     toku_ft_root_put_msg(ft_handle->ft, msg, gc_info);
2638 }
2639 
2640 /* ******************** open,close and create  ********************** */
2641 
2642 // Test only function (not used in running system). This one has no env
toku_open_ft_handle(const char * fname,int is_create,FT_HANDLE * ft_handle_p,int nodesize,int basementnodesize,enum toku_compression_method compression_method,CACHETABLE cachetable,TOKUTXN txn,int (* compare_fun)(DB *,const DBT *,const DBT *))2643 int toku_open_ft_handle (const char *fname, int is_create, FT_HANDLE *ft_handle_p, int nodesize,
2644                    int basementnodesize,
2645                    enum toku_compression_method compression_method,
2646                    CACHETABLE cachetable, TOKUTXN txn,
2647                    int (*compare_fun)(DB *, const DBT*,const DBT*)) {
2648     FT_HANDLE ft_handle;
2649     const int only_create = 0;
2650 
2651     toku_ft_handle_create(&ft_handle);
2652     toku_ft_handle_set_nodesize(ft_handle, nodesize);
2653     toku_ft_handle_set_basementnodesize(ft_handle, basementnodesize);
2654     toku_ft_handle_set_compression_method(ft_handle, compression_method);
2655     toku_ft_handle_set_fanout(ft_handle, 16);
2656     toku_ft_set_bt_compare(ft_handle, compare_fun);
2657 
2658     int r = toku_ft_handle_open(ft_handle, fname, is_create, only_create, cachetable, txn);
2659     if (r != 0) {
2660         return r;
2661     }
2662 
2663     *ft_handle_p = ft_handle;
2664     return r;
2665 }
2666 
// When true (the default), ft files are opened with direct I/O.
static bool use_direct_io = true;

// Enable or disable direct I/O for subsequently opened ft files.
// NOTE(review): no synchronization here — presumably set once at startup
// before any files are opened; confirm with callers.
void toku_ft_set_direct_io (bool direct_io_on) {
    use_direct_io = direct_io_on;
}
2672 
ft_open_maybe_direct(const char * filename,int oflag,int mode)2673 static inline int ft_open_maybe_direct(const char *filename,
2674                                        int oflag,
2675                                        int mode) {
2676     if (use_direct_io) {
2677         return toku_os_open_direct(
2678             filename, oflag, mode, *tokudb_file_data_key);
2679     } else {
2680         return toku_os_open(filename, oflag, mode, *tokudb_file_data_key);
2681     }
2682 }
2683 
// 0666: readable and writable by owner, group, and others (subject to umask).
// Combine permission bits with bitwise-or; '+' happened to work only because
// the S_I* masks are disjoint.
static const mode_t file_mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;

// True when last_slash points at the first character of path, i.e. the
// prefix up to the slash is the filesystem root.
inline bool toku_file_is_root(const char *path, const char *last_slash) {
    return last_slash == path;
}
2689 
toku_file_get_parent_dir(const char * path)2690 static std::unique_ptr<char[], decltype(&toku_free)> toku_file_get_parent_dir(
2691     const char *path) {
2692     std::unique_ptr<char[], decltype(&toku_free)> result(nullptr, &toku_free);
2693 
2694     bool has_trailing_slash = false;
2695 
2696     /* Find the offset of the last slash */
2697     const char *last_slash = strrchr(path, OS_PATH_SEPARATOR);
2698 
2699     if (!last_slash) {
2700         /* No slash in the path, return NULL */
2701         return result;
2702     }
2703 
2704     /* Ok, there is a slash. Is there anything after it? */
2705     if (static_cast<size_t>(last_slash - path + 1) == strlen(path)) {
2706         has_trailing_slash = true;
2707     }
2708 
2709     /* Reduce repetative slashes. */
2710     while (last_slash > path && last_slash[-1] == OS_PATH_SEPARATOR) {
2711         last_slash--;
2712     }
2713 
2714     /* Check for the root of a drive. */
2715     if (toku_file_is_root(path, last_slash)) {
2716         return result;
2717     }
2718 
2719     /* If a trailing slash prevented the first strrchr() from trimming
2720     the last component of the path, trim that component now. */
2721     if (has_trailing_slash) {
2722         /* Back up to the previous slash. */
2723         last_slash--;
2724         while (last_slash > path && last_slash[0] != OS_PATH_SEPARATOR) {
2725             last_slash--;
2726         }
2727 
2728         /* Reduce repetative slashes. */
2729         while (last_slash > path && last_slash[-1] == OS_PATH_SEPARATOR) {
2730             last_slash--;
2731         }
2732     }
2733 
2734     /* Check for the root of a drive. */
2735     if (toku_file_is_root(path, last_slash)) {
2736         return result;
2737     }
2738 
2739     result.reset(toku_strndup(path, last_slash - path));
2740     return result;
2741 }
2742 
toku_create_subdirs_if_needed(const char * path)2743 bool toku_create_subdirs_if_needed(const char *path) {
2744     static const mode_t dir_mode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP |
2745                                    S_IWGRP | S_IXGRP | S_IROTH | S_IXOTH;
2746 
2747     toku_struct_stat stat;
2748     bool subdir_exists = true;
2749     auto subdir = toku_file_get_parent_dir(path);
2750 
2751     if (!subdir.get())
2752         return true;
2753 
2754     if (toku_stat(subdir.get(), &stat, toku_uninstrumented) == -1) {
2755         if (ENOENT == get_error_errno())
2756             subdir_exists = false;
2757         else
2758             return false;
2759     }
2760 
2761     if (subdir_exists) {
2762         if (!S_ISDIR(stat.st_mode))
2763             return false;
2764         return true;
2765     }
2766 
2767     if (!toku_create_subdirs_if_needed(subdir.get()))
2768         return false;
2769 
2770     if (toku_os_mkdir(subdir.get(), dir_mode))
2771         return false;
2772 
2773     return true;
2774 }
2775 
2776 // open a file for use by the ft
2777 // Requires:  File does not exist.
static int ft_create_file(FT_HANDLE UU(ft_handle), const char *fname, int *fdp) {
    // Create a brand-new ft file (the file must not already exist), fsync the
    // containing directory, and return the open fd via fdp.  Returns 0 on
    // success or an errno on failure.
    int r;
    int fd;
    int er;
    if (!toku_create_subdirs_if_needed(fname))
        return get_error_errno();
    // Probe without O_CREAT to prove the file does not already exist; this
    // open is expected to fail with ENOENT.
    fd = ft_open_maybe_direct(fname, O_RDWR | O_BINARY, file_mode);
    assert(fd==-1);
    if ((er = get_maybe_error_errno()) != ENOENT) {
        return er;
    }
    // Now actually create it.
    fd = ft_open_maybe_direct(fname, O_RDWR | O_CREAT | O_BINARY, file_mode);
    if (fd==-1) {
        r = get_error_errno();
        return r;
    }

    // Persist the new directory entry before handing the fd back; on fsync
    // failure, close the fd and report the error instead.
    r = toku_fsync_directory(fname);
    if (r == 0) {
        *fdp = fd;
    } else {
        int rr = close(fd);
        assert_zero(rr);
    }
    return r;
}
2804 
2805 // open a file for use by the ft.  if the file does not exist, error
ft_open_file(const char * fname,int * fdp,bool rw)2806 static int ft_open_file(const char *fname, int *fdp, bool rw) {
2807     int fd;
2808     fd = ft_open_maybe_direct(fname, (rw ? O_RDWR : O_RDONLY) | O_BINARY, file_mode);
2809     if (fd==-1) {
2810         return get_error_errno();
2811     }
2812     *fdp = fd;
2813     return 0;
2814 }
2815 
2816 void
toku_ft_handle_set_compression_method(FT_HANDLE t,enum toku_compression_method method)2817 toku_ft_handle_set_compression_method(FT_HANDLE t, enum toku_compression_method method)
2818 {
2819     if (t->ft) {
2820         toku_ft_set_compression_method(t->ft, method);
2821     }
2822     else {
2823         t->options.compression_method = method;
2824     }
2825 }
2826 
2827 void
toku_ft_handle_get_compression_method(FT_HANDLE t,enum toku_compression_method * methodp)2828 toku_ft_handle_get_compression_method(FT_HANDLE t, enum toku_compression_method *methodp)
2829 {
2830     if (t->ft) {
2831         toku_ft_get_compression_method(t->ft, methodp);
2832     }
2833     else {
2834         *methodp = t->options.compression_method;
2835     }
2836 }
2837 
2838 void
toku_ft_handle_set_fanout(FT_HANDLE ft_handle,unsigned int fanout)2839 toku_ft_handle_set_fanout(FT_HANDLE ft_handle, unsigned int fanout)
2840 {
2841     if (ft_handle->ft) {
2842         toku_ft_set_fanout(ft_handle->ft, fanout);
2843     }
2844     else {
2845         ft_handle->options.fanout = fanout;
2846     }
2847 }
2848 
2849 void
toku_ft_handle_get_fanout(FT_HANDLE ft_handle,unsigned int * fanout)2850 toku_ft_handle_get_fanout(FT_HANDLE ft_handle, unsigned int *fanout)
2851 {
2852     if (ft_handle->ft) {
2853         toku_ft_get_fanout(ft_handle->ft, fanout);
2854     }
2855     else {
2856         *fanout = ft_handle->options.fanout;
2857     }
2858 }
2859 
2860 // The memcmp magic byte may be set on a per fractal tree basis to communicate
2861 // that if two keys begin with this byte, they may be compared with the builtin
2862 // key comparison function. This greatly optimizes certain in-memory workloads,
2863 // such as lookups by OID primary key in TokuMX.
toku_ft_handle_set_memcmp_magic(FT_HANDLE ft_handle,uint8_t magic)2864 int toku_ft_handle_set_memcmp_magic(FT_HANDLE ft_handle, uint8_t magic) {
2865     if (magic == comparator::MEMCMP_MAGIC_NONE) {
2866         return EINVAL;
2867     }
2868     if (ft_handle->ft != nullptr) {
2869         // if the handle is already open, then we cannot set the memcmp magic
2870         // (because it may or may not have been set by someone else already)
2871         return EINVAL;
2872     }
2873     ft_handle->options.memcmp_magic = magic;
2874     return 0;
2875 }
2876 
2877 static int
verify_builtin_comparisons_consistent(FT_HANDLE t,uint32_t flags)2878 verify_builtin_comparisons_consistent(FT_HANDLE t, uint32_t flags) {
2879     if ((flags & TOKU_DB_KEYCMP_BUILTIN) && (t->options.compare_fun != toku_builtin_compare_fun)) {
2880         return EINVAL;
2881     }
2882     return 0;
2883 }
2884 
2885 //
2886 // See comments in toku_db_change_descriptor to understand invariants
2887 // in the system when this function is called
2888 //
// Install new_descriptor as the dictionary's descriptor.  When running in a
// transaction, first save a rollback entry (with the old descriptor) and,
// if do_log, write a recovery-log record; then update the header and,
// optionally, the comparison descriptor.
void toku_ft_change_descriptor(
    FT_HANDLE ft_h,
    const DBT* old_descriptor,
    const DBT* new_descriptor,
    bool do_log,
    TOKUTXN txn,
    bool update_cmp_descriptor
    )
{
    DESCRIPTOR_S new_d;

    // if running with txns, save to rollback + write to recovery log
    if (txn) {
        // put information into rollback file
        BYTESTRING old_desc_bs = { old_descriptor->size, (char *) old_descriptor->data };
        BYTESTRING new_desc_bs = { new_descriptor->size, (char *) new_descriptor->data };
        // rollback only needs the old descriptor, to restore it on abort
        toku_logger_save_rollback_change_fdescriptor(
            txn,
            toku_cachefile_filenum(ft_h->ft->cf),
            &old_desc_bs
            );
        toku_txn_maybe_note_ft(txn, ft_h->ft);

        if (do_log) {
            TOKULOGGER logger = toku_txn_logger(txn);
            TXNID_PAIR xid = toku_txn_get_txnid(txn);
            toku_log_change_fdescriptor(
                logger, NULL, 0,
                txn,
                toku_cachefile_filenum(ft_h->ft->cf),
                xid,
                old_desc_bs,
                new_desc_bs,
                update_cmp_descriptor
                );
        }
    }

    // write new_descriptor to header
    new_d.dbt = *new_descriptor;
    toku_ft_update_descriptor(ft_h->ft, &new_d);
    // very infrequent operation, worth precise threadsafe count
    FT_STATUS_INC(FT_DESCRIPTOR_SET, 1);

    if (update_cmp_descriptor) {
        toku_ft_update_cmp_descriptor(ft_h->ft);
    }
}
2937 
2938 static void
toku_ft_handle_inherit_options(FT_HANDLE t,FT ft)2939 toku_ft_handle_inherit_options(FT_HANDLE t, FT ft) {
2940     struct ft_options options = {
2941         .nodesize = ft->h->nodesize,
2942         .basementnodesize = ft->h->basementnodesize,
2943         .compression_method = ft->h->compression_method,
2944         .fanout = ft->h->fanout,
2945         .flags = ft->h->flags,
2946         .memcmp_magic = ft->cmp.get_memcmp_magic(),
2947         .compare_fun = ft->cmp.get_compare_func(),
2948         .update_fun = ft->update_fun
2949     };
2950     t->options = options;
2951     t->did_set_flags = true;
2952 }
2953 
2954 // This is the actual open, used for various purposes, such as normal use, recovery, and redirect.
2955 // fname_in_env is the iname, relative to the env_dir  (data_dir is already in iname as prefix).
2956 // The checkpointed version (checkpoint_lsn) of the dictionary must be no later than max_acceptable_lsn .
// Requires: The multi-operation client lock must be held to prevent a checkpoint from occurring.
static int
ft_handle_open(FT_HANDLE ft_h, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, DICTIONARY_ID use_dictionary_id, LSN max_acceptable_lsn, bool open_rw = true) {
    // Workhorse open routine shared by normal opens, recovery opens, and
    // redirect.  On success the handle is linked to the (possibly newly
    // created) FT; on failure everything acquired here is released via the
    // goto-cleanup block below "exit".
    int r;
    bool txn_created = false;
    char *fname_in_cwd = NULL;
    CACHEFILE cf = NULL;
    FT ft = NULL;
    bool did_create = false;
    bool was_already_open = false;

    toku_ft_open_close_lock();

    if (ft_h->did_set_flags) {
        r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
        if (r!=0) { goto exit; }
    }

    assert(is_create || !only_create);
    FILENUM reserved_filenum;
    reserved_filenum = use_filenum;
    fname_in_cwd = toku_cachetable_get_fname_in_cwd(cachetable, fname_in_env);
    {
        int fd = -1;
        r = ft_open_file(fname_in_cwd, &fd, open_rw);
        // Reserve a filenum now if the caller did not supply one.
        if (reserved_filenum.fileid == FILENUM_NONE.fileid) {
            reserved_filenum = toku_cachetable_reserve_filenum(cachetable);
        }
        if (r==ENOENT && is_create) {
            // File absent and creation requested: record rollback/log
            // entries for the create, then create the file on disk.
            did_create = true;
            if (txn) {
                BYTESTRING bs = { .len=(uint32_t) strlen(fname_in_env), .data = (char*)fname_in_env };
                toku_logger_save_rollback_fcreate(txn, reserved_filenum, &bs); // bs is a copy of the fname relative to the environment
            }
            txn_created = (bool)(txn!=NULL);
            toku_logger_log_fcreate(txn, fname_in_env, reserved_filenum, file_mode, ft_h->options.flags, ft_h->options.nodesize, ft_h->options.basementnodesize, ft_h->options.compression_method);
            r = ft_create_file(ft_h, fname_in_cwd, &fd);
            if (r) { goto exit; }
        }
        if (r) { goto exit; }
        r=toku_cachetable_openfd_with_filenum(&cf, cachetable, fd, fname_in_env, reserved_filenum, &was_already_open);
        if (r) { goto exit; }
    }
    assert(ft_h->options.nodesize>0);
    if (is_create) {
        r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft);
        if (r==TOKUDB_DICTIONARY_NO_HEADER) {
            // Freshly created file: build a brand-new header.
            toku_ft_create(&ft, &ft_h->options, cf, txn);
        }
        else if (r!=0) {
            goto exit;
        }
        else if (only_create) {
            // Header already existed but the caller demanded exclusivity.
            assert_zero(r);
            r = EEXIST;
            goto exit;
        }
        // if we get here, then is_create was true but only_create was false,
        // so it is ok for toku_read_ft_and_store_in_cachefile to have read
        // the header via toku_read_ft_and_store_in_cachefile
    } else {
        r = toku_read_ft_and_store_in_cachefile(ft_h, cf, max_acceptable_lsn, &ft);
        if (r) { goto exit; }
    }
    if (!ft_h->did_set_flags) {
        r = verify_builtin_comparisons_consistent(ft_h, ft_h->options.flags);
        if (r) { goto exit; }
    } else if (ft_h->options.flags != ft->h->flags) {                  /* if flags have been set then flags must match */
        r = EINVAL;
        goto exit;
    }

    // Ensure that the memcmp magic bits are consistent, if set.
    if (ft->cmp.get_memcmp_magic() != toku::comparator::MEMCMP_MAGIC_NONE &&
        ft_h->options.memcmp_magic != toku::comparator::MEMCMP_MAGIC_NONE &&
        ft_h->options.memcmp_magic != ft->cmp.get_memcmp_magic()) {
        r = EINVAL;
        goto exit;
    }
    toku_ft_handle_inherit_options(ft_h, ft);

    if (!was_already_open) {
        if (!did_create) { //Only log the fopen that OPENs the file.  If it was already open, don't log.
            toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(cf), ft_h->options.flags);
        }
    }
    // NOTE(review): declared separately from its assignment, presumably so
    // the gotos above do not jump over an initialization — confirm.
    int use_reserved_dict_id;
    use_reserved_dict_id = use_dictionary_id.dictid != DICTIONARY_ID_NONE.dictid;
    if (!was_already_open) {
        DICTIONARY_ID dict_id;
        if (use_reserved_dict_id) {
            dict_id = use_dictionary_id;
        }
        else {
            dict_id = next_dict_id();
        }
        ft->dict_id = dict_id;
    }
    else {
        // dict_id is already in header
        if (use_reserved_dict_id) {
            assert(ft->dict_id.dictid == use_dictionary_id.dictid);
        }
    }
    assert(ft);
    assert(ft->dict_id.dictid != DICTIONARY_ID_NONE.dictid);
    assert(ft->dict_id.dictid < dict_id_serial);

    // important note here,
    // after this point, where we associate the header
    // with the ft_handle, the function is not allowed to fail
    // Code that handles failure (located below "exit"),
    // depends on this
    toku_ft_note_ft_handle_open(ft, ft_h);
    if (txn_created) {
        assert(txn);
        toku_txn_maybe_note_ft(txn, ft);
    }

    // Opening an ft may restore to previous checkpoint.
    // Truncate if necessary.
    {
        int fd = toku_cachefile_get_fd (ft->cf);
        ft->blocktable.maybe_truncate_file_on_open(fd);
    }

    r = 0;
exit:
    if (fname_in_cwd) {
        toku_free(fname_in_cwd);
    }
    if (r != 0 && cf) {
        if (ft) {
            // we only call toku_ft_note_ft_handle_open
            // when the function succeeds, so if we are here,
            // then that means we have a reference to the header
            // but we have not linked it to this ft. So,
            // we can simply try to remove the header.
            // We don't need to unlink this ft from the header
            toku_ft_grab_reflock(ft);
            bool needed = toku_ft_needed_unlocked(ft);
            toku_ft_release_reflock(ft);
            if (!needed) {
                // close immediately.
                toku_ft_evict_from_memory(ft, false, ZERO_LSN);
            }
        }
        else {
            toku_cachefile_close(&cf, false, ZERO_LSN);
        }
    }
    toku_ft_open_close_unlock();
    return r;
}
3111 
3112 // Open an ft for the purpose of recovery, which requires that the ft be open to a pre-determined FILENUM
3113 // and may require a specific checkpointed version of the file.
3114 // (dict_id is assigned by the ft_handle_open() function.)
3115 int
toku_ft_handle_open_recovery(FT_HANDLE t,const char * fname_in_env,int is_create,int only_create,CACHETABLE cachetable,TOKUTXN txn,FILENUM use_filenum,LSN max_acceptable_lsn)3116 toku_ft_handle_open_recovery(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, FILENUM use_filenum, LSN max_acceptable_lsn) {
3117     int r;
3118     assert(use_filenum.fileid != FILENUM_NONE.fileid);
3119     r = ft_handle_open(t, fname_in_env, is_create, only_create, cachetable,
3120                  txn, use_filenum, DICTIONARY_ID_NONE, max_acceptable_lsn);
3121     return r;
3122 }
3123 
3124 // Open an ft in normal use.  The FILENUM and dict_id are assigned by the ft_handle_open() function.
// Requires: The multi-operation client lock must be held to prevent a checkpoint from occurring.
3126 int
toku_ft_handle_open(FT_HANDLE t,const char * fname_in_env,int is_create,int only_create,CACHETABLE cachetable,TOKUTXN txn,bool open_rw)3127 toku_ft_handle_open(FT_HANDLE t, const char *fname_in_env, int is_create, int only_create, CACHETABLE cachetable, TOKUTXN txn, bool open_rw) {
3128     int r;
3129     r = ft_handle_open(t, fname_in_env, is_create, only_create, cachetable, txn, FILENUM_NONE, DICTIONARY_ID_NONE, MAX_LSN, open_rw);
3130     return r;
3131 }
3132 
3133 // clone an ft handle. the cloned handle has a new dict_id but refers to the same fractal tree
// Clone an open handle: the clone is a fresh FT_HANDLE referring to the same
// underlying fractal tree (opened by fname; a new dict_id is assigned by the
// open path).  Returns 0 and the clone via cloned_ft_handle, or an error and
// a NULL clone.
int
toku_ft_handle_clone(FT_HANDLE *cloned_ft_handle, FT_HANDLE ft_handle, TOKUTXN txn, bool open_rw) {
    FT_HANDLE result_ft_handle;
    toku_ft_handle_create(&result_ft_handle);

    // we're cloning, so the handle better have an open ft and open cf
    invariant(ft_handle->ft);
    invariant(ft_handle->ft->cf);

    // inherit the options of the ft whose handle is being cloned.
    toku_ft_handle_inherit_options(result_ft_handle, ft_handle->ft);

    // we can clone the handle by creating a new handle with the same fname
    CACHEFILE cf = ft_handle->ft->cf;
    CACHETABLE ct = toku_cachefile_get_cachetable(cf);
    const char *fname_in_env = toku_cachefile_fname_in_env(cf);
    int r = toku_ft_handle_open(result_ft_handle, fname_in_env, false, false, ct, txn, open_rw);
    if (r != 0) {
        // Open failed: tear down the half-built clone and hand back NULL.
        toku_ft_handle_close(result_ft_handle);
        result_ft_handle = NULL;
    }
    *cloned_ft_handle = result_ft_handle;
    return r;
}
3158 
3159 // Open an ft in normal use.  The FILENUM and dict_id are assigned by the ft_handle_open() function.
3160 int
toku_ft_handle_open_with_dict_id(FT_HANDLE t,const char * fname_in_env,int is_create,int only_create,CACHETABLE cachetable,TOKUTXN txn,DICTIONARY_ID use_dictionary_id)3161 toku_ft_handle_open_with_dict_id(
3162     FT_HANDLE t,
3163     const char *fname_in_env,
3164     int is_create,
3165     int only_create,
3166     CACHETABLE cachetable,
3167     TOKUTXN txn,
3168     DICTIONARY_ID use_dictionary_id
3169     )
3170 {
3171     int r;
3172     r = ft_handle_open(
3173         t,
3174         fname_in_env,
3175         is_create,
3176         only_create,
3177         cachetable,
3178         txn,
3179         FILENUM_NONE,
3180         use_dictionary_id,
3181         MAX_LSN
3182         );
3183     return r;
3184 }
3185 
3186 DICTIONARY_ID
toku_ft_get_dictionary_id(FT_HANDLE ft_handle)3187 toku_ft_get_dictionary_id(FT_HANDLE ft_handle) {
3188     FT ft = ft_handle->ft;
3189     return ft->dict_id;
3190 }
3191 
toku_ft_set_flags(FT_HANDLE ft_handle,unsigned int flags)3192 void toku_ft_set_flags(FT_HANDLE ft_handle, unsigned int flags) {
3193     ft_handle->did_set_flags = true;
3194     ft_handle->options.flags = flags;
3195 }
3196 
toku_ft_get_flags(FT_HANDLE ft_handle,unsigned int * flags)3197 void toku_ft_get_flags(FT_HANDLE ft_handle, unsigned int *flags) {
3198     *flags = ft_handle->options.flags;
3199 }
3200 
void toku_ft_get_maximum_advised_key_value_lengths (unsigned int *max_key_len, unsigned int *max_val_len)
// return the maximum advisable key value lengths.  The ft doesn't enforce these.
{
    // 32 KiB keys, 32 MiB values.
    enum : unsigned int { kMaxKeyLen = 32u * 1024, kMaxValLen = 32u * 1024 * 1024 };
    *max_key_len = kMaxKeyLen;
    *max_val_len = kMaxValLen;
}
3207 
3208 
toku_ft_handle_set_nodesize(FT_HANDLE ft_handle,unsigned int nodesize)3209 void toku_ft_handle_set_nodesize(FT_HANDLE ft_handle, unsigned int nodesize) {
3210     if (ft_handle->ft) {
3211         toku_ft_set_nodesize(ft_handle->ft, nodesize);
3212     }
3213     else {
3214         ft_handle->options.nodesize = nodesize;
3215     }
3216 }
3217 
toku_ft_handle_get_nodesize(FT_HANDLE ft_handle,unsigned int * nodesize)3218 void toku_ft_handle_get_nodesize(FT_HANDLE ft_handle, unsigned int *nodesize) {
3219     if (ft_handle->ft) {
3220         toku_ft_get_nodesize(ft_handle->ft, nodesize);
3221     }
3222     else {
3223         *nodesize = ft_handle->options.nodesize;
3224     }
3225 }
3226 
toku_ft_handle_set_basementnodesize(FT_HANDLE ft_handle,unsigned int basementnodesize)3227 void toku_ft_handle_set_basementnodesize(FT_HANDLE ft_handle, unsigned int basementnodesize) {
3228     if (ft_handle->ft) {
3229         toku_ft_set_basementnodesize(ft_handle->ft, basementnodesize);
3230     }
3231     else {
3232         ft_handle->options.basementnodesize = basementnodesize;
3233     }
3234 }
3235 
toku_ft_handle_get_basementnodesize(FT_HANDLE ft_handle,unsigned int * basementnodesize)3236 void toku_ft_handle_get_basementnodesize(FT_HANDLE ft_handle, unsigned int *basementnodesize) {
3237     if (ft_handle->ft) {
3238         toku_ft_get_basementnodesize(ft_handle->ft, basementnodesize);
3239     }
3240     else {
3241         *basementnodesize = ft_handle->options.basementnodesize;
3242     }
3243 }
3244 
toku_ft_set_bt_compare(FT_HANDLE ft_handle,int (* bt_compare)(DB *,const DBT *,const DBT *))3245 void toku_ft_set_bt_compare(FT_HANDLE ft_handle, int (*bt_compare)(DB*, const DBT*, const DBT*)) {
3246     ft_handle->options.compare_fun = bt_compare;
3247 }
3248 
toku_ft_set_redirect_callback(FT_HANDLE ft_handle,on_redirect_callback redir_cb,void * extra)3249 void toku_ft_set_redirect_callback(FT_HANDLE ft_handle, on_redirect_callback redir_cb, void* extra) {
3250     ft_handle->redirect_callback = redir_cb;
3251     ft_handle->redirect_callback_extra = extra;
3252 }
3253 
toku_ft_set_update(FT_HANDLE ft_handle,ft_update_func update_fun)3254 void toku_ft_set_update(FT_HANDLE ft_handle, ft_update_func update_fun) {
3255     ft_handle->options.update_fun = update_fun;
3256 }
3257 
toku_ft_get_comparator(FT_HANDLE ft_handle)3258 const toku::comparator &toku_ft_get_comparator(FT_HANDLE ft_handle) {
3259     invariant_notnull(ft_handle->ft);
3260     return ft_handle->ft->cmp;
3261 }
3262 
// Callback run while dropping an ft reference: unlink the closing handle
// (passed through extra) from the ft's live-handle list.
static void
ft_remove_handle_ref_callback(FT UU(ft), void *extra) {
    FT_HANDLE CAST_FROM_VOIDP(handle, extra);
    toku_list_remove(&handle->live_ft_handle_link);
}
3268 
ft_handle_close(FT_HANDLE ft_handle,bool oplsn_valid,LSN oplsn)3269 static void ft_handle_close(FT_HANDLE ft_handle, bool oplsn_valid, LSN oplsn) {
3270     FT ft = ft_handle->ft;
3271     // There are error paths in the ft_handle_open that end with ft_handle->ft == nullptr.
3272     if (ft != nullptr) {
3273         toku_ft_remove_reference(ft, oplsn_valid, oplsn, ft_remove_handle_ref_callback, ft_handle);
3274     }
3275     toku_free(ft_handle);
3276 }
3277 
3278 // close an ft handle during normal operation. the underlying ft may or may not close,
3279 // depending if there are still references. an lsn for this close will come from the logger.
toku_ft_handle_close(FT_HANDLE ft_handle)3280 void toku_ft_handle_close(FT_HANDLE ft_handle) {
3281     ft_handle_close(ft_handle, false, ZERO_LSN);
3282 }
3283 
3284 // close an ft handle during recovery. the underlying ft must close, and will use the given lsn.
toku_ft_handle_close_recovery(FT_HANDLE ft_handle,LSN oplsn)3285 void toku_ft_handle_close_recovery(FT_HANDLE ft_handle, LSN oplsn) {
3286     // the ft must exist if closing during recovery. error paths during
3287     // open for recovery should close handles using toku_ft_handle_close()
3288     invariant_notnull(ft_handle->ft);
3289     ft_handle_close(ft_handle, true, oplsn);
3290 }
3291 
3292 // TODO: remove this, callers should instead just use toku_ft_handle_close()
toku_close_ft_handle_nolsn(FT_HANDLE ft_handle,char ** UU (error_string))3293 int toku_close_ft_handle_nolsn(FT_HANDLE ft_handle, char **UU(error_string)) {
3294     toku_ft_handle_close(ft_handle);
3295     return 0;
3296 }
3297 
toku_ft_handle_create(FT_HANDLE * ft_handle_ptr)3298 void toku_ft_handle_create(FT_HANDLE *ft_handle_ptr) {
3299     FT_HANDLE XMALLOC(ft_handle);
3300     memset(ft_handle, 0, sizeof *ft_handle);
3301     toku_list_init(&ft_handle->live_ft_handle_link);
3302     ft_handle->options.flags = 0;
3303     ft_handle->did_set_flags = false;
3304     ft_handle->options.nodesize = FT_DEFAULT_NODE_SIZE;
3305     ft_handle->options.basementnodesize = FT_DEFAULT_BASEMENT_NODE_SIZE;
3306     ft_handle->options.compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
3307     ft_handle->options.fanout = FT_DEFAULT_FANOUT;
3308     ft_handle->options.compare_fun = toku_builtin_compare_fun;
3309     ft_handle->options.update_fun = NULL;
3310     *ft_handle_ptr = ft_handle;
3311 }
3312 
3313 /******************************* search ***************************************/
3314 
3315 // Return true if this key is within the search bound.  If there is no search bound then the tree search continues.
search_continue(ft_search * search,void * key,uint32_t key_len)3316 static bool search_continue(ft_search *search, void *key, uint32_t key_len) {
3317     bool result = true;
3318     if (search->direction == FT_SEARCH_LEFT && search->k_bound) {
3319         FT_HANDLE CAST_FROM_VOIDP(ft_handle, search->context);
3320         DBT this_key = { .data = key, .size = key_len };
3321         // search continues if this key <= key bound
3322         result = (ft_handle->ft->cmp(&this_key, search->k_bound) <= 0);
3323     }
3324     return result;
3325 }
3326 
heaviside_from_search_t(const DBT & kdbt,ft_search & search)3327 static int heaviside_from_search_t(const DBT &kdbt, ft_search &search) {
3328     int cmp = search.compare(search,
3329                               search.k ? &kdbt : 0);
3330     // The search->compare function returns only 0 or 1
3331     switch (search.direction) {
3332     case FT_SEARCH_LEFT:   return cmp==0 ? -1 : +1;
3333     case FT_SEARCH_RIGHT:  return cmp==0 ? +1 : -1; // Because the comparison runs backwards for right searches.
3334     }
3335     abort(); return 0;
3336 }
3337 
3338 // This is a bottom layer of the search functions.
// Effect: Search basement node `bn` for the entry selected by *search,
//   skipping over entries that are deleted from this cursor's point of view,
//   and deliver the winning key/val pair through getf.
// Returns 0 on success, DB_NOTFOUND if nothing visible matches within the
//   search bound, TOKUDB_INTERRUPTED if the cursor's interrupt callback asked
//   us to stop, or an error from the data-buffer search / getf callback.
// On success *doprefetch is set true so prefetching may continue.
static int
ft_search_basement_node(
    BASEMENTNODE bn,
    ft_search *search,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    bool *doprefetch,
    FT_CURSOR ftcursor,
    bool can_bulk_fetch
    )
{
    // Now we have to convert from ft_search to the heaviside function with a direction.  What a pain...

    int direction;
    switch (search->direction) {
    case FT_SEARCH_LEFT:   direction = +1; goto ok;
    case FT_SEARCH_RIGHT:  direction = -1; goto ok;
    }
    return EINVAL;  // This return and the goto are a hack to get both compile-time and run-time checking on enum
ok: ;
    uint32_t idx = 0;
    LEAFENTRY le;
    uint32_t keylen;
    void *key;
    // Locate the first key/leafentry pair in the search direction that
    // satisfies the heaviside function built from *search.
    int r = bn->data_buffer.find<decltype(*search), heaviside_from_search_t>(
        *search,
        direction,
        &le,
        &key,
        &keylen,
        &idx
        );
    if (r!=0) return r;

    if (toku_ft_cursor_is_leaf_mode(ftcursor))
        goto got_a_good_value;        // leaf mode cursors see all leaf entries
    if (le_val_is_del(le, ftcursor->read_type, ftcursor->ttxn)) {
        // Provisionally deleted stuff is gone.
        // So we need to scan in the direction to see if we can find something.
        // Every 64 deleted leaf entries check if the leaf's key is within the search bounds.
        for (uint64_t n_deleted = 1; ; n_deleted++) {
            switch (search->direction) {
            case FT_SEARCH_LEFT:
                idx++;
                // Stop when we run off the end of the basement, or (checked
                // every 64 entries) when the key leaves the search bound.
                if (idx >= bn->data_buffer.num_klpairs() || ((n_deleted % 64) == 0 && !search_continue(search, key, keylen))) {
                    FT_STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted);
                    if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted)) {
                        return TOKUDB_INTERRUPTED;
                    }
                    return DB_NOTFOUND;
                }
                break;
            case FT_SEARCH_RIGHT:
                if (idx == 0) {
                    FT_STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted);
                    if (ftcursor->interrupt_cb && ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted)) {
                        return TOKUDB_INTERRUPTED;
                    }
                    return DB_NOTFOUND;
                }
                idx--;
                break;
            default:
                abort();
            }
            r = bn->data_buffer.fetch_klpair(idx, &le, &keylen, &key);
            assert_zero(r); // we just validated the index
            if (!le_val_is_del(le, ftcursor->read_type, ftcursor->ttxn)) {
                FT_STATUS_INC(FT_CURSOR_SKIP_DELETED_LEAF_ENTRY, n_deleted);
                if (ftcursor->interrupt_cb)
                    ftcursor->interrupt_cb(ftcursor->interrupt_cb_extra, n_deleted);
                goto got_a_good_value;
            }
        }
    }
got_a_good_value:
    {
        uint32_t vallen;
        void *val;

        le_extract_val(le, toku_ft_cursor_is_leaf_mode(ftcursor),
                       ftcursor->read_type, ftcursor->ttxn,
                       &vallen, &val);
        r = toku_ft_cursor_check_restricted_range(ftcursor, key, keylen);
        if (r == 0) {
            r = getf(keylen, key, vallen, val, getf_v, false);
        }
        if (r == 0 || r == TOKUDB_CURSOR_CONTINUE) {
            //
            // IMPORTANT: bulk fetch CANNOT go past the current basement node,
            // because there is no guarantee that messages have been applied
            // to other basement nodes, as part of #5770
            //
            if (r == TOKUDB_CURSOR_CONTINUE && can_bulk_fetch) {
                r = toku_ft_cursor_shortcut(ftcursor, direction, idx, &bn->data_buffer,
                                            getf, getf_v, &keylen, &key, &vallen, &val);
            }

            // Remember the winning key/val in the cursor, unless the cursor
            // is temporary and its position will never be read back.
            toku_destroy_dbt(&ftcursor->key);
            toku_destroy_dbt(&ftcursor->val);
            if (!ftcursor->is_temporary) {
                toku_memdup_dbt(&ftcursor->key, key, keylen);
                toku_memdup_dbt(&ftcursor->val, val, vallen);
            }
            // The search was successful.  Prefetching can continue.
            *doprefetch = true;
        }
    }
    if (r == TOKUDB_CURSOR_CONTINUE) r = 0;
    return r;
}
3450 
// Forward declaration: search within an already-pinned node. Defined below,
// after its helpers.
static int
ft_search_node (
    FT_HANDLE ft_handle,
    FTNODE node,
    ft_search *search,
    int child_to_search,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    bool *doprefetch,
    FT_CURSOR ftcursor,
    UNLOCKERS unlockers,
    ANCESTORS,
    const pivot_bounds &bounds,
    bool can_bulk_fetch
    );
3466 
3467 static int
ftnode_fetch_callback_and_free_bfe(CACHEFILE cf,PAIR p,int fd,BLOCKNUM blocknum,uint32_t fullhash,void ** ftnode_pv,void ** UU (disk_data),PAIR_ATTR * sizep,int * dirtyp,void * extraargs)3468 ftnode_fetch_callback_and_free_bfe(CACHEFILE cf, PAIR p, int fd, BLOCKNUM blocknum, uint32_t fullhash, void **ftnode_pv, void** UU(disk_data), PAIR_ATTR *sizep, int *dirtyp, void *extraargs)
3469 {
3470     int r = toku_ftnode_fetch_callback(cf, p, fd, blocknum, fullhash, ftnode_pv, disk_data, sizep, dirtyp, extraargs);
3471     ftnode_fetch_extra *CAST_FROM_VOIDP(bfe, extraargs);
3472     bfe->destroy();
3473     toku_free(bfe);
3474     return r;
3475 }
3476 
3477 static int
ftnode_pf_callback_and_free_bfe(void * ftnode_pv,void * disk_data,void * read_extraargs,int fd,PAIR_ATTR * sizep)3478 ftnode_pf_callback_and_free_bfe(void *ftnode_pv, void* disk_data, void *read_extraargs, int fd, PAIR_ATTR *sizep)
3479 {
3480     int r = toku_ftnode_pf_callback(ftnode_pv, disk_data, read_extraargs, fd, sizep);
3481     ftnode_fetch_extra *CAST_FROM_VOIDP(bfe, read_extraargs);
3482     bfe->destroy();
3483     toku_free(bfe);
3484     return r;
3485 }
3486 
get_write_callbacks_for_node(FT ft)3487 CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_node(FT ft) {
3488     CACHETABLE_WRITE_CALLBACK wc;
3489     wc.flush_callback = toku_ftnode_flush_callback;
3490     wc.pe_est_callback = toku_ftnode_pe_est_callback;
3491     wc.pe_callback = toku_ftnode_pe_callback;
3492     wc.cleaner_callback = toku_ftnode_cleaner_callback;
3493     wc.clone_callback = toku_ftnode_clone_callback;
3494     wc.checkpoint_complete_callback = toku_ftnode_checkpoint_complete_callback;
3495     wc.write_extraargs = ft;
3496     return wc;
3497 }
3498 
// If prefetching is enabled on this cursor, start an asynchronous fetch of
// the next child of `node` to the right of `childnum`, bounded by the
// cursor's rightmost wanted child.  The heap-allocated bfe is freed by the
// fetch/pf callbacks when a prefetch is started, or here if it was not.
static void
ft_node_maybe_prefetch(FT_HANDLE ft_handle, FTNODE node, int childnum, FT_CURSOR ftcursor, bool *doprefetch) {
    // the number of nodes to prefetch
    const int num_nodes_to_prefetch = 1;

    // if we want to prefetch in the tree
    // then prefetch the next children if there are any
    if (*doprefetch && toku_ft_cursor_prefetching(ftcursor) && !ftcursor->disable_prefetching) {
        int rc = ft_cursor_rightmost_child_wanted(ftcursor, ft_handle, node);
        for (int i = childnum + 1; (i <= childnum + num_nodes_to_prefetch) && (i <= rc); i++) {
            BLOCKNUM nextchildblocknum = BP_BLOCKNUM(node, i);
            uint32_t nextfullhash = compute_child_fullhash(ft_handle->ft->cf, node, i);
            // Ownership of the bfe passes to the *_and_free_bfe callbacks
            // when the prefetch is actually started.
            ftnode_fetch_extra *XCALLOC(bfe);
            bfe->create_for_prefetch(ft_handle->ft, ftcursor);
            bool doing_prefetch = false;
            toku_cachefile_prefetch(
                ft_handle->ft->cf,
                nextchildblocknum,
                nextfullhash,
                get_write_callbacks_for_node(ft_handle->ft),
                ftnode_fetch_callback_and_free_bfe,
                toku_ftnode_pf_req_callback,
                ftnode_pf_callback_and_free_bfe,
                bfe,
                &doing_prefetch
                );
            if (!doing_prefetch) {
                // No fetch was started, so no callback will free the bfe.
                bfe->destroy();
                toku_free(bfe);
            }
            *doprefetch = false;
        }
    }
}
3533 
// Context for unlock_ftnode_fun: identifies the pinned node to release and
// whether messages were applied to it while pinned (which determines the
// pair attr reported back to the cachetable on unpin).
struct unlock_ftnode_extra {
    FT_HANDLE ft_handle;
    FTNODE node;
    bool msgs_applied;
};
3539 
3540 // When this is called, the cachetable lock is held
3541 static void
unlock_ftnode_fun(void * v)3542 unlock_ftnode_fun (void *v) {
3543     struct unlock_ftnode_extra *x = NULL;
3544     CAST_FROM_VOIDP(x, v);
3545     FT_HANDLE ft_handle = x->ft_handle;
3546     FTNODE node = x->node;
3547     // CT lock is held
3548     int r = toku_cachetable_unpin_ct_prelocked_no_flush(
3549         ft_handle->ft->cf,
3550         node->ct_pair,
3551         (enum cachetable_dirty) node->dirty(),
3552         x->msgs_applied ? make_ftnode_pair_attr(node) : make_invalid_pair_attr()
3553         );
3554     assert_zero(r);
3555 }
3556 
3557 /* search in a node's child */
static int
ft_search_child(FT_HANDLE ft_handle, FTNODE node, int childnum, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, bool *doprefetch, FT_CURSOR ftcursor, UNLOCKERS unlockers,
                 ANCESTORS ancestors, const pivot_bounds &bounds, bool can_bulk_fetch)
// Effect: Search in a node's child.  Searches are read-only now (at least as far as the hardcopy is concerned).
// Returns 0/DB_NOTFOUND/etc. from the recursive search, or TOKUDB_TRY_AGAIN
// if the caller must restart the search from the root.
{
    struct ancestors next_ancestors = {node, childnum, ancestors};

    BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
    uint32_t fullhash = compute_child_fullhash(ft_handle->ft->cf, node, childnum);
    FTNODE childnode = nullptr;

    // If the current node's height is greater than 1, then its child is an internal node.
    // Therefore, to warm the cache better (#5798), we want to read all the partitions off disk in one shot.
    bool read_all_partitions = node->height > 1;
    ftnode_fetch_extra bfe;
    bfe.create_for_subset_read(
        ft_handle->ft,
        search,
        &ftcursor->range_lock_left_key,
        &ftcursor->range_lock_right_key,
        ftcursor->left_is_neg_infty,
        ftcursor->right_is_pos_infty,
        ftcursor->disable_prefetching,
        read_all_partitions
        );
    bool msgs_applied = false;
    {
        // Pin the child for this query; this may apply ancestor messages to
        // it (setting msgs_applied) or give up and request a retry.
        int rr = toku_pin_ftnode_for_query(ft_handle, childblocknum, fullhash,
                                         unlockers,
                                         &next_ancestors, bounds,
                                         &bfe,
                                         true,
                                         &childnode,
                                         &msgs_applied);
        if (rr==TOKUDB_TRY_AGAIN) {
            return rr;
        }
        invariant_zero(rr);
    }

    // Chain a new unlocker for the child so deeper levels can release it
    // if they block on I/O.
    struct unlock_ftnode_extra unlock_extra = { ft_handle, childnode, msgs_applied };
    struct unlockers next_unlockers = { true, unlock_ftnode_fun, (void *) &unlock_extra, unlockers };
    int r = ft_search_node(ft_handle, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, ftcursor, &next_unlockers, &next_ancestors, bounds, can_bulk_fetch);
    if (r!=TOKUDB_TRY_AGAIN) {
        // maybe prefetch the next child
        if (r == 0 && node->height == 1) {
            ft_node_maybe_prefetch(ft_handle, node, childnum, ftcursor, doprefetch);
        }

        assert(next_unlockers.locked);
        if (msgs_applied) {
            toku_unpin_ftnode(ft_handle->ft, childnode);
        }
        else {
            toku_unpin_ftnode_read_only(ft_handle->ft, childnode);
        }
    } else {
        // try again.

        // there are two cases where we get TOKUDB_TRY_AGAIN
        //  case 1 is when some later call to toku_pin_ftnode returned
        //  that value and unpinned all the nodes anyway. case 2
        //  is when ft_search_node had to stop its search because
        //  some piece of a node that it needed was not in memory. In this case,
        //  the node was not unpinned, so we unpin it here
        if (next_unlockers.locked) {
            if (msgs_applied) {
                toku_unpin_ftnode(ft_handle->ft, childnode);
            }
            else {
                toku_unpin_ftnode_read_only(ft_handle->ft, childnode);
            }
        }
    }

    return r;
}
3635 
3636 static inline int
search_which_child_cmp_with_bound(const toku::comparator & cmp,FTNODE node,int childnum,ft_search * search,DBT * dbt)3637 search_which_child_cmp_with_bound(const toku::comparator &cmp, FTNODE node, int childnum,
3638                                   ft_search *search, DBT *dbt) {
3639     return cmp(toku_copyref_dbt(dbt, node->pivotkeys.get_pivot(childnum)), &search->pivot_bound);
3640 }
3641 
// Binary-search the node's pivot keys to pick which child *search should
// descend into.  If the search carries a pivot_bound saved from an earlier
// attempt, shift past children that were already covered so a retried search
// makes forward progress.
int
toku_ft_search_which_child(const toku::comparator &cmp, FTNODE node, ft_search *search) {
    if (node->n_children <= 1) return 0;

    DBT pivotkey;
    toku_init_dbt(&pivotkey);
    int lo = 0;
    int hi = node->n_children - 1;
    int mi;
    while (lo < hi) {
        mi = (lo + hi) / 2;
        node->pivotkeys.fill_pivot(mi, &pivotkey);
        // search->compare is really strange, and only works well with a
        // linear search, it makes binary search a pita.
        //
        // if you are searching left to right, it returns
        //   "0" for pivots that are < the target, and
        //   "1" for pivots that are >= the target
        // if you are searching right to left, it's the opposite.
        //
        // so if we're searching from the left and search->compare says
        // "1", we want to go left from here, if it says "0" we want to go
        // right.  searching from the right does the opposite.
        bool c = search->compare(*search, &pivotkey);
        if (((search->direction == FT_SEARCH_LEFT) && c) ||
            ((search->direction == FT_SEARCH_RIGHT) && !c)) {
            hi = mi;
        } else {
            assert(((search->direction == FT_SEARCH_LEFT) && !c) ||
                   ((search->direction == FT_SEARCH_RIGHT) && c));
            lo = mi + 1;
        }
    }
    // ready to return something, if the pivot is bounded, we have to move
    // over a bit to get away from what we've already searched
    if (search->pivot_bound.data != nullptr) {
        if (search->direction == FT_SEARCH_LEFT) {
            while (lo < node->n_children - 1 &&
                   search_which_child_cmp_with_bound(cmp, node, lo, search, &pivotkey) <= 0) {
                // searching left to right, if the comparison says the
                // current pivot (lo) is left of or equal to our bound,
                // don't search that child again
                lo++;
            }
        } else {
            while (lo > 0 &&
                   search_which_child_cmp_with_bound(cmp, node, lo - 1, search, &pivotkey) >= 0) {
                // searching right to left, same argument as just above
                // (but we had to pass lo - 1 because the pivot between lo
                // and the thing just less than it is at that position in
                // the pivot keys array)
                lo--;
            }
        }
    }
    return lo;
}
3699 
3700 static void
maybe_search_save_bound(FTNODE node,int child_searched,ft_search * search)3701 maybe_search_save_bound(
3702     FTNODE node,
3703     int child_searched,
3704     ft_search *search)
3705 {
3706     int p = (search->direction == FT_SEARCH_LEFT) ? child_searched : child_searched - 1;
3707     if (p >= 0 && p < node->n_children-1) {
3708         toku_destroy_dbt(&search->pivot_bound);
3709         toku_clone_dbt(&search->pivot_bound, node->pivotkeys.get_pivot(p));
3710     }
3711 }
3712 
3713 // Returns true if there are still children left to search in this node within the search bound (if any).
search_try_again(FTNODE node,int child_to_search,ft_search * search)3714 static bool search_try_again(FTNODE node, int child_to_search, ft_search *search) {
3715     bool try_again = false;
3716     if (search->direction == FT_SEARCH_LEFT) {
3717         if (child_to_search < node->n_children-1) {
3718             try_again = true;
3719             // if there is a search bound and the bound is within the search pivot then continue the search
3720             if (search->k_bound) {
3721                 FT_HANDLE CAST_FROM_VOIDP(ft_handle, search->context);
3722                 try_again = (ft_handle->ft->cmp(search->k_bound, &search->pivot_bound) > 0);
3723             }
3724         }
3725     } else if (search->direction == FT_SEARCH_RIGHT) {
3726         if (child_to_search > 0)
3727             try_again = true;
3728     }
3729     return try_again;
3730 }
3731 
// Search within `node`, which is already pinned with the partition
// `child_to_search` available.  Internal nodes recurse via ft_search_child;
// leaves search the basement node directly.  On DB_NOTFOUND the pivot we
// stopped at is saved in *search, and TOKUDB_TRY_AGAIN is returned when more
// children remain to be searched (the retry must restart from the root,
// see #5770).
static int
ft_search_node(
    FT_HANDLE ft_handle,
    FTNODE node,
    ft_search *search,
    int child_to_search,
    FT_GET_CALLBACK_FUNCTION getf,
    void *getf_v,
    bool *doprefetch,
    FT_CURSOR ftcursor,
    UNLOCKERS unlockers,
    ANCESTORS ancestors,
    const pivot_bounds &bounds,
    bool can_bulk_fetch
    )
{
    int r = 0;
    // assert that we got a valid child_to_search
    invariant(child_to_search >= 0);
    invariant(child_to_search < node->n_children);
    //
    // At this point, we must have the necessary partition available to continue the search
    //
    assert(BP_STATE(node,child_to_search) == PT_AVAIL);
    const pivot_bounds next_bounds = bounds.next_bounds(node, child_to_search);
    if (node->height > 0) {
        r = ft_search_child(
            ft_handle,
            node,
            child_to_search,
            search,
            getf,
            getf_v,
            doprefetch,
            ftcursor,
            unlockers,
            ancestors,
            next_bounds,
            can_bulk_fetch
            );
    }
    else {
        r = ft_search_basement_node(
            BLB(node, child_to_search),
            search,
            getf,
            getf_v,
            doprefetch,
            ftcursor,
            can_bulk_fetch
            );
    }
    if (r == 0) {
        return r; //Success
    }

    if (r != DB_NOTFOUND) {
        return r; //Error (or message to quit early, such as TOKUDB_FOUND_BUT_REJECTED or TOKUDB_TRY_AGAIN)
    }
    // not really necessary, just put this here so that reading the
    // code becomes simpler. The point is at this point in the code,
    // we know that we got DB_NOTFOUND and we have to continue
    assert(r == DB_NOTFOUND);
    // we have a new pivotkey
    if (node->height == 0) {
        // when we run off the end of a basement, try to lock the range up to the pivot. solves #3529
        const DBT *pivot = search->direction == FT_SEARCH_LEFT ? next_bounds.ubi() : // left -> right
                                                                 next_bounds.lbe();  // right -> left
        if (pivot != nullptr) {
            // lock_only=true: ask the callback to lock up to the pivot
            // without delivering a row.
            int rr = getf(pivot->size, pivot->data, 0, nullptr, getf_v, true);
            if (rr != 0) {
                return rr; // lock was not granted
            }
        }
    }

    // If we got a DB_NOTFOUND then we have to search the next record.        Possibly everything present is not visible.
    // This way of doing DB_NOTFOUND is a kludge, and ought to be simplified.  Something like this is needed for DB_NEXT, but
    //        for point queries, it's overkill.  If we got a DB_NOTFOUND on a point query then we should just stop looking.
    // When releasing locks on I/O we must not search the same subtree again, or we won't be guaranteed to make forward progress.
    // If we got a DB_NOTFOUND, then the pivot is too small if searching from left to right (too large if searching from right to left).
    // So save the pivot key in the search object.
    maybe_search_save_bound(node, child_to_search, search);

    // as part of #5770, if we can continue searching,
    // we MUST return TOKUDB_TRY_AGAIN,
    // because there is no guarantee that messages have been applied
    // on any other path.
    if (search_try_again(node, child_to_search, search)) {
        r = TOKUDB_TRY_AGAIN;
    }

    return r;
}
3826 
int toku_ft_search(FT_HANDLE ft_handle, ft_search *search, FT_GET_CALLBACK_FUNCTION getf, void *getf_v, FT_CURSOR ftcursor, bool can_bulk_fetch)
// Effect: Perform a search.  Associate cursor with a leaf if possible.
// All searches are performed through this function.
// Retries from the root whenever the descent returns TOKUDB_TRY_AGAIN,
// counting retries for thrashing statistics.
{
    int r;
    uint trycount = 0;     // How many tries did it take to get the result?
    FT ft = ft_handle->ft;

    toku::context search_ctx(CTX_SEARCH);

try_again:

    trycount++;

    //
    // Here is how searches work
    // At a high level, we descend down the tree, using the search parameter
    // to guide us towards where to look. But the search parameter is not
    // used here to determine which child of a node to read (regardless
    // of whether that child is another node or a basement node)
    // The search parameter is used while we are pinning the node into
    // memory, because that is when the system needs to ensure that
    // the appropriate partition of the child we are using is in memory.
    // So, here are the steps for a search (and this applies to this function
    // as well as ft_search_child:
    //  - Take the search parameter, and create a ftnode_fetch_extra, that will be used by toku_pin_ftnode
    //  - Call toku_pin_ftnode with the bfe as the extra for the fetch callback (in case the node is not at all in memory)
    //       and the partial fetch callback (in case the node is perhaps partially in memory) to the fetch the node
    //  - This eventually calls either toku_ftnode_fetch_callback or  toku_ftnode_pf_req_callback depending on whether the node is in
    //     memory at all or not.
    //  - Within these functions, the "ft_search search" parameter is used to evaluate which child the search is interested in.
    //     If the node is not in memory at all, toku_ftnode_fetch_callback will read the node and decompress only the partition for the
    //     relevant child, be it a message buffer or basement node. If the node is in memory, then toku_ftnode_pf_req_callback
    //     will tell the cachetable that a partial fetch is required if and only if the relevant child is not in memory. If the relevant child
    //     is not in memory, then toku_ftnode_pf_callback is called to fetch the partition.
    //  - These functions set bfe->child_to_read so that the search code does not need to reevaluate it.
    //  - Just to reiterate, all of the last item happens within toku_ftnode_pin(_holding_lock)
    //  - At this point, toku_ftnode_pin_holding_lock has returned, with bfe.child_to_read set,
    //  - ft_search_node is called, assuming that the node and its relevant partition are in memory.
    //
    ftnode_fetch_extra bfe;
    bfe.create_for_subset_read(
        ft,
        search,
        &ftcursor->range_lock_left_key,
        &ftcursor->range_lock_right_key,
        ftcursor->left_is_neg_infty,
        ftcursor->right_is_pos_infty,
        ftcursor->disable_prefetching,
        true // We may as well always read the whole root into memory, if it's a leaf node it's a tiny tree anyway.
        );
    FTNODE node = NULL;
    {
        uint32_t fullhash;
        CACHEKEY root_key;
        toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
        toku_pin_ftnode(
            ft,
            root_key,
            fullhash,
            &bfe,
            PL_READ, // may_modify_node set to false, because root cannot change during search
            &node,
            true
            );
    }

    uint tree_height = node->height + 1;  // How high is the tree?  This is the height of the root node plus one (leaf is at height 0).


    // The root's unlocker anchors the unlocker chain built up during descent.
    struct unlock_ftnode_extra unlock_extra   = {ft_handle,node,false};
    struct unlockers                unlockers      = {true, unlock_ftnode_fun, (void*)&unlock_extra, (UNLOCKERS)NULL};

    {
        bool doprefetch = false;
        //static int counter = 0;         counter++;
        r = ft_search_node(ft_handle, node, search, bfe.child_to_read, getf, getf_v, &doprefetch, ftcursor, &unlockers, (ANCESTORS)NULL, pivot_bounds::infinite_bounds(), can_bulk_fetch);
        if (r==TOKUDB_TRY_AGAIN) {
            // there are two cases where we get TOKUDB_TRY_AGAIN
            //  case 1 is when some later call to toku_pin_ftnode returned
            //  that value and unpinned all the nodes anyway. case 2
            //  is when ft_search_node had to stop its search because
            //  some piece of a node that it needed was not in memory.
            //  In this case, the node was not unpinned, so we unpin it here
            if (unlockers.locked) {
                toku_unpin_ftnode_read_only(ft_handle->ft, node);
            }
            goto try_again;
        } else {
            assert(unlockers.locked);
        }
    }

    assert(unlockers.locked);
    toku_unpin_ftnode_read_only(ft_handle->ft, node);


    //Heaviside function (+direction) queries define only a lower or upper
    //bound.  Some queries require both an upper and lower bound.
    //They do this by wrapping the FT_GET_CALLBACK_FUNCTION with another
    //test that checks for the other bound.  If the other bound fails,
    //it returns TOKUDB_FOUND_BUT_REJECTED which means not found, but
    //stop searching immediately, as opposed to DB_NOTFOUND
    //which can mean not found, but keep looking in another leaf.
    if (r==TOKUDB_FOUND_BUT_REJECTED) r = DB_NOTFOUND;
    else if (r==DB_NOTFOUND) {
        //We truly did not find an answer to the query.
        //Therefore, the FT_GET_CALLBACK_FUNCTION has NOT been called.
        //The contract specifies that the callback function must be called
        //for 'r= (0|DB_NOTFOUND|TOKUDB_FOUND_BUT_REJECTED)'
        //TODO: #1378 This is not the ultimate location of this call to the
        //callback.  It is surely wrong for node-level locking, and probably
        //wrong for the STRADDLE callback for heaviside function(two sets of key/vals)
        int r2 = getf(0,NULL, 0,NULL, getf_v, false);
        if (r2!=0) r = r2;
    }
    {   // accounting (to detect and measure thrashing)
        uint retrycount = trycount - 1;         // how many retries were needed?
        if (retrycount) {
            FT_STATUS_INC(FT_TOTAL_RETRIES, retrycount);
        }
        if (retrycount > tree_height) {         // if at least one node was read from disk more than once
            FT_STATUS_INC(FT_SEARCH_TRIES_GT_HEIGHT, 1);
            if (retrycount > (tree_height+3))
                FT_STATUS_INC(FT_SEARCH_TRIES_GT_HEIGHTPLUS3, 1);
        }
    }
    return r;
}
3956 
3957 /* ********************************* delete **************************************/
// A FT_GET_CALLBACK_FUNCTION that discards the pair and always succeeds;
// used when only the existence of a row matters (e.g. the DB_CURRENT check
// in toku_ft_cursor_delete).
static int
getf_nothing (uint32_t UU(keylen), const void *UU(key), uint32_t UU(vallen), const void *UU(val), void *UU(pair_v), bool UU(lock_only)) {
    return 0;
}
3962 
toku_ft_cursor_delete(FT_CURSOR cursor,int flags,TOKUTXN txn)3963 int toku_ft_cursor_delete(FT_CURSOR cursor, int flags, TOKUTXN txn) {
3964     int r;
3965 
3966     int unchecked_flags = flags;
3967     bool error_if_missing = (bool) !(flags&DB_DELETE_ANY);
3968     unchecked_flags &= ~DB_DELETE_ANY;
3969     if (unchecked_flags!=0) r = EINVAL;
3970     else if (toku_ft_cursor_not_set(cursor)) r = EINVAL;
3971     else {
3972         r = 0;
3973         if (error_if_missing) {
3974             r = toku_ft_cursor_current(cursor, DB_CURRENT, getf_nothing, NULL);
3975         }
3976         if (r == 0) {
3977             toku_ft_delete(cursor->ft_handle, &cursor->key, txn);
3978         }
3979     }
3980     return r;
3981 }
3982 
3983 /* ********************* keyrange ************************ */
3984 
// Context for comparing keys stored in a basement node against one query
// key using the tree's comparator (see keyrange_compare).
struct keyrange_compare_s {
    FT ft;           // tree whose comparator (ft->cmp) performs the comparison
    const DBT *key;  // query key; right-hand side of every comparison
};
3989 
3990 // TODO: Remove me, I'm boring
keyrange_compare(DBT const & kdbt,const struct keyrange_compare_s & s)3991 static int keyrange_compare(DBT const &kdbt,
3992                             const struct keyrange_compare_s &s) {
3993     return s.ft->cmp(&kdbt, s.key);
3994 }
3995 
// Count the keys in one leaf partition relative to [key_left, key_right],
// filling in the five buckets less/equal_left/middle/equal_right/greater.
// If the partition is in main memory then estimate the number
// Treat key_left == NULL as negative infinity
// Treat key_right == NULL as positive infinity
// *single_basement_node is set true iff both keys land in the same basement
// node; in that case the counts are exact rather than estimates.
static void keysrange_in_leaf_partition(FT_HANDLE ft_handle,
                                        FTNODE node,
                                        DBT *key_left,
                                        DBT *key_right,
                                        int left_child_number,
                                        int right_child_number,
                                        uint64_t estimated_num_rows,
                                        uint64_t *less,
                                        uint64_t *equal_left,
                                        uint64_t *middle,
                                        uint64_t *equal_right,
                                        uint64_t *greater,
                                        bool *single_basement_node)
{
    paranoid_invariant(node->height == 0);  // we are in a leaf
    paranoid_invariant(!(key_left == NULL && key_right != NULL));
    paranoid_invariant(left_child_number <= right_child_number);
    bool single_basement = left_child_number == right_child_number;
    // If both keys fall in one basement node it must be in memory (the
    // caller's match_bfe fetches it in exactly that case).
    paranoid_invariant(!single_basement ||
                       (BP_STATE(node, left_child_number) == PT_AVAIL));
    if (BP_STATE(node, left_child_number) == PT_AVAIL) {
        int r;
        // The partition is in main memory then get an exact count.
        struct keyrange_compare_s s_left = {ft_handle->ft, key_left};
        BASEMENTNODE bn = BLB(node, left_child_number);
        uint32_t idx_left = 0;
        // if key_left is NULL then set r==-1 and idx==0.
        r = key_left
                ? bn->data_buffer.find_zero<decltype(s_left), keyrange_compare>(
                      s_left, nullptr, nullptr, nullptr, &idx_left)
                : -1;
        *less = idx_left;
        *equal_left = (r == 0) ? 1 : 0;  // r==0 means key_left was present

        uint32_t size = bn->data_buffer.num_klpairs();
        uint32_t idx_right = size;
        r = -1;
        // Only look up key_right when it lives in this same basement node;
        // otherwise everything from idx_left to the end counts as "middle".
        if (single_basement && key_right) {
            struct keyrange_compare_s s_right = {ft_handle->ft, key_right};
            r = bn->data_buffer.find_zero<decltype(s_right), keyrange_compare>(
                s_right, nullptr, nullptr, nullptr, &idx_right);
        }
        *middle = idx_right - idx_left - *equal_left;
        *equal_right = (r == 0) ? 1 : 0;
        *greater = size - idx_right - *equal_right;
    } else {
        // Partition not in memory: estimate by assuming key_left splits the
        // partition in half.
        paranoid_invariant(!single_basement);
        uint32_t idx_left = estimated_num_rows / 2;
        if (!key_left) {
            // Both nullptr, assume key_left belongs before leftmost entry,
            // key_right belongs after rightmost entry
            idx_left = 0;
            paranoid_invariant(!key_right);
        }
        // Assume idx_left and idx_right point to where key_left and key_right
        // belong, (but are not there).
        *less = idx_left;
        *equal_left = 0;
        *middle = estimated_num_rows - idx_left;
        *equal_right = 0;
        *greater = 0;
    }
    *single_basement_node = single_basement;
}
4063 
// Recursive worker for toku_ft_keysrange: descend toward key_left, counting
// keys relative to [key_left, key_right] along the way.
// Returns 0, or TOKUDB_TRY_AGAIN if a child pin had to release locks, in
// which case the caller restarts from the root.
static int toku_ft_keysrange_internal(
    FT_HANDLE ft_handle,
    FTNODE node,
    DBT *key_left,
    DBT *key_right,
    bool may_find_right,
    uint64_t *less,
    uint64_t *equal_left,
    uint64_t *middle,
    uint64_t *equal_right,
    uint64_t *greater,
    bool *single_basement_node,
    uint64_t estimated_num_rows,
    ftnode_fetch_extra *min_bfe,  // set up to read a minimal read.
    ftnode_fetch_extra
        *match_bfe,  // set up to read a basement node iff both keys in it
    struct unlockers *unlockers,
    ANCESTORS ancestors,
    const pivot_bounds &bounds)
// Implementation note: Assign values to less, equal, and greater, and then on
// the way out (returning up the stack) we add more values in.
{
    int r = 0;
    // if KEY is NULL then use the leftmost key.
    int left_child_number =
        key_left ? toku_ftnode_which_child(node, key_left, ft_handle->ft->cmp)
                 : 0;
    int right_child_number =
        node->n_children;  // Sentinel that does not equal left_child_number.
    if (may_find_right) {
        right_child_number =
            key_right
                ? toku_ftnode_which_child(node, key_right, ft_handle->ft->cmp)
                : node->n_children - 1;
    }

    // Estimate: assume each child holds an equal share of this subtree's rows.
    uint64_t rows_per_child = estimated_num_rows / node->n_children;
    if (node->height == 0) {
        keysrange_in_leaf_partition(ft_handle,
                                    node,
                                    key_left,
                                    key_right,
                                    left_child_number,
                                    right_child_number,
                                    rows_per_child,
                                    less,
                                    equal_left,
                                    middle,
                                    equal_right,
                                    greater,
                                    single_basement_node);

        // Account for the sibling partitions left and right of the one we
        // descended into.
        *less += rows_per_child * left_child_number;
        if (*single_basement_node) {
            *greater +=
                rows_per_child * (node->n_children - left_child_number - 1);
        } else {
            // Without an exact right bound, rows right of key_left's
            // partition are lumped into "middle".
            *middle +=
                rows_per_child * (node->n_children - left_child_number - 1);
        }
    } else {
        // do the child.
        struct ancestors next_ancestors = {node, left_child_number, ancestors};
        BLOCKNUM childblocknum = BP_BLOCKNUM(node, left_child_number);
        uint32_t fullhash =
            compute_child_fullhash(ft_handle->ft->cf, node, left_child_number);
        FTNODE childnode;
        bool msgs_applied = false;
        // key_right can still be found below only while both keys funnel
        // into the same child.
        bool child_may_find_right =
            may_find_right && left_child_number == right_child_number;
        r = toku_pin_ftnode_for_query(
            ft_handle,
            childblocknum,
            fullhash,
            unlockers,
            &next_ancestors,
            bounds,
            child_may_find_right ? match_bfe : min_bfe,
            false,
            &childnode,
            &msgs_applied);
        paranoid_invariant(!msgs_applied);
        if (r != TOKUDB_TRY_AGAIN) {
            assert_zero(r);

            // Chain the pinned child onto the unlockers list so it is
            // released if a deeper pin has to give up and retry.
            struct unlock_ftnode_extra unlock_extra = {
                ft_handle, childnode, false};
            struct unlockers next_unlockers = {
                true, unlock_ftnode_fun, (void *)&unlock_extra, unlockers};
            const pivot_bounds next_bounds =
                bounds.next_bounds(node, left_child_number);

            r = toku_ft_keysrange_internal(ft_handle,
                                           childnode,
                                           key_left,
                                           key_right,
                                           child_may_find_right,
                                           less,
                                           equal_left,
                                           middle,
                                           equal_right,
                                           greater,
                                           single_basement_node,
                                           rows_per_child,
                                           min_bfe,
                                           match_bfe,
                                           &next_unlockers,
                                           &next_ancestors,
                                           next_bounds);
            if (r != TOKUDB_TRY_AGAIN) {
                assert_zero(r);

                // On the way back up, add the estimated contribution of the
                // siblings we did not visit.
                *less += rows_per_child * left_child_number;
                if (*single_basement_node) {
                    *greater += rows_per_child *
                                (node->n_children - left_child_number - 1);
                } else {
                    *middle += rows_per_child *
                               (node->n_children - left_child_number - 1);
                }

                assert(unlockers->locked);
                toku_unpin_ftnode_read_only(ft_handle->ft, childnode);
            }
        }
    }
    return r;
}
4192 
void toku_ft_keysrange(FT_HANDLE ft_handle,
                       DBT *key_left,
                       DBT *key_right,
                       uint64_t *less_p,
                       uint64_t *equal_left_p,
                       uint64_t *middle_p,
                       uint64_t *equal_right_p,
                       uint64_t *greater_p,
                       bool *middle_3_exact_p)
// Effect: Return an estimate  of the number of keys to the left, the number
// equal (to left key), number between keys, number equal to right key, and the
// number to the right of both keys.
//   The values are an estimate.
//   If you perform a keyrange on two keys that are in the same basement,
//   equal_less, middle, and equal_right will be exact.
//   4184: What to do with a NULL key?
//   key_left==NULL is treated as -infinity
//   key_right==NULL is treated as +infinity
//   If KEY is NULL then the system picks an arbitrary key and returns it.
//   key_right can be non-null only if key_left is non-null;
{
    if (!key_left && key_right) {
        // Simplify internals by only supporting key_right != null when key_left
        // != null
        // If key_right != null and key_left == null, then swap them and fix up
        // numbers.
        uint64_t less = 0, equal_left = 0, middle = 0, equal_right = 0,
                 greater = 0;
        toku_ft_keysrange(ft_handle,
                          key_right,
                          nullptr,
                          &less,
                          &equal_left,
                          &middle,
                          &equal_right,
                          &greater,
                          middle_3_exact_p);
        // Shift buckets: "less than key_right" becomes the middle bucket of
        // the range (-inf, key_right).
        *less_p = 0;
        *equal_left_p = 0;
        *middle_p = less;
        *equal_right_p = equal_left;
        *greater_p = middle;
        invariant_zero(equal_right);
        invariant_zero(greater);
        return;
    }
    paranoid_invariant(!(!key_left && key_right));
    ftnode_fetch_extra min_bfe;
    ftnode_fetch_extra match_bfe;
    min_bfe.create_for_min_read(
        ft_handle->ft);  // read pivot keys but not message buffers
    match_bfe.create_for_keymatch(
        ft_handle->ft,
        key_left,
        key_right,
        false,
        false);  // read basement node only if both keys in it.
// Restart point: any TOKUDB_TRY_AGAIN below means locks were released and
// the whole query must be redone from the root.
try_again : {
    uint64_t less = 0, equal_left = 0, middle = 0, equal_right = 0, greater = 0;
    bool single_basement_node = false;
    FTNODE node = NULL;
    {
        uint32_t fullhash;
        CACHEKEY root_key;
        toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &fullhash);
        toku_pin_ftnode(
            ft_handle->ft,
            root_key,
            fullhash,
            &match_bfe,
            PL_READ,  // may_modify_node, cannot change root during keyrange
            &node,
            true);
    }

    struct unlock_ftnode_extra unlock_extra = {ft_handle, node, false};
    struct unlockers unlockers = {
        true, unlock_ftnode_fun, (void *)&unlock_extra, (UNLOCKERS)NULL};

    {
        int r;
        int64_t numrows = ft_handle->ft->in_memory_logical_rows;
        if (numrows < 0)
            numrows = 0;  // prevent appearance of a negative number
        // First query: descend toward key_left, counting both sides.
        r = toku_ft_keysrange_internal(ft_handle,
                                       node,
                                       key_left,
                                       key_right,
                                       true,
                                       &less,
                                       &equal_left,
                                       &middle,
                                       &equal_right,
                                       &greater,
                                       &single_basement_node,
                                       numrows,
                                       &min_bfe,
                                       &match_bfe,
                                       &unlockers,
                                       (ANCESTORS)NULL,
                                       pivot_bounds::infinite_bounds());
        assert(r == 0 || r == TOKUDB_TRY_AGAIN);
        if (r == TOKUDB_TRY_AGAIN) {
            assert(!unlockers.locked);
            goto try_again;
        }
        // May need to do a second query.
        if (!single_basement_node && key_right != nullptr) {
            // "greater" is stored in "middle"
            invariant_zero(equal_right);
            invariant_zero(greater);
            uint64_t less2 = 0, equal_left2 = 0, middle2 = 0, equal_right2 = 0,
                     greater2 = 0;
            bool ignore;
            // Second query: descend toward key_right alone to split the
            // combined "middle" bucket at key_right.
            r = toku_ft_keysrange_internal(ft_handle,
                                           node,
                                           key_right,
                                           nullptr,
                                           false,
                                           &less2,
                                           &equal_left2,
                                           &middle2,
                                           &equal_right2,
                                           &greater2,
                                           &ignore,
                                           numrows,
                                           &min_bfe,
                                           &match_bfe,
                                           &unlockers,
                                           (ANCESTORS) nullptr,
                                           pivot_bounds::infinite_bounds());
            assert(r == 0 || r == TOKUDB_TRY_AGAIN);
            if (r == TOKUDB_TRY_AGAIN) {
                assert(!unlockers.locked);
                goto try_again;
            }
            invariant_zero(equal_right2);
            invariant_zero(greater2);
            // Update numbers.
            // less is already correct.
            // equal_left is already correct.

            // "middle" currently holds everything greater than left_key in
            // first query
            // 'middle2' currently holds everything greater than right_key in
            // second query
            // 'equal_left2' is how many match right_key

            // Prevent underflow.
            if (middle >= equal_left2 + middle2) {
                middle -= equal_left2 + middle2;
            } else {
                middle = 0;
            }
            equal_right = equal_left2;
            greater = middle2;
        }
    }
    assert(unlockers.locked);
    toku_unpin_ftnode_read_only(ft_handle->ft, node);
    // With an open endpoint the corresponding buckets must be empty.
    if (!key_right) {
        paranoid_invariant_zero(equal_right);
        paranoid_invariant_zero(greater);
    }
    if (!key_left) {
        paranoid_invariant_zero(less);
        paranoid_invariant_zero(equal_left);
    }
    *less_p = less;
    *equal_left_p = equal_left;
    *middle_p = middle;
    *equal_right_p = equal_right;
    *greater_p = greater;
    // Counts are exact only when both keys fell in one basement node.
    *middle_3_exact_p = single_basement_node;
}
}
4369 
// State threaded through get_key_after_bytes_iterate while scanning a
// basement node for the first key past skip_len bytes.
struct get_key_after_bytes_iterate_extra {
    uint64_t skip_len;   // target number of bytes to skip
    uint64_t *skipped;   // running total of bytes skipped so far
    void (*callback)(const DBT *, uint64_t, void *);  // invoked on the found key
    void *cb_extra;      // opaque argument forwarded to callback
};
4376 
get_key_after_bytes_iterate(const void * key,const uint32_t keylen,const LEAFENTRY & le,const uint32_t UU (idx),struct get_key_after_bytes_iterate_extra * const e)4377 static int get_key_after_bytes_iterate(const void* key, const uint32_t keylen, const LEAFENTRY & le, const uint32_t UU(idx), struct get_key_after_bytes_iterate_extra * const e) {
4378     // only checking the latest val, mvcc will make this inaccurate
4379     uint64_t pairlen = keylen + le_latest_vallen(le);
4380     if (*e->skipped + pairlen > e->skip_len) {
4381         // found our key!
4382         DBT end_key;
4383         toku_fill_dbt(&end_key, key, keylen);
4384         e->callback(&end_key, *e->skipped, e->cb_extra);
4385         return 1;
4386     } else {
4387         *e->skipped += pairlen;
4388         return 0;
4389     }
4390 }
4391 
get_key_after_bytes_in_basementnode(FT ft,BASEMENTNODE bn,const DBT * start_key,uint64_t skip_len,void (* callback)(const DBT *,uint64_t,void *),void * cb_extra,uint64_t * skipped)4392 static int get_key_after_bytes_in_basementnode(FT ft, BASEMENTNODE bn, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
4393     int r;
4394     uint32_t idx_left = 0;
4395     if (start_key != nullptr) {
4396         struct keyrange_compare_s cmp = {ft, start_key};
4397         r = bn->data_buffer.find_zero<decltype(cmp), keyrange_compare>(cmp, nullptr, nullptr, nullptr, &idx_left);
4398         assert(r == 0 || r == DB_NOTFOUND);
4399     }
4400     struct get_key_after_bytes_iterate_extra iter_extra = {skip_len, skipped, callback, cb_extra};
4401     r = bn->data_buffer.iterate_on_range<get_key_after_bytes_iterate_extra, get_key_after_bytes_iterate>(idx_left, bn->data_buffer.num_klpairs(), &iter_extra);
4402 
4403     // Invert the sense of r == 0 (meaning the iterate finished, which means we didn't find what we wanted)
4404     if (r == 1) {
4405         r = 0;
4406     } else {
4407         r = DB_NOTFOUND;
4408     }
4409     return r;
4410 }
4411 
4412 static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, ftnode_fetch_extra *bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped);
4413 
// Pin the given child of node and recurse into it.  Returns TOKUDB_TRY_AGAIN
// if the pin had to release locks; otherwise forwards the result of
// get_key_after_bytes_in_subtree, which unpins the child before returning.
static int get_key_after_bytes_in_child(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, ftnode_fetch_extra *bfe, ft_search *search, int childnum, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
    int r;
    struct ancestors next_ancestors = {node, childnum, ancestors};
    BLOCKNUM childblocknum = BP_BLOCKNUM(node, childnum);
    uint32_t fullhash = compute_child_fullhash(ft->cf, node, childnum);
    FTNODE child;
    bool msgs_applied = false;
    // Read-only query pin; ancestor messages are never applied on this path.
    r = toku_pin_ftnode_for_query(ft_h, childblocknum, fullhash, unlockers, &next_ancestors, bounds, bfe, false, &child, &msgs_applied);
    paranoid_invariant(!msgs_applied);
    if (r == TOKUDB_TRY_AGAIN) {
        return r;
    }
    assert_zero(r);
    // Chain the pinned child onto the unlockers list so it is released if a
    // deeper pin has to give up and retry.
    struct unlock_ftnode_extra unlock_extra = {ft_h, child, false};
    struct unlockers next_unlockers = {true, unlock_ftnode_fun, (void *) &unlock_extra, unlockers};
    const pivot_bounds next_bounds = bounds.next_bounds(node, childnum);
    return get_key_after_bytes_in_subtree(ft_h, ft, child, &next_unlockers, &next_ancestors, next_bounds, bfe, search, subtree_bytes, start_key, skip_len, callback, cb_extra, skipped);
}
4432 
// Walk the subtree rooted at node looking for the first key past skip_len
// bytes, mixing exact counts (available basement nodes) with estimates
// (each child assumed to hold an equal share of subtree_bytes).  Unpins
// node before returning unless the result is TOKUDB_TRY_AGAIN.
static int get_key_after_bytes_in_subtree(FT_HANDLE ft_h, FT ft, FTNODE node, UNLOCKERS unlockers, ANCESTORS ancestors, const pivot_bounds &bounds, ftnode_fetch_extra *bfe, ft_search *search, uint64_t subtree_bytes, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *, uint64_t, void *), void *cb_extra, uint64_t *skipped) {
    int r;
    int childnum = toku_ft_search_which_child(ft->cmp, node, search);
    const uint64_t child_subtree_bytes = subtree_bytes / node->n_children;
    if (node->height == 0) {
        r = DB_NOTFOUND;
        for (int i = childnum; r == DB_NOTFOUND && i < node->n_children; ++i) {
            // The theory here is that a leaf node could only be very
            // unbalanced if it's dirty, which means all its basements are
            // available.  So if a basement node is available, we should
            // check it as carefully as possible, but if it's compressed
            // or on disk, then it should be fairly well balanced so we
            // can trust the fanout calculation.
            if (BP_STATE(node, i) == PT_AVAIL) {
                // start_key only applies to the basement node it lands in.
                r = get_key_after_bytes_in_basementnode(ft, BLB(node, i), (i == childnum) ? start_key : nullptr, skip_len, callback, cb_extra, skipped);
            } else {
                *skipped += child_subtree_bytes;
                if (*skipped >= skip_len && i < node->n_children - 1) {
                    // Skipped enough; report the pivot as the boundary key.
                    DBT pivot;
                    callback(node->pivotkeys.fill_pivot(i, &pivot), *skipped, cb_extra);
                    r = 0;
                }
                // Otherwise, r is still DB_NOTFOUND.  If this is the last
                // basement node, we'll return DB_NOTFOUND and that's ok.
                // Some ancestor in the call stack will check the next
                // node over and that will call the callback, or if no
                // such node exists, we're at the max key and we should
                // return DB_NOTFOUND up to the top.
            }
        }
    } else {
        // Descend into the child containing start_key, then sweep right
        // through the remaining children until enough bytes are skipped.
        r = get_key_after_bytes_in_child(ft_h, ft, node, unlockers, ancestors, bounds, bfe, search, childnum, child_subtree_bytes, start_key, skip_len, callback, cb_extra, skipped);
        for (int i = childnum + 1; r == DB_NOTFOUND && i < node->n_children; ++i) {
            if (*skipped + child_subtree_bytes < skip_len) {
                // The whole child fits under the target; skip it wholesale.
                *skipped += child_subtree_bytes;
            } else {
                r = get_key_after_bytes_in_child(ft_h, ft, node, unlockers, ancestors, bounds, bfe, search, i, child_subtree_bytes, nullptr, skip_len, callback, cb_extra, skipped);
            }
        }
    }

    if (r != TOKUDB_TRY_AGAIN) {
        // Done with this node (found, or NOTFOUND); release our pin.
        assert(unlockers->locked);
        toku_unpin_ftnode_read_only(ft, node);
        unlockers->locked = false;
    }
    return r;
}
4481 
int toku_ft_get_key_after_bytes(FT_HANDLE ft_h, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *end_key, uint64_t actually_skipped, void *extra), void *cb_extra)
// Effect:
//  Call callback with end_key set to the largest key such that the sum of the sizes of the key/val pairs in the range [start_key, end_key) is <= skip_len.
//  Call callback with actually_skipped set to the sum of the sizes of the key/val pairs in the range [start_key, end_key).
// Notes:
//  start_key == nullptr is interpreted as negative infinity.
//  end_key == nullptr is interpreted as positive infinity.
//  Only the latest val is counted toward the size, in the case of MVCC data.
// Implementation:
//  This is an estimated calculation.  We assume for a node that each of its subtrees have equal size.  If the tree is a single basement node, then we will be accurate, but otherwise we could be quite off.
// Returns:
//  0 on success
//  an error code otherwise
{
    FT ft = ft_h->ft;
    ftnode_fetch_extra bfe;
    bfe.create_for_min_read(ft);
    // Retry loop: a TOKUDB_TRY_AGAIN from deep in the tree means locks were
    // released, so restart the whole query from the root.
    while (true) {
        FTNODE root;
        {
            uint32_t fullhash;
            CACHEKEY root_key;
            toku_calculate_root_offset_pointer(ft, &root_key, &fullhash);
            toku_pin_ftnode(ft, root_key, fullhash, &bfe, PL_READ, &root, true);
        }
        struct unlock_ftnode_extra unlock_extra = {ft_h, root, false};
        struct unlockers unlockers = {true, unlock_ftnode_fun, (void*)&unlock_extra, (UNLOCKERS) nullptr};
        ft_search search;
        // Position the search at start_key, or at the leftmost key when
        // start_key is null.
        ft_search_init(&search, (start_key == nullptr ? toku_ft_cursor_compare_one : toku_ft_cursor_compare_set_range), FT_SEARCH_LEFT, start_key, nullptr, ft_h);

        int r;
        // We can't do this because of #5768, there may be dictionaries in the wild that have negative stats.  This won't affect mongo so it's ok:
        //paranoid_invariant(ft->in_memory_stats.numbytes >= 0);
        int64_t numbytes = ft->in_memory_stats.numbytes;
        if (numbytes < 0) {
            numbytes = 0;
        }
        uint64_t skipped = 0;
        r = get_key_after_bytes_in_subtree(ft_h, ft, root, &unlockers, nullptr, pivot_bounds::infinite_bounds(), &bfe, &search, (uint64_t) numbytes, start_key, skip_len, callback, cb_extra, &skipped);
        assert(!unlockers.locked);
        if (r != TOKUDB_TRY_AGAIN) {
            if (r == DB_NOTFOUND) {
                // Ran off the end of the dictionary: end_key is +infinity.
                callback(nullptr, skipped, cb_extra);
                r = 0;
            }
            return r;
        }
    }
}
4531 
4532 //Test-only wrapper for the old one-key range function
toku_ft_keyrange(FT_HANDLE ft_handle,DBT * key,uint64_t * less,uint64_t * equal,uint64_t * greater)4533 void toku_ft_keyrange(FT_HANDLE ft_handle, DBT *key, uint64_t *less,  uint64_t *equal,  uint64_t *greater) {
4534     uint64_t zero_equal_right, zero_greater;
4535     bool ignore;
4536     toku_ft_keysrange(ft_handle, key, nullptr, less, equal, greater, &zero_equal_right, &zero_greater, &ignore);
4537     invariant_zero(zero_equal_right);
4538     invariant_zero(zero_greater);
4539 }
4540 
toku_ft_handle_stat64(FT_HANDLE ft_handle,TOKUTXN UU (txn),struct ftstat64_s * s)4541 void toku_ft_handle_stat64 (FT_HANDLE ft_handle, TOKUTXN UU(txn), struct ftstat64_s *s) {
4542     toku_ft_stat64(ft_handle->ft, s);
4543 }
4544 
// Handle-level convenience wrapper: fill *s with fractal tree info.
void toku_ft_handle_get_fractal_tree_info64(FT_HANDLE ft_h, struct ftinfo64 *s) {
    toku_ft_get_fractal_tree_info64(ft_h->ft, s);
}
4548 
// Invoke iter over the tree's block map entries; forwards to the FT layer
// and returns its result.
int toku_ft_handle_iterate_fractal_tree_block_map(FT_HANDLE ft_h, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
    return toku_ft_iterate_fractal_tree_block_map(ft_h->ft, iter, iter_extra);
}
4552 
4553 /* ********************* debugging dump ************************ */
// Recursively print a human-readable dump of the node at blocknum (and,
// for internal nodes, all of its children) to file.  Also runs
// toku_verify_ftnode on the node and returns that verification result.
// depth controls indentation; lorange/hirange are the key bounds
// inherited from the parent's pivots (NULL at the root).
static int
toku_dump_ftnode (FILE *file, FT_HANDLE ft_handle, BLOCKNUM blocknum, int depth, const DBT *lorange, const DBT *hirange) {
    int result=0;
    FTNODE node;
    // Fetch and verify the node first; the verify result is the return value.
    // NOTE(review): the node is fetched here and then pinned again below —
    // presumably toku_get_node_for_verify releases its reference before
    // returning; confirm against its definition.
    toku_get_node_for_verify(blocknum, ft_handle, &node);
    result=toku_verify_ftnode(ft_handle, ft_handle->ft->h->max_msn_in_ft, ft_handle->ft->h->max_msn_in_ft, false, node, -1, lorange, hirange, NULL, NULL, 0, 1, 0);
    uint32_t fullhash = toku_cachetable_hash(ft_handle->ft->cf, blocknum);
    ftnode_fetch_extra bfe;
    bfe.create_for_full_read(ft_handle->ft);
    // Pin the node (fully read in) for the duration of the dump.
    toku_pin_ftnode(
        ft_handle->ft,
        blocknum,
        fullhash,
        &bfe,
        PL_WRITE_EXPENSIVE,
        &node,
        true
        );
    assert(node->fullhash==fullhash);
    fprintf(file, "%*sNode=%p\n", depth, "", node);

    fprintf(file, "%*sNode %" PRId64 " height=%d n_children=%d  keyrange=%s %s\n",
            depth, "", blocknum.b, node->height, node->n_children, (char*)(lorange ? lorange->data : 0), (char*)(hirange ? hirange->data : 0));
    {
        int i;
        // Print the n_children-1 pivot keys separating the children.
        for (i=0; i+1< node->n_children; i++) {
            fprintf(file, "%*spivotkey %d =", depth+1, "", i);
            toku_print_BYTESTRING(file, node->pivotkeys.get_pivot(i).size, (char *) node->pivotkeys.get_pivot(i).data);
            fprintf(file, "\n");
        }
        // Per-child contents: buffered messages for internal nodes,
        // (optionally) leafentries for leaves.
        for (i=0; i< node->n_children; i++) {
            if (node->height > 0) {
                NONLEAF_CHILDINFO bnc = BNC(node, i);
                fprintf(file, "%*schild %d buffered (%d entries):", depth+1, "", i, toku_bnc_n_entries(bnc));
                // Local functor printed once per buffered message; assumes the
                // message key starts with a 4-byte integer (debug formatting only).
                struct print_msg_fn {
                    FILE *file;
                    int depth;
                    print_msg_fn(FILE *f, int d) : file(f), depth(d) { }
                    int operator()(const ft_msg &msg, bool UU(is_fresh)) {
                        fprintf(file, "%*s xid=%" PRIu64 " %u (type=%d) msn=0x%" PRIu64 "\n",
                                      depth+2, "",
                                      toku_xids_get_innermost_xid(msg.xids()),
                                      static_cast<unsigned>(toku_dtoh32(*(int*)msg.kdbt()->data)),
                                      msg.type(), msg.msn().msn);
                        return 0;
                    }
                } print_fn(file, depth);
                bnc->msg_buffer.iterate(print_fn);
            }
            else {
                int size = BLB_DATA(node, i)->num_klpairs();
                // Leafentry printing is compiled in but disabled; change
                // "if (0)" to "if (1)" when the per-pair dump is needed.
                if (0)
                    for (int j=0; j<size; j++) {
                        LEAFENTRY le;
                        void* keyp = NULL;
                        uint32_t keylen = 0;
                        int r = BLB_DATA(node,i)->fetch_klpair(j, &le, &keylen, &keyp);
                        assert_zero(r);
                        fprintf(file, " [%d]=", j);
                        print_klpair(file, keyp, keylen, le);
                        fprintf(file, "\n");
                    }
                fprintf(file, "\n");
            }
        }
        // Recurse into children, narrowing the key range using the pivots.
        if (node->height > 0) {
            for (i=0; i<node->n_children; i++) {
                fprintf(file, "%*schild %d\n", depth, "", i);
                if (i>0) {
                    char *CAST_FROM_VOIDP(key, node->pivotkeys.get_pivot(i - 1).data);
                    fprintf(file, "%*spivot %d len=%u %u\n", depth+1, "", i-1, node->pivotkeys.get_pivot(i - 1).size, (unsigned)toku_dtoh32(*(int*)key));
                }
                DBT x, y;
                toku_dump_ftnode(file, ft_handle, BP_BLOCKNUM(node, i), depth+4,
                                  (i==0) ? lorange : node->pivotkeys.fill_pivot(i - 1, &x),
                                  (i==node->n_children-1) ? hirange : node->pivotkeys.fill_pivot(i, &y));
            }
        }
    }
    toku_unpin_ftnode(ft_handle->ft, node);
    return result;
}
4636 
toku_dump_ft(FILE * f,FT_HANDLE ft_handle)4637 int toku_dump_ft(FILE *f, FT_HANDLE ft_handle) {
4638     FT ft = ft_handle->ft;
4639     invariant_notnull(ft);
4640     ft->blocktable.dump_translation_table(f);
4641 
4642     uint32_t fullhash = 0;
4643     CACHEKEY root_key;
4644     toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &fullhash);
4645     return toku_dump_ftnode(f, ft_handle, root_key, 0, 0, 0);
4646 }
4647 
4648 
// Allocate every performance-schema (PFS) instrumentation key used by the
// ft layer, grouped by instrumented object type: mutexes, files, probes,
// threads, condition variables, and rwlocks.  Each key is registered under
// toku_instr_group_name.  toku_pfs_keys_destroy() frees them all.
static void toku_pfs_keys_init(const char *toku_instr_group_name) {
    // --- mutex keys ---
    kibbutz_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name, "kibbutz_mutex");
    minicron_p_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "minicron_p_mutex");
    queue_result_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "queue_result_mutex");
    tpool_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "tpool_lock_mutex");
    workset_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "workset_lock_mutex");
    bjm_jobs_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "bjm_jobs_lock_mutex");
    log_internal_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "log_internal_lock_mutex");
    cachetable_ev_thread_lock_mutex_key =
        new toku_instr_key(toku_instr_object_type::mutex,
                           toku_instr_group_name,
                           "cachetable_ev_thread_lock_mutex");
    cachetable_disk_nb_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "cachetable_disk_nb_mutex");
    safe_file_size_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "safe_file_size_lock_mutex");
    // NOTE(review): registered name carries a "_key" suffix unlike its
    // siblings — confirm whether that is intentional before renaming.
    cachetable_m_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "cachetable_m_mutex_key");
    checkpoint_safe_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "checkpoint_safe_mutex");
    ft_ref_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "ft_ref_lock_mutex");
    ft_open_close_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "ft_open_close_lock_mutex");
    loader_error_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "loader_error_mutex");
    bfs_mutex_key =
        new toku_instr_key(toku_instr_object_type::mutex, toku_instr_group_name,
        "bfs_mutex");
    loader_bl_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "loader_bl_mutex");
    loader_fi_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "loader_fi_lock_mutex");
    loader_out_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "loader_out_mutex");
    result_output_condition_lock_mutex_key =
        new toku_instr_key(toku_instr_object_type::mutex,
                           toku_instr_group_name,
                           "result_output_condition_lock_mutex");
    block_table_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "block_table_mutex");
    rollback_log_node_cache_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "rollback_log_node_cache_mutex");
    txn_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name, "txn_lock_mutex");
    txn_state_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "txn_state_lock_mutex");
    txn_child_manager_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "txn_child_manager_mutex");
    txn_manager_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "txn_manager_lock_mutex");
    treenode_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name, "treenode_mutex");
    locktree_request_info_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "locktree_request_info_mutex");
    // NOTE(review): "_key" suffix in the registered name — confirm intended.
    locktree_request_info_retry_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "locktree_request_info_retry_mutex_key");
    manager_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name, "manager_mutex");
    manager_escalation_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "manager_escalation_mutex");
    db_txn_struct_i_txn_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "db_txn_struct_i_txn_mutex");
    manager_escalator_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "manager_escalator_mutex");
    indexer_i_indexer_lock_mutex_key = new toku_instr_key(
        toku_instr_object_type::mutex, toku_instr_group_name,
        "indexer_i_indexer_lock_mutex");
    indexer_i_indexer_estimate_lock_mutex_key =
        new toku_instr_key(toku_instr_object_type::mutex,
                           toku_instr_group_name,
                           "indexer_i_indexer_estimate_lock_mutex");

    // --- file keys ---
    tokudb_file_data_key = new toku_instr_key(
        toku_instr_object_type::file, toku_instr_group_name, "tokudb_data_file");
    tokudb_file_load_key = new toku_instr_key(
        toku_instr_object_type::file, toku_instr_group_name, "tokudb_load_file");
    tokudb_file_tmp_key = new toku_instr_key(
        toku_instr_object_type::file, toku_instr_group_name, "tokudb_tmp_file");
    tokudb_file_log_key = new toku_instr_key(
        toku_instr_object_type::file, toku_instr_group_name, "tokudb_log_file");

    // --- probe key (registered as a mutex-type instrument) ---
    fti_probe_1_key =
        new toku_instr_key(toku_instr_object_type::mutex, toku_instr_group_name,
        "fti_probe_1");

    // --- thread keys ---
    extractor_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name,
        "extractor_thread");
    fractal_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name, "fractal_thread");
    io_thread_key =
        new toku_instr_key(toku_instr_object_type::thread, toku_instr_group_name,
        "io_thread");
    eviction_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name,
        "eviction_thread");
    kibbutz_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name, "kibbutz_thread");
    minicron_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name,
        "minicron_thread");
    tp_internal_thread_key = new toku_instr_key(
        toku_instr_object_type::thread, toku_instr_group_name,
        "tp_internal_thread");

    // --- condition-variable keys ---
    result_state_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "result_state_cond");
    bjm_jobs_wait_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "bjm_jobs_wait");
    cachetable_p_refcount_wait_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "cachetable_p_refcount_wait");
    cachetable_m_flow_control_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "cachetable_m_flow_control_cond");
    cachetable_m_ev_thread_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "cachetable_m_ev_thread_cond");
    bfs_cond_key =
        new toku_instr_key(toku_instr_object_type::cond, toku_instr_group_name,
        "bfs_cond");
    result_output_condition_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "result_output_condition");
    manager_m_escalator_done_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "manager_m_escalator_done");
    lock_request_m_wait_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "lock_request_m_wait_cond");
    queue_result_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "queue_result_cond");
    ws_worker_wait_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "ws_worker_wait");
    rwlock_wait_read_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "rwlock_wait_read");
    rwlock_wait_write_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "rwlock_wait_write");
    rwlock_cond_key =
        new toku_instr_key(toku_instr_object_type::cond, toku_instr_group_name,
        "rwlock_cond");
    tp_thread_wait_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "tp_thread_wait");
    tp_pool_wait_free_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "tp_pool_wait_free");
    frwlock_m_wait_read_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "frwlock_m_wait_read");
    kibbutz_k_cond_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name, "kibbutz_k_cond");
    minicron_p_condvar_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "minicron_p_condvar");
    // NOTE(review): "_key" suffix in the registered name — confirm intended.
    locktree_request_info_retry_cv_key = new toku_instr_key(
        toku_instr_object_type::cond, toku_instr_group_name,
        "locktree_request_info_retry_cv_key");

    // --- rwlock keys ---
    multi_operation_lock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "multi_operation_lock");
    low_priority_multi_operation_lock_key =
        new toku_instr_key(toku_instr_object_type::rwlock,
                           toku_instr_group_name,
                           "low_priority_multi_operation_lock");
    cachetable_m_list_lock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "cachetable_m_list_lock");
    cachetable_m_pending_lock_expensive_key =
        new toku_instr_key(toku_instr_object_type::rwlock,
                           toku_instr_group_name,
                           "cachetable_m_pending_lock_expensive");
    cachetable_m_pending_lock_cheap_key =
        new toku_instr_key(toku_instr_object_type::rwlock,
                           toku_instr_group_name,
                           "cachetable_m_pending_lock_cheap");
    cachetable_m_lock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "cachetable_m_lock");
    result_i_open_dbs_rwlock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "result_i_open_dbs_rwlock");
    checkpoint_safe_rwlock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "checkpoint_safe_rwlock");
    cachetable_value_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "cachetable_value");
    safe_file_size_lock_rwlock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "safe_file_size_lock_rwlock");
    cachetable_disk_nb_rwlock_key = new toku_instr_key(
        toku_instr_object_type::rwlock, toku_instr_group_name,
        "cachetable_disk_nb_rwlock");

    // The probe object itself, built on the probe key allocated above.
    toku_instr_probe_1 = new toku_instr_probe(*fti_probe_1_key);
}
4883 
// Free every PFS instrumentation key allocated by toku_pfs_keys_init(),
// in the same group order (mutexes, files, probe key, threads, condition
// variables, rwlocks), and finally the probe object itself.
static void toku_pfs_keys_destroy(void) {
    delete kibbutz_mutex_key;
    delete minicron_p_mutex_key;
    delete queue_result_mutex_key;
    delete tpool_lock_mutex_key;
    delete workset_lock_mutex_key;
    delete bjm_jobs_lock_mutex_key;
    delete log_internal_lock_mutex_key;
    delete cachetable_ev_thread_lock_mutex_key;
    delete cachetable_disk_nb_mutex_key;
    delete safe_file_size_lock_mutex_key;
    delete cachetable_m_mutex_key;
    delete checkpoint_safe_mutex_key;
    delete ft_ref_lock_mutex_key;
    delete ft_open_close_lock_mutex_key;
    delete loader_error_mutex_key;
    delete bfs_mutex_key;
    delete loader_bl_mutex_key;
    delete loader_fi_lock_mutex_key;
    delete loader_out_mutex_key;
    delete result_output_condition_lock_mutex_key;
    delete block_table_mutex_key;
    delete rollback_log_node_cache_mutex_key;
    delete txn_lock_mutex_key;
    delete txn_state_lock_mutex_key;
    delete txn_child_manager_mutex_key;
    delete txn_manager_lock_mutex_key;
    delete treenode_mutex_key;
    delete locktree_request_info_mutex_key;
    delete locktree_request_info_retry_mutex_key;
    delete manager_mutex_key;
    delete manager_escalation_mutex_key;
    delete db_txn_struct_i_txn_mutex_key;
    delete manager_escalator_mutex_key;
    delete indexer_i_indexer_lock_mutex_key;
    delete indexer_i_indexer_estimate_lock_mutex_key;

    delete tokudb_file_data_key;
    delete tokudb_file_load_key;
    delete tokudb_file_tmp_key;
    delete tokudb_file_log_key;

    delete fti_probe_1_key;

    delete extractor_thread_key;
    delete fractal_thread_key;
    delete io_thread_key;
    delete eviction_thread_key;
    delete kibbutz_thread_key;
    delete minicron_thread_key;
    delete tp_internal_thread_key;

    delete result_state_cond_key;
    delete bjm_jobs_wait_key;
    delete cachetable_p_refcount_wait_key;
    delete cachetable_m_flow_control_cond_key;
    delete cachetable_m_ev_thread_cond_key;
    delete bfs_cond_key;
    delete result_output_condition_key;
    delete manager_m_escalator_done_key;
    delete lock_request_m_wait_cond_key;
    delete queue_result_cond_key;
    delete ws_worker_wait_key;
    delete rwlock_wait_read_key;
    delete rwlock_wait_write_key;
    delete rwlock_cond_key;
    delete tp_thread_wait_key;
    delete tp_pool_wait_free_key;
    delete frwlock_m_wait_read_key;
    delete kibbutz_k_cond_key;
    delete minicron_p_condvar_key;
    delete locktree_request_info_retry_cv_key;

    delete multi_operation_lock_key;
    delete low_priority_multi_operation_lock_key;
    delete cachetable_m_list_lock_key;
    delete cachetable_m_pending_lock_expensive_key;
    delete cachetable_m_pending_lock_cheap_key;
    delete cachetable_m_lock_key;
    delete result_i_open_dbs_rwlock_key;
    delete checkpoint_safe_rwlock_key;
    delete cachetable_value_key;
    delete safe_file_size_lock_rwlock_key;

    delete cachetable_disk_nb_rwlock_key;
    delete toku_instr_probe_1;
}
4971 
// One-time process-wide initialization of the ft layer: portability layer
// first, then PFS keys, product name, counters/status, checkpointing,
// serialization, the open/close lock, and scoped malloc.
// Subsequent calls are no-ops that return 0.
// NOTE(review): the static guard is a plain bool, so concurrent first
// calls could race — presumably callers serialize initialization; confirm.
int toku_ft_layer_init(void) {
    static bool ft_layer_init_started = false;

    if(ft_layer_init_started) {
        return 0;
    }

    ft_layer_init_started = true;

    int r = 0;

    // Portability must be initialized first
    r = toku_portability_init();
    assert(r==0);
    // NOTE(review): redundant when assert is fatal; presumably kept for
    // builds where the assert is compiled out — confirm.
    if (r) {
        goto exit;
    }

    toku_pfs_keys_init("fti");

    r = db_env_set_toku_product_name("tokudb");
    assert(r==0);
    if (r) {
        goto exit;
    }

    partitioned_counters_init();
    toku_status_init();
    toku_context_status_init();
    toku_checkpoint_init();
    toku_ft_serialize_layer_init();
    // The key must already exist: toku_pfs_keys_init ran above.
    toku_mutex_init(
        *ft_open_close_lock_mutex_key, &ft_open_close_lock, nullptr);
    toku_scoped_malloc_init();
exit:
    return r;
}
5009 
// Tear down everything toku_ft_layer_init set up, in roughly reverse
// order, finishing with the portability layer.  Subsequent calls are
// no-ops (same non-atomic guard pattern as init).
void toku_ft_layer_destroy(void) {
    static bool ft_layer_destroy_started = false;

    if(ft_layer_destroy_started) {
        return;
    }

    ft_layer_destroy_started = true;

    toku_mutex_destroy(&ft_open_close_lock);
    toku_ft_serialize_layer_destroy();
    toku_checkpoint_destroy();
    toku_context_status_destroy();
    toku_status_destroy();
    partitioned_counters_destroy();
    toku_scoped_malloc_destroy();
    toku_pfs_keys_destroy();

    // Portability must be cleaned up last
    toku_portability_destroy();
}
5031 
5032 // This lock serializes all opens and closes because the cachetable requires that clients do not try to open or close a cachefile in parallel.  We made
5033 // it coarser by not allowing any cachefiles to be open or closed in parallel.
// Acquire the global lock serializing all ft open/close operations
// (see the comment above for why this is a single coarse lock).
void toku_ft_open_close_lock(void) {
    toku_mutex_lock(&ft_open_close_lock);
}
5037 
// Release the global ft open/close lock acquired by toku_ft_open_close_lock.
void toku_ft_open_close_unlock(void) {
    toku_mutex_unlock(&ft_open_close_lock);
}
5041 
5042 // Prepare to remove a dictionary from the database when this transaction is committed:
5043 //  - mark transaction as NEED fsync on commit
5044 //  - make entry in rollback log
5045 //  - make fdelete entry in recovery log
5046 //
5047 // Effect: when the txn commits, the ft's cachefile will be marked as unlink
5048 //         on close. see toku_commit_fdelete and how unlink on close works
5049 //         in toku_cachefile_close();
5050 // Requires: serialized with begin checkpoint
5051 //           this does not need to take the open close lock because
5052 //           1.) the ft/cf cannot go away because we have a live handle.
5053 //           2.) we're not setting the unlink on close bit _here_. that
5054 //           happens on txn commit (as the name suggests).
5055 //           3.) we're already holding the multi operation lock to
5056 //           synchronize with begin checkpoint.
5057 // Contract: the iname of the ft should never be reused.
void toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn) {
    // Transactional fdelete requires a live transaction.
    assert(txn);

    CACHEFILE cf = handle->ft->cf;
    FT CAST_FROM_VOIDP(ft, toku_cachefile_get_userdata(cf));

    // Associate the ft with this txn so commit/abort can find it.
    toku_txn_maybe_note_ft(txn, ft);

    // If the txn commits, the commit MUST be in the log before the file is actually unlinked
    toku_txn_force_fsync_on_commit(txn);
    // make entry in rollback log
    FILENUM filenum = toku_cachefile_filenum(cf);
    toku_logger_save_rollback_fdelete(txn, filenum);
    // make entry in recovery log
    toku_logger_log_fdelete(txn, filenum);
}
5074 
5075 // Non-transactional version of fdelete
5076 //
5077 // Effect: The ft file is unlinked when the handle closes and it's ft is not
5078 //         pinned by checkpoint. see toku_remove_ft_ref() and how unlink on
5079 //         close works in toku_cachefile_close();
5080 // Requires: serialized with begin checkpoint
toku_ft_unlink(FT_HANDLE handle)5081 void toku_ft_unlink(FT_HANDLE handle) {
5082     CACHEFILE cf;
5083     cf = handle->ft->cf;
5084     toku_cachefile_unlink_on_close(cf);
5085 }
5086 
// Rename an ft file (iname) on disk.  When txn is non-NULL, the rename is
// made recoverable first: a rollback entry and an frename recovery-log
// entry are written, and if the old file is currently open its cachefile's
// recorded name is updated.  Then the filesystem rename is performed and
// the containing directory fsync'd.  Returns 0 or an errno-style code.
int toku_ft_rename_iname(DB_TXN *txn,
                         const char *data_dir,
                         const char *old_iname,
                         const char *new_iname,
                         CACHETABLE ct) {
    int r = 0;

    // unique_ptrs with toku_free as deleter so the full paths are released
    // on every return path.
    std::unique_ptr<char[], decltype(&toku_free)> new_iname_full(nullptr,
                                                                 &toku_free);
    std::unique_ptr<char[], decltype(&toku_free)> old_iname_full(nullptr,
                                                                 &toku_free);

    new_iname_full.reset(toku_construct_full_name(2, data_dir, new_iname));
    old_iname_full.reset(toku_construct_full_name(2, data_dir, old_iname));

    if (txn) {
        // BYTESTRING lengths include the trailing NUL (strlen + 1).
        BYTESTRING bs_old_name = {static_cast<uint32_t>(strlen(old_iname) + 1),
                                  const_cast<char *>(old_iname)};
        BYTESTRING bs_new_name = {static_cast<uint32_t>(strlen(new_iname) + 1),
                                  const_cast<char *>(new_iname)};
        FILENUM filenum = FILENUM_NONE;
        {
            CACHEFILE cf;
            r = toku_cachefile_of_iname_in_env(ct, old_iname, &cf);
            // NOTE(review): any result other than ENOENT is treated as
            // "cachefile found" and cf is dereferenced — presumably this
            // call only returns 0 or ENOENT; confirm its error contract.
            if (r != ENOENT) {
                char *old_fname_in_cf = toku_cachefile_fname_in_env(cf);
                toku_cachefile_set_fname_in_env(cf, toku_xstrdup(new_iname));
                toku_free(old_fname_in_cf);
                filenum = toku_cachefile_filenum(cf);
            }
        }
        toku_logger_save_rollback_frename(
            db_txn_struct_i(txn)->tokutxn, &bs_old_name, &bs_new_name);
        toku_log_frename(db_txn_struct_i(txn)->tokutxn->logger,
                         (LSN *)0,
                         0,
                         toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn),
                         bs_old_name,
                         filenum,
                         bs_new_name);
    }

    // Make sure the destination directory exists, then rename and fsync
    // the directory so the rename itself is durable.
    if (!toku_create_subdirs_if_needed(new_iname_full.get()))
        return get_error_errno();
    r = toku_os_rename(old_iname_full.get(), new_iname_full.get());
    if (r != 0)
        return r;
    r = toku_fsync_directory(new_iname_full.get());
    return r;
}
5137 
// Fill *report with fragmentation statistics for this dictionary's file:
// the on-disk file size plus per-block fragmentation from the block table.
// Returns the result of the file-size query (0 on success).
int toku_ft_get_fragmentation(FT_HANDLE ft_handle, TOKU_DB_FRAGMENTATION report) {
    int fd = toku_cachefile_get_fd(ft_handle->ft->cf);
    // Hold the ft lock while reading the block table so the snapshot is
    // internally consistent.
    toku_ft_lock(ft_handle->ft);

    int64_t file_size;
    int r = toku_os_get_file_size(fd, &file_size);
    if (r == 0) {
        report->file_size_bytes = file_size;
        ft_handle->ft->blocktable.get_fragmentation_unlocked(report);
    }
    toku_ft_unlock(ft_handle->ft);
    return r;
}
5151 
is_empty_fast_iter(FT_HANDLE ft_handle,FTNODE node)5152 static bool is_empty_fast_iter (FT_HANDLE ft_handle, FTNODE node) {
5153     if (node->height > 0) {
5154         for (int childnum=0; childnum<node->n_children; childnum++) {
5155             if (toku_bnc_nbytesinbuf(BNC(node, childnum)) != 0) {
5156                 return 0; // it's not empty if there are bytes in buffers
5157             }
5158             FTNODE childnode;
5159             {
5160                 BLOCKNUM childblocknum = BP_BLOCKNUM(node,childnum);
5161                 uint32_t fullhash =  compute_child_fullhash(ft_handle->ft->cf, node, childnum);
5162                 ftnode_fetch_extra bfe;
5163                 bfe.create_for_full_read(ft_handle->ft);
5164                 // don't need to pass in dependent nodes as we are not
5165                 // modifying nodes we are pinning
5166                 toku_pin_ftnode(
5167                     ft_handle->ft,
5168                     childblocknum,
5169                     fullhash,
5170                     &bfe,
5171                     PL_READ, // may_modify_node set to false, as nodes not modified
5172                     &childnode,
5173                     true
5174                     );
5175             }
5176             int child_is_empty = is_empty_fast_iter(ft_handle, childnode);
5177             toku_unpin_ftnode(ft_handle->ft, childnode);
5178             if (!child_is_empty) return 0;
5179         }
5180         return 1;
5181     } else {
5182         // leaf:  If the dmt is empty, we are happy.
5183         for (int i = 0; i < node->n_children; i++) {
5184             if (BLB_DATA(node, i)->num_klpairs()) {
5185                 return false;
5186             }
5187         }
5188         return true;
5189     }
5190 }
5191 
toku_ft_is_empty_fast(FT_HANDLE ft_handle)5192 bool toku_ft_is_empty_fast (FT_HANDLE ft_handle)
5193 // A fast check to see if the tree is empty.  If there are any messages or leafentries, we consider the tree to be nonempty.  It's possible that those
5194 // messages and leafentries would all optimize away and that the tree is empty, but we'll say it is nonempty.
5195 {
5196     uint32_t fullhash;
5197     FTNODE node;
5198     {
5199         CACHEKEY root_key;
5200         toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &fullhash);
5201         ftnode_fetch_extra bfe;
5202         bfe.create_for_full_read(ft_handle->ft);
5203         toku_pin_ftnode(
5204             ft_handle->ft,
5205             root_key,
5206             fullhash,
5207             &bfe,
5208             PL_READ, // may_modify_node set to false, node does not change
5209             &node,
5210             true
5211             );
5212     }
5213     bool r = is_empty_fast_iter(ft_handle, node);
5214     toku_unpin_ftnode(ft_handle->ft, node);
5215     return r;
5216 }
5217 
5218 // test-only
// strerror_r-style helper covering TokuDB's negative error codes as well
// as system errnos.  Writes a message into buf (buflen bytes) and returns
// 0 on success, EINVAL for unknown negative codes, or whatever
// strerror_r-derived value results for system errors.
int toku_ft_strerror_r(int error, char *buf, size_t buflen)
{
    if (error>=0) {
        // NOTE(review): the (long) cast suggests the GNU strerror_r
        // (returning char*) may be in effect here; truncating that pointer
        // through long->int makes the return value meaningless on LP64 —
        // confirm which strerror_r variant this build selects.
        return (long) strerror_r(error, buf, buflen);
    } else {
        switch (error) {
        case DB_KEYEXIST:
            snprintf(buf, buflen, "Key exists");
            return 0;
        case TOKUDB_CANCELED:
            snprintf(buf, buflen, "User canceled operation");
            return 0;
        default:
            snprintf(buf, buflen, "Unknown error %d", error);
            return EINVAL;
        }
    }
}
5237 
// Compare two keys as raw byte strings in memcmp order.
// Returns <0 if key1 sorts before key2, >0 if after, 0 if identical.
// When one key is a prefix of the other, the shorter key sorts first.
// (Fix: the common-prefix length was previously stored in an int,
// truncating uint32_t lengths above INT_MAX; size_t avoids that and
// matches memcmp's parameter type.)
int toku_keycompare(const void *key1, uint32_t key1len, const void *key2, uint32_t key2len) {
    size_t comparelen = key1len < key2len ? key1len : key2len;
    int c = memcmp(key1, key2, comparelen);
    // Keys almost always differ in the common prefix; hint the branch.
    if (__builtin_expect(c != 0, 1)) {
        return c;
    }
    // Equal prefixes: order by length.
    if (key1len < key2len) {
        return -1;
    } else if (key1len > key2len) {
        return 1;
    } else {
        return 0;
    }
}
5253 
// Default DBT comparator: lexicographic byte comparison of the two keys
// via toku_keycompare.  The db argument is unused.
int toku_builtin_compare_fun(DB *db __attribute__((__unused__)), const DBT *a, const DBT*b) {
    return toku_keycompare(a->data, a->size, b->data, b->size);
}
5257 
5258 #include <toku_race_tools.h>
void __attribute__((__constructor__)) toku_ft_helgrind_ignore(void);
// Runs automatically at load time (GCC constructor attribute): tells
// helgrind/DRD to skip race checking on the ft_status global —
// presumably its counters are updated without locks by design; confirm.
void
toku_ft_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&ft_status, sizeof ft_status);
}
5264