1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 /* Verify an FT. */
40 /* Check:
41  *   The tree is of uniform depth (and the height is correct at every node)
42  *   For each pivot key:  the max of the stuff to the left is <= the pivot key < the min of the stuff to the right.
43  *   For each leaf node:  All the keys are in strictly increasing order.
44  *   For each nonleaf node:  All the messages have keys that are between the associated pivot keys ( left_pivot_key < message <= right_pivot_key)
45  */
46 
47 #include "ft/serialize/block_table.h"
48 #include "ft/ft.h"
49 #include "ft/ft-cachetable-wrappers.h"
50 #include "ft/ft-internal.h"
51 #include "ft/node.h"
52 
53 static int
compare_pairs(FT_HANDLE ft_handle,const DBT * a,const DBT * b)54 compare_pairs (FT_HANDLE ft_handle, const DBT *a, const DBT *b) {
55     return ft_handle->ft->cmp(a, b);
56 }
57 
58 static int
compare_pair_to_key(FT_HANDLE ft_handle,const DBT * a,const void * key,uint32_t keylen)59 compare_pair_to_key (FT_HANDLE ft_handle, const DBT *a, const void *key, uint32_t keylen) {
60     DBT y;
61     return ft_handle->ft->cmp(a, toku_fill_dbt(&y, key, keylen));
62 }
63 
64 static int
65 verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, const void *key, uint32_t keylen, const void *UU(data), uint32_t UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot)
66     __attribute__((warn_unused_result));
67 
UU()68 UU()
69 static int
70 verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, const void *key, uint32_t keylen, const void *UU(data), uint32_t UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot) {
71     int result = 0;
72     if (msn.msn == ZERO_MSN.msn)
73         result = EINVAL;
74     switch (type) {
75     default:
76         break;
77     case FT_INSERT:
78     case FT_INSERT_NO_OVERWRITE:
79     case FT_DELETE_ANY:
80     case FT_ABORT_ANY:
81     case FT_COMMIT_ANY:
82         // verify key in bounds
83         if (lesser_pivot) {
84             int compare = compare_pair_to_key(ft_handle, lesser_pivot, key, keylen);
85             if (compare >= 0)
86                 result = EINVAL;
87         }
88         if (result == 0 && greatereq_pivot) {
89             int compare = compare_pair_to_key(ft_handle, greatereq_pivot, key, keylen);
90             if (compare < 0)
91                 result = EINVAL;
92         }
93         break;
94     }
95     return result;
96 }
97 
98 static DBT
get_ith_key_dbt(BASEMENTNODE bn,int i)99 get_ith_key_dbt (BASEMENTNODE bn, int i) {
100     DBT kdbt;
101     int r = bn->data_buffer.fetch_key_and_len(i, &kdbt.size, &kdbt.data);
102     invariant_zero(r); // this is a bad failure if it happens.
103     return kdbt;
104 }
105 
// Verification check for child `i` of the node currently being examined.
// When `predicate` is false: print a diagnostic to stderr, set `result` to
// TOKUDB_NEEDS_REPAIR and, unless `keep_going_on_failure` is set, jump to `done`.
// The expansion site must therefore provide `blocknum`, `result`,
// `keep_going_on_failure` and a `done:` label.
#define VERIFY_ASSERTION(predicate, i, string)                                        \
    do {                                                                              \
        if (!(predicate)) {                                                           \
            fprintf(stderr, "%s:%d: Looking at child %d of block %" PRId64 ": %s\n",  \
                    __FILE__, __LINE__, (i), blocknum.b, (string));                   \
            result = TOKUDB_NEEDS_REPAIR;                                             \
            if (!keep_going_on_failure) goto done;                                    \
        }                                                                             \
    } while (0)
112 
// Verification check for entry `entry` of basement node `bn` in the node
// currently being examined.
// When `predicate` is false: print a diagnostic to stderr, set `result` to
// TOKUDB_NEEDS_REPAIR and, unless `keep_going_on_failure` is set, jump to `done`.
// The expansion site must therefore provide `blocknum`, `result`,
// `keep_going_on_failure` and a `done:` label.
#define VERIFY_ASSERTION_BASEMENT(predicate, bn, entry, string)                           \
    do {                                                                                  \
        if (!(predicate)) {                                                               \
            fprintf(stderr, "%s:%d: Looking at block %" PRId64 " bn %d entry %d: %s\n",   \
                    __FILE__, __LINE__, blocknum.b, (bn), (entry), (string));             \
            result = TOKUDB_NEEDS_REPAIR;                                                 \
            if (!keep_going_on_failure) goto done;                                        \
        }                                                                                 \
    } while (0)
