1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: 3 #ident "$Id$" 4 /*====== 5 This file is part of PerconaFT. 6 7 8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. 9 10 PerconaFT is free software: you can redistribute it and/or modify 11 it under the terms of the GNU General Public License, version 2, 12 as published by the Free Software Foundation. 13 14 PerconaFT is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. 21 22 ---------------------------------------- 23 24 PerconaFT is free software: you can redistribute it and/or modify 25 it under the terms of the GNU Affero General Public License, version 3, 26 as published by the Free Software Foundation. 27 28 PerconaFT is distributed in the hope that it will be useful, 29 but WITHOUT ANY WARRANTY; without even the implied warranty of 30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 31 GNU Affero General Public License for more details. 32 33 You should have received a copy of the GNU Affero General Public License 34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. 35 ======= */ 36 37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." 38 39 /* 40 * The indexer 41 */ 42 #include <stdio.h> 43 #include <string.h> 44 #include <toku_portability.h> 45 #include "toku_assert.h" 46 #include "ydb-internal.h" 47 #include <ft/le-cursor.h> 48 #include "indexer.h" 49 #include <ft/ft-ops.h> 50 #include <ft/leafentry.h> 51 #include <ft/ule.h> 52 #include <ft/txn/xids.h> 53 #include <ft/logger/log-internal.h> 54 #include <ft/cachetable/checkpoint.h> 55 #include <portability/toku_atomic.h> 56 #include "loader.h" 57 #include <util/status.h> 58 59 /////////////////////////////////////////////////////////////////////////////////// 60 // Engine status 61 // 62 // Status is intended for display to humans to help understand system behavior. 63 // It does not need to be perfectly thread-safe. 64 65 static INDEXER_STATUS_S indexer_status; 66 67 #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(indexer_status, k, c, t, "indexer: " l, inc) 68 69 static void 70 status_init(void) { 71 // Note, this function initializes the keyname, type, and legend fields. 72 // Value fields are initialized to zero by compiler. 73 STATUS_INIT(INDEXER_CREATE, nullptr, UINT64, "number of indexers successfully created", TOKU_ENGINE_STATUS); 74 STATUS_INIT(INDEXER_CREATE_FAIL, nullptr, UINT64, "number of calls to toku_indexer_create_indexer() that failed", TOKU_ENGINE_STATUS); 75 STATUS_INIT(INDEXER_BUILD, nullptr, UINT64, "number of calls to indexer->build() succeeded", TOKU_ENGINE_STATUS); 76 STATUS_INIT(INDEXER_BUILD_FAIL, nullptr, UINT64, "number of calls to indexer->build() failed", TOKU_ENGINE_STATUS); 77 STATUS_INIT(INDEXER_CLOSE, nullptr, UINT64, "number of calls to indexer->close() that succeeded", TOKU_ENGINE_STATUS); 78 STATUS_INIT(INDEXER_CLOSE_FAIL, nullptr, UINT64, "number of calls to indexer->close() that failed", TOKU_ENGINE_STATUS); 79 STATUS_INIT(INDEXER_ABORT, nullptr, UINT64, "number of calls to indexer->abort()", TOKU_ENGINE_STATUS); 80 STATUS_INIT(INDEXER_CURRENT, nullptr, UINT64, "number of indexers currently in existence", TOKU_ENGINE_STATUS); 81 STATUS_INIT(INDEXER_MAX, nullptr, UINT64, "max number of indexers that ever existed simultaneously", TOKU_ENGINE_STATUS); 82 indexer_status.initialized = true; 83 } 84 #undef STATUS_INIT 85 86 void 87 toku_indexer_get_status(INDEXER_STATUS statp) { 88 if (!indexer_status.initialized) 89 status_init(); 90 *statp = indexer_status; 91 } 92 93 #define STATUS_VALUE(x) indexer_status.status[x].value.num 94 95 #include "indexer-internal.h" 96 97 static int build_index(DB_INDEXER *indexer); 98 static int close_indexer(DB_INDEXER *indexer); 99 static int abort_indexer(DB_INDEXER *indexer); 100 static void free_indexer_resources(DB_INDEXER *indexer); 101 static void free_indexer(DB_INDEXER *indexer); 102 static int update_estimated_rows(DB_INDEXER *indexer); 103 static int maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count); 104 105 static int 106 associate_indexer_with_hot_dbs(DB_INDEXER *indexer, DB *dest_dbs[], int N) { 107 int result =0; 108 for (int i = 0; i < N; i++) { 109 result = toku_db_set_indexer(dest_dbs[i], indexer); 110 if (result != 0) { 111 for (int j = 0; j < i; j++) { 112 int result2 = toku_db_set_indexer(dest_dbs[j], NULL); 113 lazy_assert(result2 == 0); 114 } 115 break; 116 } 117 } 118 return result; 119 } 120 121 static void 122 disassociate_indexer_from_hot_dbs(DB_INDEXER *indexer) { 123 for (int i = 0; i < indexer->i->N; i++) { 124 int result = toku_db_set_indexer(indexer->i->dest_dbs[i], NULL); 125 lazy_assert(result == 0); 126 } 127 } 128 129 /* 130 * free_indexer_resources() frees all of the resources associated with 131 * struct __toku_indexer_internal 132 * assumes any previously freed items set the field pointer to NULL 133 */ 134 135 static void 136 free_indexer_resources(DB_INDEXER *indexer) { 137 if ( indexer->i ) { 138 toku_mutex_destroy(&indexer->i->indexer_lock); 139 toku_mutex_destroy(&indexer->i->indexer_estimate_lock); 140 toku_destroy_dbt(&indexer->i->position_estimate); 141 if ( indexer->i->lec ) { 142 toku_le_cursor_close(indexer->i->lec); 143 } 144 if ( indexer->i->fnums ) { 145 toku_free(indexer->i->fnums); 146 indexer->i->fnums = NULL; 147 } 148 indexer_undo_do_destroy(indexer); 149 // indexer->i 150 toku_free(indexer->i); 151 indexer->i = NULL; 152 } 153 } 154 155 static void 156 free_indexer(DB_INDEXER *indexer) { 157 if ( indexer ) { 158 free_indexer_resources(indexer); 159 toku_free(indexer); 160 indexer = NULL; 161 } 162 } 163 164 void 165 toku_indexer_lock(DB_INDEXER* indexer) { 166 toku_mutex_lock(&indexer->i->indexer_lock); 167 } 168 169 void 170 toku_indexer_unlock(DB_INDEXER* indexer) { 171 toku_mutex_unlock(&indexer->i->indexer_lock); 172 } 173 174 // a shortcut call 175 // 176 // a cheap(er) call to see if a key must be inserted 177 // into the DB. If true, then we know we have to insert. 178 // If false, then we don't know, and have to check again 179 // after grabbing the indexer lock 180 bool 181 toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key) { 182 bool may_insert = false; 183 toku_mutex_lock(&indexer->i->indexer_estimate_lock); 184 185 // if we have no position estimate, we can't tell, so return false 186 if (indexer->i->position_estimate.data == nullptr) { 187 may_insert = false; 188 } else { 189 DB *db = indexer->i->src_db; 190 const toku::comparator &cmp = toku_ft_get_comparator(db->i->ft_handle); 191 int c = cmp(&indexer->i->position_estimate, key); 192 193 // if key > position_estimate, then we know the indexer cursor 194 // is past key, and we can safely say that associated values of 195 // key must be inserted into the indexer's db 196 may_insert = c < 0; 197 } 198 199 toku_mutex_unlock(&indexer->i->indexer_estimate_lock); 200 return may_insert; 201 } 202 203 void 204 toku_indexer_update_estimate(DB_INDEXER* indexer) { 205 toku_mutex_lock(&indexer->i->indexer_estimate_lock); 206 toku_le_cursor_update_estimate(indexer->i->lec, &indexer->i->position_estimate); 207 toku_mutex_unlock(&indexer->i->indexer_estimate_lock); 208 } 209 210 // forward declare the test-only wrapper function for undo-do 211 static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule); 212 213 int 214 toku_indexer_create_indexer(DB_ENV *env, 215 DB_TXN *txn, 216 DB_INDEXER **indexerp, 217 DB *src_db, 218 int N, 219 DB *dest_dbs[/*N*/], 220 uint32_t db_flags[/*N*/] UU(), 221 uint32_t indexer_flags) 222 { 223 int rval; 224 DB_INDEXER *indexer = 0; // set later when created 225 HANDLE_READ_ONLY_TXN(txn); 226 227 *indexerp = NULL; 228 229 XCALLOC(indexer); // init to all zeroes (thus initializing the error_callback and poll_func) 230 if ( !indexer ) { rval = ENOMEM; goto create_exit; } 231 XCALLOC(indexer->i); // init to all zeroes (thus initializing all pointers to NULL) 232 if ( !indexer->i ) { rval = ENOMEM; goto create_exit; } 233 234 indexer->i->env = env; 235 indexer->i->txn = txn; 236 indexer->i->src_db = src_db; 237 indexer->i->N = N; 238 indexer->i->dest_dbs = dest_dbs; 239 indexer->i->indexer_flags = indexer_flags; 240 indexer->i->loop_mod = 1000; // call poll_func every 1000 rows 241 indexer->i->estimated_rows = 0; 242 indexer->i->undo_do = test_indexer_undo_do; // TEST export the undo do function 243 244 XCALLOC_N(N, indexer->i->fnums); 245 if ( !indexer->i->fnums ) { rval = ENOMEM; goto create_exit; } 246 for(int i=0;i<indexer->i->N;i++) { 247 indexer->i->fnums[i] = toku_cachefile_filenum(db_struct_i(dest_dbs[i])->ft_handle->ft->cf); 248 } 249 indexer->i->filenums.num = N; 250 indexer->i->filenums.filenums = indexer->i->fnums; 251 indexer->i->test_only_flags = 0; // for test use only 252 253 indexer->set_error_callback = toku_indexer_set_error_callback; 254 indexer->set_poll_function = toku_indexer_set_poll_function; 255 indexer->build = build_index; 256 indexer->close = close_indexer; 257 indexer->abort = abort_indexer; 258 259 toku_mutex_init( 260 *indexer_i_indexer_lock_mutex_key, &indexer->i->indexer_lock, nullptr); 261 toku_mutex_init(*indexer_i_indexer_estimate_lock_mutex_key, 262 &indexer->i->indexer_estimate_lock, 263 nullptr); 264 toku_init_dbt(&indexer->i->position_estimate); 265 266 // 267 // create and close a dummy loader to get redirection going for the hot 268 // indexer 269 // This way, if the hot index aborts, but other transactions have references 270 // to the 271 // underlying FT, then those transactions can do dummy operations on the FT 272 // while the DB gets redirected back to an empty dictionary 273 // 274 { 275 DB_LOADER* loader = NULL; 276 rval = toku_loader_create_loader(env, txn, &loader, NULL, N, &dest_dbs[0], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_DISALLOW_PUTS, true); 277 if (rval) { 278 goto create_exit; 279 } 280 rval = loader->close(loader); 281 if (rval) { 282 goto create_exit; 283 } 284 } 285 286 // create and initialize the leafentry cursor 287 rval = toku_le_cursor_create(&indexer->i->lec, db_struct_i(src_db)->ft_handle, db_txn_struct_i(txn)->tokutxn); 288 if ( !indexer->i->lec ) { goto create_exit; } 289 290 // 2954: add recovery and rollback entries 291 LSN hot_index_lsn; // not used (yet) 292 TOKUTXN ttxn; 293 ttxn = db_txn_struct_i(txn)->tokutxn; 294 FILENUMS filenums; 295 filenums = indexer->i->filenums; 296 toku_multi_operation_client_lock(); 297 toku_ft_hot_index(NULL, ttxn, filenums, 1, &hot_index_lsn); 298 toku_multi_operation_client_unlock(); 299 300 if (rval == 0) { 301 rval = associate_indexer_with_hot_dbs(indexer, dest_dbs, N); 302 } 303 create_exit: 304 if ( rval == 0 ) { 305 306 indexer_undo_do_init(indexer); 307 308 *indexerp = indexer; 309 310 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CREATE), 1); 311 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CURRENT), 1); 312 if ( STATUS_VALUE(INDEXER_CURRENT) > STATUS_VALUE(INDEXER_MAX) ) 313 STATUS_VALUE(INDEXER_MAX) = STATUS_VALUE(INDEXER_CURRENT); // NOT WORTH A LOCK TO MAKE THREADSAFE), may be inaccurate 314 315 } else { 316 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CREATE_FAIL), 1); 317 free_indexer(indexer); 318 } 319 320 return rval; 321 } 322 323 int 324 toku_indexer_set_poll_function(DB_INDEXER *indexer, 325 int (*poll_func)(void *poll_extra, 326 float progress), 327 void *poll_extra) 328 { 329 invariant(indexer != NULL); 330 indexer->i->poll_func = poll_func; 331 indexer->i->poll_extra = poll_extra; 332 return 0; 333 } 334 335 int 336 toku_indexer_set_error_callback(DB_INDEXER *indexer, 337 void (*error_cb)(DB *db, int i, int err, 338 DBT *key, DBT *val, 339 void *error_extra), 340 void *error_extra) 341 { 342 invariant(indexer != NULL); 343 indexer->i->error_callback = error_cb; 344 indexer->i->error_extra = error_extra; 345 return 0; 346 } 347 348 // a key is to the right of the indexer's cursor if it compares 349 // greater than the current le cursor position. 350 bool 351 toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key) { 352 // the hot indexer runs from the end to the beginning, it gets the largest keys first 353 // 354 // if key is less than indexer's position, then we should NOT insert it because 355 // the indexer will get to it. If it is greater or equal, that means the indexer 356 // has already processed the key, and will not get to it, therefore, we need 357 // to handle it 358 return toku_le_cursor_is_key_greater_or_equal(indexer->i->lec, key); 359 } 360 361 // initialize provisional info by allocating enough space to hold provisional 362 // ids, states, and txns for each of the provisional entries in the ule. the 363 // ule and le remain owned by the caller, not this struct. 364 static void 365 ule_prov_info_init(struct ule_prov_info *prov_info, const void* key, uint32_t keylen, LEAFENTRY le, ULEHANDLE ule) { 366 prov_info->le = le; 367 prov_info->ule = ule; 368 prov_info->keylen = keylen; 369 prov_info->key = toku_xmalloc(keylen); 370 memcpy(prov_info->key, key, keylen); 371 prov_info->num_provisional = ule_get_num_provisional(ule); 372 prov_info->num_committed = ule_get_num_committed(ule); 373 uint32_t n = prov_info->num_provisional; 374 if (n > 0) { 375 XMALLOC_N(n, prov_info->prov_ids); 376 XMALLOC_N(n, prov_info->prov_states); 377 XMALLOC_N(n, prov_info->prov_txns); 378 } 379 } 380 381 // clean up anything possibly created by ule_prov_info_init() 382 static void 383 ule_prov_info_destroy(struct ule_prov_info *prov_info) { 384 if (prov_info->num_provisional > 0) { 385 toku_free(prov_info->prov_ids); 386 toku_free(prov_info->prov_states); 387 toku_free(prov_info->prov_txns); 388 } else { 389 // nothing to free if there was nothing provisional 390 invariant(prov_info->prov_ids == NULL); 391 invariant(prov_info->prov_states == NULL); 392 invariant(prov_info->prov_txns == NULL); 393 } 394 } 395 396 static void 397 indexer_fill_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) { 398 ULEHANDLE ule = prov_info->ule; 399 uint32_t num_provisional = prov_info->num_provisional; 400 uint32_t num_committed = prov_info->num_committed; 401 TXNID *prov_ids = prov_info->prov_ids; 402 TOKUTXN_STATE *prov_states = prov_info->prov_states; 403 TOKUTXN *prov_txns = prov_info->prov_txns; 404 405 // don't both grabbing the txn manager lock if we don't 406 // have any provisional txns to record 407 if (num_provisional == 0) { 408 return; 409 } 410 411 // handle test case first 412 if (indexer->i->test_xid_state) { 413 for (uint32_t i = 0; i < num_provisional; i++) { 414 UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i); 415 prov_ids[i] = uxr_get_txnid(uxr); 416 prov_states[i] = indexer->i->test_xid_state(indexer, prov_ids[i]); 417 prov_txns[i] = NULL; 418 } 419 return; 420 } 421 422 // hold the txn manager lock while we inspect txn state 423 // and pin some live txns 424 DB_ENV *env = indexer->i->env; 425 TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger); 426 TXNID parent_xid = uxr_get_txnid(ule_get_uxr(ule, num_committed)); 427 428 // let's first initialize things to defaults 429 for (uint32_t i = 0; i < num_provisional; i++) { 430 UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i); 431 prov_ids[i] = uxr_get_txnid(uxr); 432 prov_txns[i] = NULL; 433 prov_states[i] = TOKUTXN_RETIRED; 434 } 435 436 toku_txn_manager_suspend(txn_manager); 437 TXNID_PAIR root_xid_pair = {.parent_id64=parent_xid, .child_id64 = TXNID_NONE}; 438 TOKUTXN root_txn = NULL; 439 toku_txn_manager_id2txn_unlocked( 440 txn_manager, 441 root_xid_pair, 442 &root_txn 443 ); 444 if (root_txn == NULL) { 445 toku_txn_manager_resume(txn_manager); 446 return; //everything is retired in this case, the default 447 } 448 prov_txns[0] = root_txn; 449 prov_states[0] = toku_txn_get_state(root_txn); 450 toku_txn_lock_state(root_txn); 451 prov_states[0] = toku_txn_get_state(root_txn); 452 if (prov_states[0] == TOKUTXN_LIVE || prov_states[0] == TOKUTXN_PREPARING) { 453 // pin this live txn so it can't commit or abort until we're done with it 454 toku_txn_pin_live_txn_unlocked(root_txn); 455 } 456 toku_txn_unlock_state(root_txn); 457 458 root_txn->child_manager->suspend(); 459 for (uint32_t i = 1; i < num_provisional; i++) { 460 UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i); 461 TXNID child_id = uxr_get_txnid(uxr); 462 TOKUTXN txn = NULL; 463 464 TXNID_PAIR txnid_pair = {.parent_id64 = parent_xid, .child_id64 = child_id}; 465 root_txn->child_manager->find_tokutxn_by_xid_unlocked(txnid_pair, &txn); 466 prov_txns[i] = txn; 467 if (txn) { 468 toku_txn_lock_state(txn); 469 prov_states[i] = toku_txn_get_state(txn); 470 if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) { 471 // pin this live txn so it can't commit or abort until we're done with it 472 toku_txn_pin_live_txn_unlocked(txn); 473 } 474 toku_txn_unlock_state(txn); 475 } 476 else { 477 prov_states[i] = TOKUTXN_RETIRED; 478 } 479 } 480 root_txn->child_manager->resume(); 481 toku_txn_manager_resume(txn_manager); 482 } 483 484 struct le_cursor_extra { 485 DB_INDEXER *indexer; 486 struct ule_prov_info *prov_info; 487 }; 488 489 // cursor callback, so its synchronized with other db operations using 490 // cachetable pair locks. because no txn can commit on this db, read 491 // the provisional info for the newly read ule. 492 static int 493 le_cursor_callback(uint32_t keylen, const void *key, uint32_t UU(vallen), const void *val, void *extra, bool lock_only) { 494 if (lock_only || val == NULL) { 495 ; // do nothing if only locking. do nothing if val==NULL, means DB_NOTFOUND 496 } else { 497 struct le_cursor_extra *CAST_FROM_VOIDP(cursor_extra, extra); 498 struct ule_prov_info *prov_info = cursor_extra->prov_info; 499 // the val here is a leafentry. ule_create does not copy the entire 500 // contents of the leafentry it is given into its own buffers, so we 501 // must allocate space for a leafentry and keep it around with the ule. 502 LEAFENTRY CAST_FROM_VOIDP(le, toku_xmemdup(val, vallen)); 503 ULEHANDLE ule = toku_ule_create(le); 504 invariant(ule); 505 // when we initialize prov info, we also pass in the leafentry and ule 506 // pointers so the caller can access them later. it's their job to free 507 // them when they're not needed. 508 ule_prov_info_init(prov_info, key, keylen, le, ule); 509 indexer_fill_prov_info(cursor_extra->indexer, prov_info); 510 } 511 return 0; 512 } 513 514 // get the next ule and fill out its provisional info in the 515 // prov_info struct provided. caller is responsible for cleaning 516 // up the ule info after it's done. 517 static int 518 get_next_ule_with_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) { 519 struct le_cursor_extra extra = { 520 .indexer = indexer, 521 .prov_info = prov_info, 522 }; 523 int r = toku_le_cursor_next(indexer->i->lec, le_cursor_callback, &extra); 524 return r; 525 } 526 527 static int 528 build_index(DB_INDEXER *indexer) { 529 int result = 0; 530 531 bool done = false; 532 for (uint64_t loop_count = 0; !done; loop_count++) { 533 534 toku_indexer_lock(indexer); 535 // grab the multi operation lock because we will be injecting messages 536 // grab it here because we must hold it before 537 // trying to pin any live transactions, as discovered by #5775 538 toku_multi_operation_client_lock(); 539 540 // grab the next leaf entry and get its provisional info. we'll 541 // need the provisional info for the undo-do algorithm, and we get 542 // it here so it can be read atomically with respect to txn commit 543 // and abort. the atomicity comes from the root-to-leaf path pinned 544 // by the query and in the getf callback function 545 // 546 // this allocates space for the prov info, so we have to destroy it 547 // when we're done. 548 struct ule_prov_info prov_info; 549 memset(&prov_info, 0, sizeof(prov_info)); 550 result = get_next_ule_with_prov_info(indexer, &prov_info); 551 552 if (result != 0) { 553 invariant(prov_info.ule == NULL); 554 done = true; 555 if (result == DB_NOTFOUND) { 556 result = 0; // all done, normal way to exit loop successfully 557 } 558 } 559 else { 560 invariant(prov_info.le); 561 invariant(prov_info.ule); 562 for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) { 563 DB *db = indexer->i->dest_dbs[which_db]; 564 DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db]; 565 DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db]; 566 result = indexer_undo_do(indexer, db, &prov_info, hot_keys, hot_vals); 567 if ((result != 0) && (indexer->i->error_callback != NULL)) { 568 // grab the key and call the error callback 569 DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC); 570 toku_dbt_set(prov_info.keylen, prov_info.key, &key, NULL); 571 indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra); 572 toku_destroy_dbt(&key); 573 } 574 } 575 // the leafentry and ule are not owned by the prov_info, 576 // and are still our responsibility to free 577 toku_free(prov_info.le); 578 toku_free(prov_info.key); 579 toku_ule_free(prov_info.ule); 580 } 581 582 toku_multi_operation_client_unlock(); 583 toku_indexer_unlock(indexer); 584 ule_prov_info_destroy(&prov_info); 585 586 if (result == 0) { 587 result = maybe_call_poll_func(indexer, loop_count); 588 } 589 if (result != 0) { 590 done = true; 591 } 592 } 593 594 // post index creation cleanup 595 // - optimize? 596 // - garbage collect? 597 // - unique checks? 598 599 if ( result == 0 ) { 600 // Perform a checkpoint so that all of the indexing makes it to disk before continuing. 601 // Otherwise indexing would not be crash-safe becasue none of the undo-do messages are in the recovery log. 602 DB_ENV *env = indexer->i->env; 603 CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable); 604 toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, INDEXER_CHECKPOINT); 605 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD), 1); 606 } else { 607 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1); 608 } 609 610 return result; 611 } 612 613 // Clients must not operate on any of the hot dbs concurrently with close 614 static int 615 close_indexer(DB_INDEXER *indexer) { 616 int r = 0; 617 (void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1); 618 619 // Disassociate the indexer from the hot db and free_indexer 620 disassociate_indexer_from_hot_dbs(indexer); 621 free_indexer(indexer); 622 623 if ( r == 0 ) { 624 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CLOSE), 1); 625 } else { 626 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CLOSE_FAIL), 1); 627 } 628 return r; 629 } 630 631 // Clients must not operate on any of the hot dbs concurrently with abort 632 static int 633 abort_indexer(DB_INDEXER *indexer) { 634 (void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1); 635 (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_ABORT), 1); 636 // Disassociate the indexer from the hot db and free_indexer 637 disassociate_indexer_from_hot_dbs(indexer); 638 free_indexer(indexer); 639 return 0; 640 } 641 642 643 // derived from the handlerton's estimate_num_rows() 644 static int 645 update_estimated_rows(DB_INDEXER *indexer) { 646 int error; 647 DB_TXN *txn = NULL; 648 DB_ENV *db_env = indexer->i->env; 649 error = db_env->txn_begin(db_env, 0, &txn, DB_READ_UNCOMMITTED); 650 if (error == 0) { 651 DB_BTREE_STAT64 stats; 652 DB *db = indexer->i->src_db; 653 error = db->stat64(db, txn, &stats); 654 if (error == 0) { 655 indexer->i->estimated_rows = stats.bt_ndata; 656 } 657 txn->commit(txn, 0); 658 } 659 return error; 660 } 661 662 static int 663 maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count) { 664 int result = 0; 665 if ( indexer->i->poll_func != NULL && ( loop_count % indexer->i->loop_mod ) == 0 ) { 666 int r __attribute__((unused)) = update_estimated_rows(indexer); 667 // what happens if estimate_rows fails? 668 // - currently does not modify estimate, which is probably sufficient 669 float progress; 670 if ( indexer->i->estimated_rows == 0 || loop_count > indexer->i->estimated_rows) 671 progress = 1.0; 672 else 673 progress = (float)loop_count / (float)indexer->i->estimated_rows; 674 result = indexer->i->poll_func(indexer->i->poll_extra, progress); 675 } 676 return result; 677 } 678 679 680 // this allows us to force errors under test. Flags are defined in indexer.h 681 void 682 toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) { 683 invariant(indexer != NULL); 684 indexer->i->test_only_flags = flags; 685 } 686 687 // this allows us to call the undo do function in tests using 688 // a convenience wrapper that gets and destroys the ule's prov info 689 static int 690 test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule) { 691 int which_db; 692 for (which_db = 0; which_db < indexer->i->N; which_db++) { 693 if (indexer->i->dest_dbs[which_db] == hotdb) { 694 break; 695 } 696 } 697 if (which_db == indexer->i->N) { 698 return EINVAL; 699 } 700 struct ule_prov_info prov_info; 701 memset(&prov_info, 0, sizeof(prov_info)); 702 // pass null for the leafentry - we don't need it, neither does the info 703 ule_prov_info_init(&prov_info, key->data, key->size, NULL, ule); // mallocs prov_info->key, owned by this function 704 indexer_fill_prov_info(indexer, &prov_info); 705 DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db]; 706 DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db]; 707 int r = indexer_undo_do(indexer, hotdb, &prov_info, hot_keys, hot_vals); 708 toku_free(prov_info.key); 709 ule_prov_info_destroy(&prov_info); 710 return r; 711 } 712 713 DB * 714 toku_indexer_get_src_db(DB_INDEXER *indexer) { 715 return indexer->i->src_db; 716 } 717 718 719 #undef STATUS_VALUE 720 721