1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 /* Verify an FT. */
40 /* Check:
41 * The tree is of uniform depth (and the height is correct at every node)
42 * For each pivot key: the max of the stuff to the left is <= the pivot key < the min of the stuff to the right.
43 * For each leaf node: All the keys are in strictly increasing order.
44 * For each nonleaf node: All the messages have keys that are between the associated pivot keys ( left_pivot_key < message <= right_pivot_key)
45 */
46
47 #include "ft/serialize/block_table.h"
48 #include "ft/ft.h"
49 #include "ft/ft-cachetable-wrappers.h"
50 #include "ft/ft-internal.h"
51 #include "ft/node.h"
52
53 static int
compare_pairs(FT_HANDLE ft_handle,const DBT * a,const DBT * b)54 compare_pairs (FT_HANDLE ft_handle, const DBT *a, const DBT *b) {
55 return ft_handle->ft->cmp(a, b);
56 }
57
58 static int
compare_pair_to_key(FT_HANDLE ft_handle,const DBT * a,const void * key,uint32_t keylen)59 compare_pair_to_key (FT_HANDLE ft_handle, const DBT *a, const void *key, uint32_t keylen) {
60 DBT y;
61 return ft_handle->ft->cmp(a, toku_fill_dbt(&y, key, keylen));
62 }
63
64 static int
65 verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, const void *key, uint32_t keylen, const void *UU(data), uint32_t UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot)
66 __attribute__((warn_unused_result));
67
UU()68 UU()
69 static int
70 verify_msg_in_child_buffer(FT_HANDLE ft_handle, enum ft_msg_type type, MSN msn, const void *key, uint32_t keylen, const void *UU(data), uint32_t UU(datalen), XIDS UU(xids), const DBT *lesser_pivot, const DBT *greatereq_pivot) {
71 int result = 0;
72 if (msn.msn == ZERO_MSN.msn)
73 result = EINVAL;
74 switch (type) {
75 default:
76 break;
77 case FT_INSERT:
78 case FT_INSERT_NO_OVERWRITE:
79 case FT_DELETE_ANY:
80 case FT_ABORT_ANY:
81 case FT_COMMIT_ANY:
82 // verify key in bounds
83 if (lesser_pivot) {
84 int compare = compare_pair_to_key(ft_handle, lesser_pivot, key, keylen);
85 if (compare >= 0)
86 result = EINVAL;
87 }
88 if (result == 0 && greatereq_pivot) {
89 int compare = compare_pair_to_key(ft_handle, greatereq_pivot, key, keylen);
90 if (compare < 0)
91 result = EINVAL;
92 }
93 break;
94 }
95 return result;
96 }
97
98 static DBT
get_ith_key_dbt(BASEMENTNODE bn,int i)99 get_ith_key_dbt (BASEMENTNODE bn, int i) {
100 DBT kdbt;
101 int r = bn->data_buffer.fetch_key_and_len(i, &kdbt.size, &kdbt.data);
102 invariant_zero(r); // this is a bad failure if it happens.
103 return kdbt;
104 }
105
106 #define VERIFY_ASSERTION(predicate, i, string) ({ \
107 if(!(predicate)) { \
108 fprintf(stderr, "%s:%d: Looking at child %d of block %" PRId64 ": %s\n", __FILE__, __LINE__, i, blocknum.b, string); \
109 result = TOKUDB_NEEDS_REPAIR; \
110 if (!keep_going_on_failure) goto done; \
111 }})
112
113 #define VERIFY_ASSERTION_BASEMENT(predicate, bn, entry, string) ({ \
114 if(!(predicate)) { \
115 fprintf(stderr, "%s:%d: Looking at block %" PRId64 " bn %d entry %d: %s\n", __FILE__, __LINE__, blocknum.b, bn, entry, string); \
116 result = TOKUDB_NEEDS_REPAIR; \
117 if (!keep_going_on_failure) goto done; \
118 }})
119
120 struct count_msgs_extra {
121 int count;
122 MSN msn;
123 message_buffer *msg_buffer;
124 };
125
126 // template-only function, but must be extern
127 int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
128 __attribute__((nonnull(3)));
count_msgs(const int32_t & offset,const uint32_t UU (idx),struct count_msgs_extra * const e)129 int count_msgs(const int32_t &offset, const uint32_t UU(idx), struct count_msgs_extra *const e)
130 {
131 MSN msn;
132 e->msg_buffer->get_message_key_msn(offset, nullptr, &msn);
133 if (msn.msn == e->msn.msn) {
134 e->count++;
135 }
136 return 0;
137 }
138
139 struct verify_message_tree_extra {
140 message_buffer *msg_buffer;
141 bool broadcast;
142 bool is_fresh;
143 int i;
144 int verbose;
145 BLOCKNUM blocknum;
146 int keep_going_on_failure;
147 bool messages_have_been_moved;
148 };
149
150 int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e) __attribute__((nonnull(3)));
verify_message_tree(const int32_t & offset,const uint32_t UU (idx),struct verify_message_tree_extra * const e)151 int verify_message_tree(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
152 {
153 BLOCKNUM blocknum = e->blocknum;
154 int keep_going_on_failure = e->keep_going_on_failure;
155 int result = 0;
156 DBT k, v;
157 ft_msg msg = e->msg_buffer->get_message(offset, &k, &v);
158 bool is_fresh = e->msg_buffer->get_freshness(offset);
159 if (e->broadcast) {
160 VERIFY_ASSERTION(ft_msg_type_applies_all((enum ft_msg_type) msg.type()) || ft_msg_type_does_nothing((enum ft_msg_type) msg.type()),
161 e->i, "message found in broadcast list that is not a broadcast");
162 } else {
163 VERIFY_ASSERTION(ft_msg_type_applies_once((enum ft_msg_type) msg.type()),
164 e->i, "message found in fresh or stale message tree that does not apply once");
165 if (e->is_fresh) {
166 if (e->messages_have_been_moved) {
167 VERIFY_ASSERTION(is_fresh,
168 e->i, "message found in fresh message tree that is not fresh");
169 }
170 } else {
171 VERIFY_ASSERTION(!is_fresh,
172 e->i, "message found in stale message tree that is fresh");
173 }
174 }
175 done:
176 return result;
177 }
178
179 int error_on_iter(const int32_t &UU(offset), const uint32_t UU(idx), void *UU(e));
error_on_iter(const int32_t & UU (offset),const uint32_t UU (idx),void * UU (e))180 int error_on_iter(const int32_t &UU(offset), const uint32_t UU(idx), void *UU(e)) {
181 return TOKUDB_NEEDS_REPAIR;
182 }
183
184 int verify_marked_messages(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e) __attribute__((nonnull(3)));
verify_marked_messages(const int32_t & offset,const uint32_t UU (idx),struct verify_message_tree_extra * const e)185 int verify_marked_messages(const int32_t &offset, const uint32_t UU(idx), struct verify_message_tree_extra *const e)
186 {
187 BLOCKNUM blocknum = e->blocknum;
188 int keep_going_on_failure = e->keep_going_on_failure;
189 int result = 0;
190 bool is_fresh = e->msg_buffer->get_freshness(offset);
191 VERIFY_ASSERTION(!is_fresh, e->i, "marked message found in the fresh message tree that is fresh");
192 done:
193 return result;
194 }
195
196 template<typename verify_omt_t>
197 static int
verify_sorted_by_key_msn(FT_HANDLE ft_handle,message_buffer * msg_buffer,const verify_omt_t & mt)198 verify_sorted_by_key_msn(FT_HANDLE ft_handle, message_buffer *msg_buffer, const verify_omt_t &mt) {
199 int result = 0;
200 size_t last_offset = 0;
201 for (uint32_t i = 0; i < mt.size(); i++) {
202 int32_t offset;
203 int r = mt.fetch(i, &offset);
204 assert_zero(r);
205 if (i > 0) {
206 struct toku_msg_buffer_key_msn_cmp_extra extra(ft_handle->ft->cmp, msg_buffer);
207 if (toku_msg_buffer_key_msn_cmp(extra, last_offset, offset) >= 0) {
208 result = TOKUDB_NEEDS_REPAIR;
209 break;
210 }
211 }
212 last_offset = offset;
213 }
214 return result;
215 }
216
217 template<typename count_omt_t>
218 static int
count_eq_key_msn(FT_HANDLE ft_handle,message_buffer * msg_buffer,const count_omt_t & mt,const DBT * key,MSN msn)219 count_eq_key_msn(FT_HANDLE ft_handle, message_buffer *msg_buffer, const count_omt_t &mt, const DBT *key, MSN msn) {
220 struct toku_msg_buffer_key_msn_heaviside_extra extra(ft_handle->ft->cmp, msg_buffer, key, msn);
221 int r = mt.template find_zero<struct toku_msg_buffer_key_msn_heaviside_extra, toku_msg_buffer_key_msn_heaviside>(extra, nullptr, nullptr);
222 int count;
223 if (r == 0) {
224 count = 1;
225 } else {
226 assert(r == DB_NOTFOUND);
227 count = 0;
228 }
229 return count;
230 }
231
232 void
toku_get_node_for_verify(BLOCKNUM blocknum,FT_HANDLE ft_handle,FTNODE * nodep)233 toku_get_node_for_verify(
234 BLOCKNUM blocknum,
235 FT_HANDLE ft_handle,
236 FTNODE* nodep
237 )
238 {
239 uint32_t fullhash = toku_cachetable_hash(ft_handle->ft->cf, blocknum);
240 ftnode_fetch_extra bfe;
241 bfe.create_for_full_read(ft_handle->ft);
242 toku_pin_ftnode(
243 ft_handle->ft,
244 blocknum,
245 fullhash,
246 &bfe,
247 PL_WRITE_EXPENSIVE, // may_modify_node
248 nodep,
249 false
250 );
251 }
252
253 struct verify_msg_fn {
254 FT_HANDLE ft_handle;
255 NONLEAF_CHILDINFO bnc;
256 const DBT *curr_less_pivot;
257 const DBT *curr_geq_pivot;
258 BLOCKNUM blocknum;
259 MSN this_msn;
260 int verbose;
261 int keep_going_on_failure;
262 bool messages_have_been_moved;
263
264 MSN last_msn;
265 int msg_i;
266 int result = 0; // needed by VERIFY_ASSERTION
267
verify_msg_fnverify_msg_fn268 verify_msg_fn(FT_HANDLE handle, NONLEAF_CHILDINFO nl, const DBT *less, const DBT *geq,
269 BLOCKNUM b, MSN tmsn, int v, int k, bool m) :
270 ft_handle(handle), bnc(nl), curr_less_pivot(less), curr_geq_pivot(geq),
271 blocknum(b), this_msn(tmsn), verbose(v), keep_going_on_failure(k), messages_have_been_moved(m), last_msn(ZERO_MSN), msg_i(0) {
272 }
273
operator ()verify_msg_fn274 int operator()(const ft_msg &msg, bool is_fresh) {
275 enum ft_msg_type type = (enum ft_msg_type) msg.type();
276 MSN msn = msg.msn();
277 XIDS xid = msg.xids();
278 const void *key = msg.kdbt()->data;
279 const void *data = msg.vdbt()->data;
280 uint32_t keylen = msg.kdbt()->size;
281 uint32_t datalen = msg.vdbt()->size;
282
283 int r = verify_msg_in_child_buffer(ft_handle, type, msn, key, keylen, data, datalen, xid,
284 curr_less_pivot,
285 curr_geq_pivot);
286 VERIFY_ASSERTION(r == 0, msg_i, "A message in the buffer is out of place");
287 VERIFY_ASSERTION((msn.msn > last_msn.msn), msg_i, "msn per msg must be monotonically increasing toward newer messages in buffer");
288 VERIFY_ASSERTION((msn.msn <= this_msn.msn), msg_i, "all messages must have msn within limit of this node's max_msn_applied_to_node_in_memory");
289 if (ft_msg_type_applies_once(type)) {
290 int count;
291 DBT keydbt;
292 toku_fill_dbt(&keydbt, key, keylen);
293 int total_count = 0;
294 count = count_eq_key_msn(ft_handle, &bnc->msg_buffer, bnc->fresh_message_tree, toku_fill_dbt(&keydbt, key, keylen), msn);
295 total_count += count;
296 if (is_fresh) {
297 VERIFY_ASSERTION(count == 1, msg_i, "a fresh message was not found in the fresh message tree");
298 } else if (messages_have_been_moved) {
299 VERIFY_ASSERTION(count == 0, msg_i, "a stale message was found in the fresh message tree");
300 }
301 VERIFY_ASSERTION(count <= 1, msg_i, "a message was found multiple times in the fresh message tree");
302 count = count_eq_key_msn(ft_handle, &bnc->msg_buffer, bnc->stale_message_tree, &keydbt, msn);
303
304 total_count += count;
305 if (is_fresh) {
306 VERIFY_ASSERTION(count == 0, msg_i, "a fresh message was found in the stale message tree");
307 } else if (messages_have_been_moved) {
308 VERIFY_ASSERTION(count == 1, msg_i, "a stale message was not found in the stale message tree");
309 }
310 VERIFY_ASSERTION(count <= 1, msg_i, "a message was found multiple times in the stale message tree");
311
312 VERIFY_ASSERTION(total_count <= 1, msg_i, "a message was found in both message trees (or more than once in a single tree)");
313 VERIFY_ASSERTION(total_count >= 1, msg_i, "a message was not found in either message tree");
314 } else {
315 VERIFY_ASSERTION(ft_msg_type_applies_all(type) || ft_msg_type_does_nothing(type), msg_i, "a message was found that does not apply either to all or to only one key");
316 struct count_msgs_extra extra = { .count = 0, .msn = msn, .msg_buffer = &bnc->msg_buffer };
317 bnc->broadcast_list.iterate<struct count_msgs_extra, count_msgs>(&extra);
318 VERIFY_ASSERTION(extra.count == 1, msg_i, "a broadcast message was not found in the broadcast list");
319 }
320 last_msn = msn;
321 msg_i++;
322 done:
323 return result;
324 }
325 };
326
327 static int
toku_verify_ftnode_internal(FT_HANDLE ft_handle,MSN rootmsn,MSN parentmsn_with_messages,bool messages_exist_above,FTNODE node,int height,const DBT * lesser_pivot,const DBT * greatereq_pivot,int verbose,int keep_going_on_failure,bool messages_have_been_moved)328 toku_verify_ftnode_internal(FT_HANDLE ft_handle,
329 MSN rootmsn, MSN parentmsn_with_messages, bool messages_exist_above,
330 FTNODE node, int height,
331 const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
332 const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
333 int verbose, int keep_going_on_failure, bool messages_have_been_moved)
334 {
335 int result=0;
336 MSN this_msn;
337 BLOCKNUM blocknum = node->blocknum;
338
339 //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
340 toku_ftnode_assert_fully_in_memory(node);
341 this_msn = node->max_msn_applied_to_node_on_disk;
342
343 if (height >= 0) {
344 invariant(height == node->height); // this is a bad failure if wrong
345 }
346 if (node->height > 0 && messages_exist_above) {
347 VERIFY_ASSERTION((parentmsn_with_messages.msn >= this_msn.msn), 0, "node msn must be descending down tree, newest messages at top");
348 }
349 // Verify that all the pivot keys are in order.
350 for (int i = 0; i < node->n_children-2; i++) {
351 DBT x, y;
352 int compare = compare_pairs(ft_handle, node->pivotkeys.fill_pivot(i, &x), node->pivotkeys.fill_pivot(i + 1, &y));
353 VERIFY_ASSERTION(compare < 0, i, "Value is >= the next value");
354 }
355 // Verify that all the pivot keys are lesser_pivot < pivot <= greatereq_pivot
356 for (int i = 0; i < node->n_children-1; i++) {
357 DBT x;
358 if (lesser_pivot) {
359 int compare = compare_pairs(ft_handle, lesser_pivot, node->pivotkeys.fill_pivot(i, &x));
360 VERIFY_ASSERTION(compare < 0, i, "Pivot is >= the lower-bound pivot");
361 }
362 if (greatereq_pivot) {
363 int compare = compare_pairs(ft_handle, greatereq_pivot, node->pivotkeys.fill_pivot(i, &x));
364 VERIFY_ASSERTION(compare >= 0, i, "Pivot is < the upper-bound pivot");
365 }
366 }
367
368 for (int i = 0; i < node->n_children; i++) {
369 DBT x, y;
370 const DBT *curr_less_pivot = (i==0) ? lesser_pivot : node->pivotkeys.fill_pivot(i - 1, &x);
371 const DBT *curr_geq_pivot = (i==node->n_children-1) ? greatereq_pivot : node->pivotkeys.fill_pivot(i, &y);
372 if (node->height > 0) {
373 NONLEAF_CHILDINFO bnc = BNC(node, i);
374 // Verify that messages in the buffers are in the right place.
375 VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, &bnc->msg_buffer, bnc->fresh_message_tree) == 0, i, "fresh_message_tree");
376 VERIFY_ASSERTION(verify_sorted_by_key_msn(ft_handle, &bnc->msg_buffer, bnc->stale_message_tree) == 0, i, "stale_message_tree");
377
378 verify_msg_fn verify_msg(ft_handle, bnc, curr_less_pivot, curr_geq_pivot,
379 blocknum, this_msn, verbose, keep_going_on_failure, messages_have_been_moved);
380 int r = bnc->msg_buffer.iterate(verify_msg);
381 if (r != 0) { result = r; goto done; }
382
383 struct verify_message_tree_extra extra = { .msg_buffer = &bnc->msg_buffer, .broadcast = false, .is_fresh = true, .i = i, .verbose = verbose, .blocknum = node->blocknum, .keep_going_on_failure = keep_going_on_failure, .messages_have_been_moved = messages_have_been_moved };
384 r = bnc->fresh_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
385 if (r != 0) { result = r; goto done; }
386 extra.is_fresh = false;
387 r = bnc->stale_message_tree.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
388 if (r != 0) { result = r; goto done; }
389
390 bnc->fresh_message_tree.verify_marks_consistent();
391 if (messages_have_been_moved) {
392 VERIFY_ASSERTION(!bnc->fresh_message_tree.has_marks(), i, "fresh message tree still has marks after moving messages");
393 r = bnc->fresh_message_tree.iterate_over_marked<void, error_on_iter>(nullptr);
394 if (r != 0) { result = r; goto done; }
395 }
396 else {
397 r = bnc->fresh_message_tree.iterate_over_marked<struct verify_message_tree_extra, verify_marked_messages>(&extra);
398 if (r != 0) { result = r; goto done; }
399 }
400
401 extra.broadcast = true;
402 r = bnc->broadcast_list.iterate<struct verify_message_tree_extra, verify_message_tree>(&extra);
403 if (r != 0) { result = r; goto done; }
404 }
405 else {
406 BASEMENTNODE bn = BLB(node, i);
407 for (uint32_t j = 0; j < bn->data_buffer.num_klpairs(); j++) {
408 VERIFY_ASSERTION((rootmsn.msn >= this_msn.msn), 0, "leaf may have latest msn, but cannot be greater than root msn");
409 DBT kdbt = get_ith_key_dbt(bn, j);
410 if (curr_less_pivot) {
411 int compare = compare_pairs(ft_handle, curr_less_pivot, &kdbt);
412 VERIFY_ASSERTION_BASEMENT(compare < 0, i, j, "The leafentry is >= the lower-bound pivot");
413 }
414 if (curr_geq_pivot) {
415 int compare = compare_pairs(ft_handle, curr_geq_pivot, &kdbt);
416 VERIFY_ASSERTION_BASEMENT(compare >= 0, i, j, "The leafentry is < the upper-bound pivot");
417 }
418 if (0 < j) {
419 DBT prev_key_dbt = get_ith_key_dbt(bn, j-1);
420 int compare = compare_pairs(ft_handle, &prev_key_dbt, &kdbt);
421 VERIFY_ASSERTION_BASEMENT(compare < 0, i, j, "Adjacent leafentries are out of order");
422 }
423 }
424 }
425 }
426
427 done:
428 return result;
429 }
430
431
432 // input is a pinned node, on exit, node is unpinned
433 int
toku_verify_ftnode(FT_HANDLE ft_handle,MSN rootmsn,MSN parentmsn_with_messages,bool messages_exist_above,FTNODE node,int height,const DBT * lesser_pivot,const DBT * greatereq_pivot,int (* progress_callback)(void * extra,float progress),void * progress_extra,int recurse,int verbose,int keep_going_on_failure)434 toku_verify_ftnode (FT_HANDLE ft_handle,
435 MSN rootmsn, MSN parentmsn_with_messages, bool messages_exist_above,
436 FTNODE node, int height,
437 const DBT *lesser_pivot, // Everything in the subtree should be > lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
438 const DBT *greatereq_pivot, // Everything in the subtree should be <= lesser_pivot. (lesser_pivot==NULL if there is no lesser pivot.)
439 int (*progress_callback)(void *extra, float progress), void *progress_extra,
440 int recurse, int verbose, int keep_going_on_failure)
441 {
442 MSN this_msn;
443
444 //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
445 toku_ftnode_assert_fully_in_memory(node);
446 this_msn = node->max_msn_applied_to_node_on_disk;
447
448 int result = 0;
449 int result2 = 0;
450 if (node->height > 0) {
451 // Otherwise we'll just do the next call
452
453 result = toku_verify_ftnode_internal(
454 ft_handle, rootmsn, parentmsn_with_messages, messages_exist_above, node, height, lesser_pivot, greatereq_pivot,
455 verbose, keep_going_on_failure, false);
456 if (result != 0 && (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR)) goto done;
457 }
458 if (node->height > 0) {
459 toku_move_ftnode_messages_to_stale(ft_handle->ft, node);
460 }
461 result2 = toku_verify_ftnode_internal(
462 ft_handle, rootmsn, parentmsn_with_messages, messages_exist_above, node, height, lesser_pivot, greatereq_pivot,
463 verbose, keep_going_on_failure, true);
464 if (result == 0) {
465 result = result2;
466 if (result != 0 && (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR)) goto done;
467 }
468
469 // Verify that the subtrees have the right properties.
470 if (recurse && node->height > 0) {
471 for (int i = 0; i < node->n_children; i++) {
472 FTNODE child_node;
473 toku_get_node_for_verify(BP_BLOCKNUM(node, i), ft_handle, &child_node);
474 DBT x, y;
475 int r = toku_verify_ftnode(ft_handle, rootmsn,
476 (toku_bnc_n_entries(BNC(node, i)) > 0
477 ? this_msn
478 : parentmsn_with_messages),
479 messages_exist_above || toku_bnc_n_entries(BNC(node, i)) > 0,
480 child_node, node->height-1,
481 (i==0) ? lesser_pivot : node->pivotkeys.fill_pivot(i - 1, &x),
482 (i==node->n_children-1) ? greatereq_pivot : node->pivotkeys.fill_pivot(i, &y),
483 progress_callback, progress_extra,
484 recurse, verbose, keep_going_on_failure);
485 if (r) {
486 result = r;
487 if (!keep_going_on_failure || result != TOKUDB_NEEDS_REPAIR) goto done;
488 }
489 }
490 }
491 done:
492 toku_unpin_ftnode(ft_handle->ft, node);
493
494 if (result == 0 && progress_callback)
495 result = progress_callback(progress_extra, 0.0);
496
497 return result;
498 }
499
500 int
toku_verify_ft_with_progress(FT_HANDLE ft_handle,int (* progress_callback)(void * extra,float progress),void * progress_extra,int verbose,int keep_on_going)501 toku_verify_ft_with_progress (FT_HANDLE ft_handle, int (*progress_callback)(void *extra, float progress), void *progress_extra, int verbose, int keep_on_going) {
502 assert(ft_handle->ft);
503 FTNODE root_node = NULL;
504 {
505 uint32_t root_hash;
506 CACHEKEY root_key;
507 toku_calculate_root_offset_pointer(ft_handle->ft, &root_key, &root_hash);
508 toku_get_node_for_verify(root_key, ft_handle, &root_node);
509 }
510 int r = toku_verify_ftnode(ft_handle, ft_handle->ft->h->max_msn_in_ft, ft_handle->ft->h->max_msn_in_ft, false, root_node, -1, NULL, NULL, progress_callback, progress_extra, 1, verbose, keep_on_going);
511 if (r == 0) {
512 toku_ft_lock(ft_handle->ft);
513 ft_handle->ft->h->time_of_last_verification = time(NULL);
514 ft_handle->ft->h->set_dirty();
515 toku_ft_unlock(ft_handle->ft);
516 }
517 return r;
518 }
519
520 int
toku_verify_ft(FT_HANDLE ft_handle)521 toku_verify_ft (FT_HANDLE ft_handle) {
522 return toku_verify_ft_with_progress(ft_handle, NULL, NULL, 0, 0);
523 }
524