119 
120 struct count_msgs_extra {
121     int count;
122     MSN msn;
123     message_buffer *msg_buffer;
124 };
125 
126 // template-only function, but must be extern
127 int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
128     __attribute__((nonnull(3)));
count_msgs(const int32_t & offset,const uint32_t UU (idx),struct count_msgs_extra * const e)129 int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
130 {
131     MSN msn;
132     e->msg_buffer->get_message_key_msn(offset, nullptr, &msn);
133     if (msn.msn == e->msn.msn) {
134         e->count++;
135     }
136     return 0;
137 }
138 
139 struct verify_message_tree_extra {
140     message_buffer *msg_buffer;
141     bool broadcast;
142     bool is_fresh;
143     int i;
144     int verbose;
145     BLOCKNUM blocknum;
146     int keep_going_on_failure;
147     bool messages_have_been_moved;
148 };
149 
150 int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e) __attribute__((nonnull(3)));
verify_message_tree(const int32_t & offset,const uint32_t UU (idx),struct verify_message_tree_extra * const e)151 int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
152 {
153     BLOCKNUM blocknum = e->blocknum;
154     int keep_going_on_failure = e->keep_going_on_failure;
155     int result = 0;
156     DBT k, v;
157     ft_msg msg = e->msg_buffer->get_message(offset, &k, &v);
158     bool is_fresh = e->msg_buffer->get_freshness(offset);
159     if (e->broadcast) {
160         VERIFY_ASSERTION(ft_msg_type_applies_all((enum ft_msg_type) msg.type()) || ft_msg_type_does_nothing((enum ft_msg_type) msg.type()),
161                          e->i, "message found in broadcast list that is not a broadcast");
162     } else {
163         VERIFY_ASSERTION(ft_msg_type_applies_once((enum ft_msg_type) msg.type()),
164                          e->i, "message found in fresh or stale message tree that does not apply once");
165         if (e->is_fresh) {
166             if (e->messages_have_been_moved) {
167                 VERIFY_ASSERTION(is_fresh,
168                                  e->i, "message found in fresh message tree that is not fresh");
169             }
170         } else {
171             VERIFY_ASSERTION(!is_fresh,
172                              e->i, "message found in stale message tree that is fresh");
173         }
174     }
175 done:
176     return result;
177 }
178 
179 int error_on_iter(const int32_t &UU(offset), const uint32_t UU(idx), void *UU(e));
error_on_iter(const int32_t & UU (offset),const uint32_t UU (idx),void * UU (e))180 int error_on_iter(const int32_t &UU(offset), const uint32_t UU(idx), void *UU(e)) {
181     return TOKUDB_NEEDS_REPAIR;
182 }
183 
184 int verify_marked_messages(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e) __attribute__((nonnull(3)));
verify_marked_messages(const int32_t & offset,const uint32_t UU (idx),struct verify_message_tree_extra * const e)185 int verify_marked_messages(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
186 {
187     BLOCKNUM blocknum = e->blocknum;
188     int keep_going_on_failure = e->keep_going_on_failure;
189     int result = 0;
190     bool is_fresh = e->msg_buffer->get_freshness(offset);
191     VERIFY_ASSERTION(!is_fresh, e->i, "marked message found in the fresh message tree that is fresh");
192  done:
193     return result;
194 }
195 
196 template<typename verify_omt_t>
197 static int
verify_sorted_by_key_msn(FT_HANDLE ft_handle,message_buffer * msg_buffer,const verify_omt_t & mt)198 verify_sorted_by_key_msn(FT_HANDLE ft_handle, message_buffer *msg_buffer, const verify_omt_t &mt) {
199     int result = 0;
200     size_t last_offset = 0;
201     for (uint32_t i = 0; i < mt.size(); i++) {
202         int32_t offset;
203         int r = mt.fetch(i, &offset);
204         assert_zero(r);
205         if (i > 0) {
206             struct toku_msg_buffer_key_msn_cmp_extra extra(ft_handle->ft->cmp, msg_buffer);
207             if (toku_msg_buffer_key_msn_cmp(extra, last_offset, offset) >= 0) {
208                 result = TOKUDB_NEEDS_REPAIR;
209                 break;
210             }
211         }
212         last_offset = offset;
213     }
214     return result;
215 }
216 
217 template<typename count_omt_t>
218 static int
count_eq_key_msn(FT_HANDLE ft_handle,message_buffer * msg_buffer,const count_omt_t & mt,const DBT * key,MSN msn)219 count_eq_key_msn(FT_HANDLE ft_handle, message_buffer *msg_buffer, const count_omt_t &mt, const DBT *key, MSN msn) {
220     struct toku_msg_buffer_key_msn_heaviside_extra extra(ft_handle->ft->cmp, msg_buffer, key, msn);
221     int r = mt.template find_zero<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(extra, nullptr, nullptr);
222     int count;
223     if (r == 0) {
224         count = 1;
225     } else {
226         assert(r == DB_NOTFOUND);
227         count = 0;
228     }
229     return count;
230 }
231 
232 void
toku_get_node_for_verify(BLOCKNUM blocknum,FT_HANDLE ft_handle,FTNODE * nodep)233 toku_get_node_for_verify(
234     BLOCKNUM blocknum,
235     FT_HANDLE ft_handle,
236     FTNODE* nodep
237     )
238 {
239     uint32_t fullhash = toku_cachetable_hash(ft_handle->ft->cf, blocknum);
240     ftnode_fetch_extra bfe;
241     bfe.create_for_full_read(ft_handle->ft);
242     toku_pin_ftnode(
243         ft_handle->ft,
244         blocknum,
245         fullhash,
246         &bfe,
247         PL_WRITE_EXPENSIVE, // may_modify_node
248         nodep,
249         false
250         );
251 }
252 
253 struct verify_msg_fn {
254     FT_HANDLE ft_handle;
255     NONLEAF_CHILDINFO bnc;
256     const DBT *curr_less_pivot;
257     const DBT *curr_geq_pivot;
258     BLOCKNUM blocknum;
259     MSN this_msn;
260     int verbose;
261     int keep_going_on_failure;
262     bool messages_have_been_moved;
263 
264     MSN last_msn;
265     int msg_i;
266     int result = 0; // needed by VERIFY_ASSERTION
267 
verify_msg_fnverify_msg_fn268     verify_msg_fn(FT_HANDLE handle, NONLEAF_CHILDINFO nl, const DBT *less, const DBT *geq,
269                   BLOCKNUM b, MSN tmsn, int v, int k, bool m) :
270         ft_handle(handle), bnc(nl), curr_less_pivot(less), curr_geq_pivot(geq),
271         blocknum(b), this_msn(tmsn), verbose(v), keep_going_on_failure(k), messages_have_been_moved(m), last_msn(ZERO_MSN), msg_i(0) {
272     }
273 
operator ()verify_msg_fn274     int operator()(const ft_msg &msg, bool is_fresh) {
275         enum ft_msg_type type = (enum ft_msg_type) msg.type();
276         MSN msn = msg.msn();
277         XIDS xid = msg.xids();
278         const void *key = msg.kdbt()->data;
279         const void *data = msg.vdbt()->data;
280         uint32_t keylen = msg.kdbt()->size;
281         uint32_t datalen = msg.vdbt()->size;
282 
283         int r = verify_msg_in_child_buffer(ft_handle, type, msn, key, keylen, data, datalen, xid,
284                                            curr_less_pivot,
285                                            curr_geq_pivot);
286         VERIFY_ASSERTION(r == 0, msg_i, "A message in the buffer is out of place");
287         VERIFY_ASSERTION((msn.msn > last_msn.msn), msg_i, "msn per msg must be monotonically increasing toward newer messages in buffer");
288         VERIFY_ASSERTION((msn.msn <= this_msn.msn), msg_i, "all messages must have msn within limit of this node's max_msn_applied_to_node_in_memory");
289         if (ft_msg_type_applies_once(type)) {
290             int count;
291             DBT keydbt;
292             toku_fill_dbt(&keydbt, key, keylen);
293             int total_count = 0;
294             count = count_eq_key_msn(ft_handle, &bnc->msg_buffer, bnc->fresh_message_tree, toku_fill_dbt(&keydbt, key, keylen), msn);
295             total_count += count;
296             if (is_fresh) {
297                 VERIFY_ASSERTION(count == 1, msg_i, "a fresh message was not found in the fresh message tree");
298             } else if (messages_have_been_moved) {
299                 VERIFY_ASSERTION(count == 0, msg_i, "a stale message was found in the fresh message tree");
300             }
301             VERIFY_ASSERTION(count <= 1, msg_i, "a message was found multiple times in the fresh message tree");
302             count = count_eq_key_msn(ft_handle, &bnc->msg_buffer, bnc->stale_message_tree, &keydbt, msn);
303 
304             total_count += count;
305             if (is_fresh) {
306                 VERIFY_ASSERTION(count == 0, msg_i, "a fresh message was found in the stale message tree");
307             } else if (messages_have_been_moved) {
308                 VERIFY_ASSERTION(count == 1, msg_i, "a stale message was not found in the stale message tree");
309             }
310             VERIFY_ASSERTION(count <= 1, msg_i, "a message was found multiple times in the stale message tree");
311 
312             VERIFY_ASSERTION(total_count <= 1, msg_i, "a message was found in both message trees (or more than once in a single tree)");
313             VERIFY_ASSERTION(total_count >= 1, msg_i, "a message was not found in either message tree");
314         } else {
315             VERIFY_ASSERTION(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type), msg_i, "a message was found that does not apply either to all or to only one key");
316             struct count_msgs_extra extra = { .count = 0, .msn = msn, .msg_buffer = &bnc->msg_buffer };
317             bnc->broadcast_list.iterate<struct count_msgs_extra, count_msgs>(&extra);
318             VERIFY_ASSERTION(extra.count == 1, msg_i, "a broadcast message was not found in the broadcast list");
319         }
320         last_msn = msn;
321         msg_i++;
322 done:
323         return result;
324     }
325 };
326 
327 static int
toku_verify_ftnode_internal(FT_HANDLE ft_handle,MSN rootmsn,MSN parentmsn_with_messages,bool messages_exist_above,FTNODE node,int height,const DBT * lesser_pivot,const DBT * greatereq_pivot,int verbose,int keep_going_on_failure,bool messages_have_been_moved)328 toku_verify_ftnode_internal(FT_HANDLE ft_handle,
329                             MSN rootmsn, MSN parentmsn_with_messages, bool messages_exist_above,
330                             FTNODE node, int height,
331                             const DBT *lesser_pivot,               // Everything in the subtree should be > lesser_pivot.  (lesser_pivot==NULL if there is no lesser pivot.)
332                             const DBT *greatereq_pivot,            // Everything in the subtree should be <= lesser_pivot.  (lesser_pivot==NULL if there is no lesser pivot.)
333                             int verbose, int keep_going_on_failure, bool messages_have_been_moved)
334 {
335     int result=0;
336     MSN   this_msn;
337     BLOCKNUM blocknum = node->blocknum;
338 
339     //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
340     toku_ftnode_assert_fully_in_memory(node);
341     this_msn = node->max_msn_applied_to_node_on_disk;
342 
343     if (height >= 0) {
344         invariant(height == node->height);   // this is a bad failure if wrong
345     }
346     if (node->height > 0 && messages_exist_above) {
347         VERIFY_ASSERTION((parentmsn_with_messages.msn >= this_msn.msn), 0, "node msn must be descending down tree, newest messages at top");
348     }
349     // Verify that all the pivot keys are in order.
350     for (int i = 0; i < node->n_children-2; i++) {
351         DBT x, y;
352         int compare = compare_pairs(ft_handle, node->pivotkeys.fill_pivot(i, &x), node->pivotkeys.fill_pivot(i + 1, &y));
353         VERIFY_ASSERTION(compare < 0, i, "Value is >= the next value");
354     }
355     // Verify that all the pivot keys are lesser_pivot < pivot <= greatereq_pivot
356     for (int i = 0; i < node->n_children-1; i++) {
357         DBT x;
358         if (lesser_pivot) {
359             int compare = compare_pairs(ft_handle, lesser_pivot, node->pivotkeys.fill_pivot(i, &x));
360             VERIFY_ASSERTION(compare < 0, i, "Pivot is >= the lower-bound pivot");
361         }
362         if (greatereq_pivot) {
363             int compare = compare_pairs(ft_handle, greatereq_pivot, node->pivotkeys.fill_pivot(i, &x));
364             VERIFY_ASSERTION(compare >= 0, i, "Pivot is < the upper-bound pivot");
365         }
366     }
367 
368     for (int i = 0; i < node->n_children; i++) {
369         DBT x, y;
370         const DBT *curr_less_pivot = (i==0) ? lesser_pivot : node->pivotkeys.fill_pivot(i - 1, &x);
371         const DBT *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : node->pivotkeys.fill_pivot(i, &y);
372         if (node->height > 0) {
373             NONLEAF_CHILDINFO bnc = BNC(node, i);
374             // Verify that messages in the buffers are in the right place.
375             VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, &bnc->msg_buffer, bnc->fresh_message_tree) == 0, i, "fresh_message_tree");
376             VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, &bnc->msg_buffer, bnc->stale_message_tree) == 0, i, "stale_message_tree");
377 
378             verify_msg_fn verify_msg(ft_handle, bnc, curr_less_pivot, curr_geq_pivot,
379                                      blocknum, this_msn, verbose, keep_going_on_failure, messages_have_been_moved);
380             int r = bnc->msg_buffer.iterate(verify_msg);
381             if (r != 0) { result = r; goto done; }
382 
383             struct verify_message_tree_extra extra = { .msg_buffer = &bnc->msg_buffer, .broadcast = false, .is_fresh = true, .i = i, .verbose = verbose, .blocknum = node->blocknum, .keep_going_on_failure = keep_going_on_failure, .messages_have_been_moved = messages_have_been_moved };
384             r = bnc->fresh_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
385             if (r != 0) { result = r; goto done; }
386             extra.is_fresh = false;
387             r = bnc->stale_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
388             if (r != 0) { result = r; goto done; }
389 
390             bnc->fresh_message_tree.verify_marks_consistent();
391             if (messages_have_been_moved) {
392                 VERIFY_ASSERTION(!bnc->fresh_message_tree.has_marks(), i, "fresh message tree still has marks after moving messages");
393                 r = bnc->fresh_message_tree.iterate_over_marked<void, error_on_iter>(nullptr);
394                 if (r != 0) { result = r; goto done; }
395             }
396             else {
397                 r = bnc->fresh_message_tree.iterate_over_marked<struct verify_message_tree_extra, verify_marked_messages>(&extra);
398                 if (r != 0) { result = r; goto done; }
399             }
400 
401             extra.broadcast = true;
402             r = bnc->broadcast_list.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
403             if (r != 0) { result = r; goto done; }
404         }
405         else {
406             BASEMENTNODE bn = BLB(node, i);
407             for (uint32_t j = 0; j < bn->data_buffer.num_klpairs(); j++) {
408                 VERIFY_ASSERTION((rootmsn.msn >= this_msn.msn), 0, "leaf may have latest msn, but cannot be greater than root msn");
409                 DBT kdbt = get_ith_key_dbt(bn, j);
410                 if (curr_less_pivot) {
411                     int compare = compare_pairs(ft_handle, curr_less_pivot, &kdbt);
412                     VERIFY_ASSERTION_BASEMENT(compare < 0, i, j, "The leafentry is >= the lower-bound pivot");
413                 }
414                 if (curr_geq_pivot) {
415                     int compare = compare_pairs(ft_handle, curr_geq_pivot, &kdbt);
416                     VERIFY_ASSERTION_BASEMENT(compare >= 0, i, j, "The leafentry is < the upper-bound pivot");
417                 }
418                 if (0 < j) {
419                     DBT prev_key_dbt = get_ith_key_dbt(bn, j-1);
420                     int compare = compare_pairs(ft_handle, &prev_key_dbt, &kdbt);
421                     VERIFY_ASSERTION_BASEMENT(compare < 0, i, j, "Adjacent leafentries are out of order");
422                 }
423             }
424         }
425     }
426 
427 done:
428     return result;
429 }
430 
431 
432 // input is a pinned node, on exit, node is unpinned
433 int
toku_verify_ftnode(FT_HANDLE ft_handle,MSN rootmsn,MSN parentmsn_with_messages,bool messages_exist_above,FTNODE node,int height,const DBT * lesser_pivot,const DBT * greatereq_pivot,int (* progress_callback)(void * extra,float progress),void * progress_extra,int recurse,int verbose,int keep_going_on_failure)434 toku_verify_ftnode (FT_HANDLE ft_handle,
435                     MSN rootmsn, MSN parentmsn_with_messages, bool messages_exist_above,
436                      FTNODE node, int height,
437                      const DBT *lesser_pivot,               // Everything in the subtree should be > lesser_pivot.  (lesser_pivot==NULL if there is no lesser pivot.)
438                      const DBT *greatereq_pivot,            // Everything in the subtree should be <= lesser_pivot.  (lesser_pivot==NULL if there is no lesser pivot.)
439                      int (*progress_callback)(void *extra, float progress), void *progress_extra,
440                      int recurse, int verbose, int keep_going_on_failure)
441 {
442     MSN   this_msn;
443 
444     //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
445     toku_ftnode_assert_fully_in_memory(node);
446     this_msn = node->max_msn_applied_to_node_on_disk;
447 
448     int result = 0;
449     int result2 = 0;
450     if (node->height > 0) {
451         // Otherwise we'll just do the next call
452 
453         result = toku_verify_ftnode_internal(
454                 ft_handle, rootmsn, parentmsn_with_messages, messages_exist_above, node, height, lesser_pivot, greatereq_pivot,
455                 verbose, keep_going_on_failure, false);
456         if (result != 0 && (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR)) goto done;
457     }
458     if (node->height > 0) {
459         toku_move_ftnode_messages_to_stale(ft_handle->ft, node);
460     }
461     result2 = toku_verify_ftnode_internal(
462             ft_handle, rootmsn, parentmsn_with_messages, messages_exist_above, node, height, lesser_pivot, greatereq_pivot,
463             verbose, keep_going_on_failure, true);
464     if (result == 0) {
465         result = result2;
466         if (result != 0 && (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR)) goto done;
467     }
468 
469     // Verify that the subtrees have the right properties.
470     if (recurse && node->height > 0) {
471         for (int i = 0; i < node->n_children; i++) {
472             FTNODE child_node;
473             toku_get_node_for_verify(BP_BLOCKNUM(node, i), ft_handle, &child_node);
474             DBT x, y;
475             int r = toku_verify_ftnode(ft_handle, rootmsn,
476                                        (toku_bnc_n_entries(BNC(node, i)) > 0
477                                         ? this_msn
478                                         : parentmsn_with_messages),
479                                        messages_exist_above || toku_bnc_n_entries(BNC(node, i)) > 0,
480                                        child_node, node->height-1,
481                                        (i==0)                  ? lesser_pivot        : node->pivotkeys.fill_pivot(i - 1, &x),
482                                        (i==node->n_children-1) ? greatereq_pivot     : node->pivotkeys.fill_pivot(i, &y),
483                                        progress_callback, progress_extra,
484                                        recurse, verbose, keep_going_on_failure);
485             if (r) {
486                 result = r;
487                 if (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR) goto done;
488             }
489         }
490     }
491 done:
492     toku_unpin_ftnode(ft_handle->ft, node);
493 
494     if (result == 0 && progress_callback)
495     result = progress_callback(progress_extra, 0.0);
496 
497     return result;
498 }
499 
500 int
toku_verify_ft_with_progress(FT_HANDLE ft_handle,int (* progress_callback)(void * extra,float progress),void * progress_extra,int verbose,int keep_on_going)501 toku_verify_ft_with_progress (FT_HANDLE ft_handle, int (*progress_callback)(void *extra, float progress), void *progress_extra, int verbose, int keep_on_going) {
502     assert(ft_handle->ft);
503     FTNODE root_node = NULL;
504     {
505         uint32_t root_hash;
506         CACHEKEY root_key;
507         toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &root_hash);
508         toku_get_node_for_verify(root_key, ft_handle, &root_node);
509     }
510     int r = toku_verify_ftnode(ft_handle, ft_handle->ft->h->max_msn_in_ft, ft_handle->ft->h->max_msn_in_ft, false, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
511     if (r == 0) {
512         toku_ft_lock(ft_handle->ft);
513         ft_handle->ft->h->time_of_last_verification = time(NULL);
514         ft_handle->ft->h->set_dirty();
515         toku_ft_unlock(ft_handle->ft);
516     }
517     return r;
518 }
519 
520 int
toku_verify_ft(FT_HANDLE ft_handle)521 toku_verify_ft (FT_HANDLE ft_handle) {
522     return toku_verify_ft_with_progress(ft_handle, NULL, NULL, 0, 0);
523 }
524