1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 /*
40  *   The indexer
41  */
42 #include <stdio.h>
43 #include <string.h>
44 #include <toku_portability.h>
45 #include "toku_assert.h"
46 #include "ydb-internal.h"
47 #include <ft/le-cursor.h>
48 #include "indexer.h"
49 #include <ft/ft-ops.h>
50 #include <ft/leafentry.h>
51 #include <ft/ule.h>
52 #include <ft/txn/xids.h>
53 #include <ft/logger/log-internal.h>
54 #include <ft/cachetable/checkpoint.h>
55 #include <portability/toku_atomic.h>
56 #include "loader.h"
57 #include <util/status.h>
58 
59 ///////////////////////////////////////////////////////////////////////////////////
60 // Engine status
61 //
62 // Status is intended for display to humans to help understand system behavior.
63 // It does not need to be perfectly thread-safe.
64 
65 static INDEXER_STATUS_S indexer_status;
66 
67 #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(indexer_status, k, c, t, "indexer: " l, inc)
68 
// Fill in the descriptive part of every row of the file-static
// indexer_status table and mark the table as initialized. Each STATUS_INIT
// line names one row; the macro prefixes the legend with "indexer: ".
static void
status_init(void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(INDEXER_CREATE,      nullptr, UINT64, "number of indexers successfully created", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_CREATE_FAIL, nullptr, UINT64, "number of calls to toku_indexer_create_indexer() that failed", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_BUILD,       nullptr, UINT64, "number of calls to indexer->build() succeeded", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_BUILD_FAIL,  nullptr, UINT64, "number of calls to indexer->build() failed", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_CLOSE,       nullptr, UINT64, "number of calls to indexer->close() that succeeded", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_CLOSE_FAIL,  nullptr, UINT64, "number of calls to indexer->close() that failed", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_ABORT,       nullptr, UINT64, "number of calls to indexer->abort()", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_CURRENT,     nullptr, UINT64, "number of indexers currently in existence", TOKU_ENGINE_STATUS);
    STATUS_INIT(INDEXER_MAX,         nullptr, UINT64, "max number of indexers that ever existed simultaneously", TOKU_ENGINE_STATUS);
    // toku_indexer_get_status() checks this flag to initialize only once.
    indexer_status.initialized = true;
}
84 #undef STATUS_INIT
85 
86 void
toku_indexer_get_status(INDEXER_STATUS statp)87 toku_indexer_get_status(INDEXER_STATUS statp) {
88     if (!indexer_status.initialized)
89         status_init();
90     *statp = indexer_status;
91 }
92 
93 #define STATUS_VALUE(x) indexer_status.status[x].value.num
94 
95 #include "indexer-internal.h"
96 
97 static int build_index(DB_INDEXER *indexer);
98 static int close_indexer(DB_INDEXER *indexer);
99 static int abort_indexer(DB_INDEXER *indexer);
100 static void free_indexer_resources(DB_INDEXER *indexer);
101 static void free_indexer(DB_INDEXER *indexer);
102 static int update_estimated_rows(DB_INDEXER *indexer);
103 static int maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count);
104 
105 static int
associate_indexer_with_hot_dbs(DB_INDEXER * indexer,DB * dest_dbs[],int N)106 associate_indexer_with_hot_dbs(DB_INDEXER *indexer, DB *dest_dbs[], int N) {
107     int result =0;
108     for (int i = 0; i < N; i++) {
109         result = toku_db_set_indexer(dest_dbs[i], indexer);
110         if (result != 0) {
111             for (int j = 0; j < i; j++) {
112                 int result2 = toku_db_set_indexer(dest_dbs[j], NULL);
113                 lazy_assert(result2 == 0);
114             }
115             break;
116         }
117     }
118     return result;
119 }
120 
121 static void
disassociate_indexer_from_hot_dbs(DB_INDEXER * indexer)122 disassociate_indexer_from_hot_dbs(DB_INDEXER *indexer) {
123     for (int i = 0; i < indexer->i->N; i++) {
124         int result = toku_db_set_indexer(indexer->i->dest_dbs[i], NULL);
125         lazy_assert(result == 0);
126     }
127 }
128 
129 /*
130  *  free_indexer_resources() frees all of the resources associated with
131  *      struct __toku_indexer_internal
132  *  assumes any previously freed items set the field pointer to NULL
133  */
134 
135 static void
free_indexer_resources(DB_INDEXER * indexer)136 free_indexer_resources(DB_INDEXER *indexer) {
137     if ( indexer->i ) {
138         toku_mutex_destroy(&indexer->i->indexer_lock);
139         toku_mutex_destroy(&indexer->i->indexer_estimate_lock);
140         toku_destroy_dbt(&indexer->i->position_estimate);
141         if ( indexer->i->lec ) {
142             toku_le_cursor_close(indexer->i->lec);
143         }
144         if ( indexer->i->fnums ) {
145             toku_free(indexer->i->fnums);
146             indexer->i->fnums = NULL;
147         }
148         indexer_undo_do_destroy(indexer);
149         // indexer->i
150         toku_free(indexer->i);
151         indexer->i = NULL;
152     }
153 }
154 
155 static void
free_indexer(DB_INDEXER * indexer)156 free_indexer(DB_INDEXER *indexer) {
157     if ( indexer ) {
158         free_indexer_resources(indexer);
159         toku_free(indexer);
160         indexer = NULL;
161     }
162 }
163 
164 void
toku_indexer_lock(DB_INDEXER * indexer)165 toku_indexer_lock(DB_INDEXER* indexer) {
166     toku_mutex_lock(&indexer->i->indexer_lock);
167 }
168 
169 void
toku_indexer_unlock(DB_INDEXER * indexer)170 toku_indexer_unlock(DB_INDEXER* indexer) {
171     toku_mutex_unlock(&indexer->i->indexer_lock);
172 }
173 
174 // a shortcut call
175 //
176 // a cheap(er) call to see if a key must be inserted
177 // into the DB. If true, then we know we have to insert.
178 // If false, then we don't know, and have to check again
179 // after grabbing the indexer lock
180 bool
toku_indexer_may_insert(DB_INDEXER * indexer,const DBT * key)181 toku_indexer_may_insert(DB_INDEXER* indexer, const DBT* key) {
182     bool may_insert = false;
183     toku_mutex_lock(&indexer->i->indexer_estimate_lock);
184 
185     // if we have no position estimate, we can't tell, so return false
186     if (indexer->i->position_estimate.data == nullptr) {
187         may_insert = false;
188     } else {
189         DB *db = indexer->i->src_db;
190         const toku::comparator &cmp = toku_ft_get_comparator(db->i->ft_handle);
191         int c = cmp(&indexer->i->position_estimate, key);
192 
193         // if key > position_estimate, then we know the indexer cursor
194         // is past key, and we can safely say that associated values of
195         // key must be inserted into the indexer's db
196         may_insert = c < 0;
197     }
198 
199     toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
200     return may_insert;
201 }
202 
203 void
toku_indexer_update_estimate(DB_INDEXER * indexer)204 toku_indexer_update_estimate(DB_INDEXER* indexer) {
205     toku_mutex_lock(&indexer->i->indexer_estimate_lock);
206     toku_le_cursor_update_estimate(indexer->i->lec, &indexer->i->position_estimate);
207     toku_mutex_unlock(&indexer->i->indexer_estimate_lock);
208 }
209 
210 // forward declare the test-only wrapper function for undo-do
211 static int test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule);
212 
213 int
toku_indexer_create_indexer(DB_ENV * env,DB_TXN * txn,DB_INDEXER ** indexerp,DB * src_db,int N,DB * dest_dbs[],uint32_t db_flags[]UU (),uint32_t indexer_flags)214 toku_indexer_create_indexer(DB_ENV *env,
215                             DB_TXN *txn,
216                             DB_INDEXER **indexerp,
217                             DB *src_db,
218                             int N,
219                             DB *dest_dbs[/*N*/],
220                             uint32_t db_flags[/*N*/] UU(),
221                             uint32_t indexer_flags)
222 {
223     int rval;
224     DB_INDEXER *indexer = 0;   // set later when created
225     HANDLE_READ_ONLY_TXN(txn);
226 
227     *indexerp = NULL;
228 
229     XCALLOC(indexer);      // init to all zeroes (thus initializing the error_callback and poll_func)
230     if ( !indexer )    { rval = ENOMEM; goto create_exit; }
231     XCALLOC(indexer->i);   // init to all zeroes (thus initializing all pointers to NULL)
232     if ( !indexer->i ) { rval = ENOMEM; goto create_exit; }
233 
234     indexer->i->env                = env;
235     indexer->i->txn                = txn;
236     indexer->i->src_db             = src_db;
237     indexer->i->N                  = N;
238     indexer->i->dest_dbs           = dest_dbs;
239     indexer->i->indexer_flags      = indexer_flags;
240     indexer->i->loop_mod           = 1000; // call poll_func every 1000 rows
241     indexer->i->estimated_rows     = 0;
242     indexer->i->undo_do            = test_indexer_undo_do; // TEST export the undo do function
243 
244     XCALLOC_N(N, indexer->i->fnums);
245     if ( !indexer->i->fnums ) { rval = ENOMEM; goto create_exit; }
246     for(int i=0;i<indexer->i->N;i++) {
247         indexer->i->fnums[i]       = toku_cachefile_filenum(db_struct_i(dest_dbs[i])->ft_handle->ft->cf);
248     }
249     indexer->i->filenums.num       = N;
250     indexer->i->filenums.filenums  = indexer->i->fnums;
251     indexer->i->test_only_flags    = 0;  // for test use only
252 
253     indexer->set_error_callback    = toku_indexer_set_error_callback;
254     indexer->set_poll_function     = toku_indexer_set_poll_function;
255     indexer->build                 = build_index;
256     indexer->close = close_indexer;
257     indexer->abort = abort_indexer;
258 
259     toku_mutex_init(
260         *indexer_i_indexer_lock_mutex_key, &indexer->i->indexer_lock, nullptr);
261     toku_mutex_init(*indexer_i_indexer_estimate_lock_mutex_key,
262                     &indexer->i->indexer_estimate_lock,
263                     nullptr);
264     toku_init_dbt(&indexer->i->position_estimate);
265 
266     //
267     // create and close a dummy loader to get redirection going for the hot
268     // indexer
269     // This way, if the hot index aborts, but other transactions have references
270     // to the
271     // underlying FT, then those transactions can do dummy operations on the FT
272     // while the DB gets redirected back to an empty dictionary
273     //
274     {
275         DB_LOADER* loader = NULL;
276         rval = toku_loader_create_loader(env, txn, &loader, NULL, N, &dest_dbs[0], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_DISALLOW_PUTS, true);
277         if (rval) {
278             goto create_exit;
279         }
280         rval = loader->close(loader);
281         if (rval) {
282             goto create_exit;
283         }
284     }
285 
286     // create and initialize the leafentry cursor
287     rval = toku_le_cursor_create(&indexer->i->lec, db_struct_i(src_db)->ft_handle, db_txn_struct_i(txn)->tokutxn);
288     if ( !indexer->i->lec ) { goto create_exit; }
289 
290     // 2954: add recovery and rollback entries
291     LSN hot_index_lsn; // not used (yet)
292     TOKUTXN      ttxn;
293     ttxn = db_txn_struct_i(txn)->tokutxn;
294     FILENUMS filenums;
295     filenums = indexer->i->filenums;
296     toku_multi_operation_client_lock();
297     toku_ft_hot_index(NULL, ttxn, filenums, 1, &hot_index_lsn);
298     toku_multi_operation_client_unlock();
299 
300     if (rval == 0) {
301         rval = associate_indexer_with_hot_dbs(indexer, dest_dbs, N);
302     }
303 create_exit:
304     if ( rval == 0 ) {
305 
306         indexer_undo_do_init(indexer);
307 
308         *indexerp = indexer;
309 
310         (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CREATE), 1);
311         (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CURRENT), 1);
312         if ( STATUS_VALUE(INDEXER_CURRENT) > STATUS_VALUE(INDEXER_MAX) )
313             STATUS_VALUE(INDEXER_MAX) = STATUS_VALUE(INDEXER_CURRENT);   // NOT WORTH A LOCK TO MAKE THREADSAFE), may be inaccurate
314 
315     } else {
316         (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CREATE_FAIL), 1);
317         free_indexer(indexer);
318     }
319 
320     return rval;
321 }
322 
323 int
toku_indexer_set_poll_function(DB_INDEXER * indexer,int (* poll_func)(void * poll_extra,float progress),void * poll_extra)324 toku_indexer_set_poll_function(DB_INDEXER *indexer,
325                                int (*poll_func)(void *poll_extra,
326                                                 float progress),
327                                void *poll_extra)
328 {
329     invariant(indexer != NULL);
330     indexer->i->poll_func  = poll_func;
331     indexer->i->poll_extra = poll_extra;
332     return 0;
333 }
334 
335 int
toku_indexer_set_error_callback(DB_INDEXER * indexer,void (* error_cb)(DB * db,int i,int err,DBT * key,DBT * val,void * error_extra),void * error_extra)336 toku_indexer_set_error_callback(DB_INDEXER *indexer,
337                                 void (*error_cb)(DB *db, int i, int err,
338                                                  DBT *key, DBT *val,
339                                                  void *error_extra),
340                                 void *error_extra)
341 {
342     invariant(indexer != NULL);
343     indexer->i->error_callback = error_cb;
344     indexer->i->error_extra    = error_extra;
345     return 0;
346 }
347 
348 // a key is to the right of the indexer's cursor if it compares
349 // greater than the current le cursor position.
350 bool
toku_indexer_should_insert_key(DB_INDEXER * indexer,const DBT * key)351 toku_indexer_should_insert_key(DB_INDEXER *indexer, const DBT *key) {
352     // the hot indexer runs from the end to the beginning, it gets the largest keys first
353     //
354     // if key is less than indexer's position, then we should NOT insert it because
355     // the indexer will get to it. If it is greater or equal, that means the indexer
356     // has already processed the key, and will not get to it, therefore, we need
357     // to handle it
358     return toku_le_cursor_is_key_greater_or_equal(indexer->i->lec, key);
359 }
360 
361 // initialize provisional info by allocating enough space to hold provisional
362 // ids, states, and txns for each of the provisional entries in the ule. the
363 // ule and le remain owned by the caller, not this struct.
364 static void
ule_prov_info_init(struct ule_prov_info * prov_info,const void * key,uint32_t keylen,LEAFENTRY le,ULEHANDLE ule)365 ule_prov_info_init(struct ule_prov_info *prov_info, const void* key, uint32_t keylen, LEAFENTRY le, ULEHANDLE ule) {
366     prov_info->le = le;
367     prov_info->ule = ule;
368     prov_info->keylen = keylen;
369     prov_info->key = toku_xmalloc(keylen);
370     memcpy(prov_info->key, key, keylen);
371     prov_info->num_provisional = ule_get_num_provisional(ule);
372     prov_info->num_committed = ule_get_num_committed(ule);
373     uint32_t n = prov_info->num_provisional;
374     if (n > 0) {
375         XMALLOC_N(n, prov_info->prov_ids);
376         XMALLOC_N(n, prov_info->prov_states);
377         XMALLOC_N(n, prov_info->prov_txns);
378     }
379 }
380 
381 // clean up anything possibly created by ule_prov_info_init()
382 static void
ule_prov_info_destroy(struct ule_prov_info * prov_info)383 ule_prov_info_destroy(struct ule_prov_info *prov_info) {
384     if (prov_info->num_provisional > 0) {
385         toku_free(prov_info->prov_ids);
386         toku_free(prov_info->prov_states);
387         toku_free(prov_info->prov_txns);
388     } else {
389         // nothing to free if there was nothing provisional
390         invariant(prov_info->prov_ids == NULL);
391         invariant(prov_info->prov_states == NULL);
392         invariant(prov_info->prov_txns == NULL);
393     }
394 }
395 
396 static void
indexer_fill_prov_info(DB_INDEXER * indexer,struct ule_prov_info * prov_info)397 indexer_fill_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) {
398     ULEHANDLE ule = prov_info->ule;
399     uint32_t num_provisional = prov_info->num_provisional;
400     uint32_t num_committed = prov_info->num_committed;
401     TXNID *prov_ids = prov_info->prov_ids;
402     TOKUTXN_STATE *prov_states = prov_info->prov_states;
403     TOKUTXN *prov_txns = prov_info->prov_txns;
404 
405     // don't both grabbing the txn manager lock if we don't
406     // have any provisional txns to record
407     if (num_provisional == 0) {
408         return;
409     }
410 
411     // handle test case first
412     if (indexer->i->test_xid_state) {
413         for (uint32_t i = 0; i < num_provisional; i++) {
414             UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i);
415             prov_ids[i] = uxr_get_txnid(uxr);
416             prov_states[i] = indexer->i->test_xid_state(indexer, prov_ids[i]);
417             prov_txns[i] = NULL;
418         }
419         return;
420     }
421 
422     // hold the txn manager lock while we inspect txn state
423     // and pin some live txns
424     DB_ENV *env = indexer->i->env;
425     TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
426     TXNID parent_xid = uxr_get_txnid(ule_get_uxr(ule, num_committed));
427 
428     // let's first initialize things to defaults
429     for (uint32_t i = 0; i < num_provisional; i++) {
430         UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i);
431         prov_ids[i] = uxr_get_txnid(uxr);
432         prov_txns[i] = NULL;
433         prov_states[i] = TOKUTXN_RETIRED;
434     }
435 
436     toku_txn_manager_suspend(txn_manager);
437     TXNID_PAIR root_xid_pair = {.parent_id64=parent_xid, .child_id64 = TXNID_NONE};
438     TOKUTXN root_txn = NULL;
439     toku_txn_manager_id2txn_unlocked(
440         txn_manager,
441         root_xid_pair,
442         &root_txn
443         );
444     if (root_txn == NULL) {
445         toku_txn_manager_resume(txn_manager);
446         return; //everything is retired in this case, the default
447     }
448     prov_txns[0] = root_txn;
449     prov_states[0] = toku_txn_get_state(root_txn);
450     toku_txn_lock_state(root_txn);
451     prov_states[0] = toku_txn_get_state(root_txn);
452     if (prov_states[0] == TOKUTXN_LIVE || prov_states[0] == TOKUTXN_PREPARING) {
453         // pin this live txn so it can't commit or abort until we're done with it
454         toku_txn_pin_live_txn_unlocked(root_txn);
455     }
456     toku_txn_unlock_state(root_txn);
457 
458     root_txn->child_manager->suspend();
459     for (uint32_t i = 1; i < num_provisional; i++) {
460         UXRHANDLE uxr = ule_get_uxr(ule, num_committed + i);
461         TXNID child_id = uxr_get_txnid(uxr);
462         TOKUTXN txn = NULL;
463 
464         TXNID_PAIR txnid_pair = {.parent_id64 = parent_xid, .child_id64 = child_id};
465         root_txn->child_manager->find_tokutxn_by_xid_unlocked(txnid_pair, &txn);
466         prov_txns[i] = txn;
467         if (txn) {
468             toku_txn_lock_state(txn);
469             prov_states[i] = toku_txn_get_state(txn);
470             if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) {
471                 // pin this live txn so it can't commit or abort until we're done with it
472                 toku_txn_pin_live_txn_unlocked(txn);
473             }
474             toku_txn_unlock_state(txn);
475         }
476         else {
477             prov_states[i] = TOKUTXN_RETIRED;
478         }
479     }
480     root_txn->child_manager->resume();
481     toku_txn_manager_resume(txn_manager);
482 }
483 
// Context handed to le_cursor_callback() through the cursor's `extra`
// pointer. Field order matters: get_next_ule_with_prov_info() fills it
// with a designated initializer in this order.
struct le_cursor_extra {
    DB_INDEXER *indexer;             // indexer the cursor belongs to
    struct ule_prov_info *prov_info; // out: filled in by the callback
};
488 
489 // cursor callback, so its synchronized with other db operations using
490 // cachetable pair locks. because no txn can commit on this db, read
491 // the provisional info for the newly read ule.
492 static int
le_cursor_callback(uint32_t keylen,const void * key,uint32_t UU (vallen),const void * val,void * extra,bool lock_only)493 le_cursor_callback(uint32_t keylen, const void *key, uint32_t UU(vallen), const void *val, void *extra, bool lock_only) {
494     if (lock_only || val == NULL) {
495         ; // do nothing if only locking. do nothing if val==NULL, means DB_NOTFOUND
496     } else {
497         struct le_cursor_extra *CAST_FROM_VOIDP(cursor_extra, extra);
498         struct ule_prov_info *prov_info = cursor_extra->prov_info;
499         // the val here is a leafentry. ule_create does not copy the entire
500         // contents of the leafentry it is given into its own buffers, so we
501         // must allocate space for a leafentry and keep it around with the ule.
502         LEAFENTRY CAST_FROM_VOIDP(le, toku_xmemdup(val, vallen));
503         ULEHANDLE ule = toku_ule_create(le);
504         invariant(ule);
505         // when we initialize prov info, we also pass in the leafentry and ule
506         // pointers so the caller can access them later. it's their job to free
507         // them when they're not needed.
508         ule_prov_info_init(prov_info, key, keylen, le, ule);
509         indexer_fill_prov_info(cursor_extra->indexer, prov_info);
510     }
511     return 0;
512 }
513 
514 // get the next ule and fill out its provisional info in the
515 // prov_info struct provided. caller is responsible for cleaning
516 // up the ule info after it's done.
517 static int
get_next_ule_with_prov_info(DB_INDEXER * indexer,struct ule_prov_info * prov_info)518 get_next_ule_with_prov_info(DB_INDEXER *indexer, struct ule_prov_info *prov_info) {
519     struct le_cursor_extra extra = {
520         .indexer = indexer,
521         .prov_info = prov_info,
522     };
523     int r = toku_le_cursor_next(indexer->i->lec, le_cursor_callback, &extra);
524     return r;
525 }
526 
527 static int
build_index(DB_INDEXER * indexer)528 build_index(DB_INDEXER *indexer) {
529     int result = 0;
530 
531     bool done = false;
532     for (uint64_t loop_count = 0; !done; loop_count++) {
533 
534         toku_indexer_lock(indexer);
535         // grab the multi operation lock because we will be injecting messages
536         // grab it here because we must hold it before
537         // trying to pin any live transactions, as discovered by #5775
538         toku_multi_operation_client_lock();
539 
540         // grab the next leaf entry and get its provisional info. we'll
541         // need the provisional info for the undo-do algorithm, and we get
542         // it here so it can be read atomically with respect to txn commit
543         // and abort. the atomicity comes from the root-to-leaf path pinned
544         // by the query and in the getf callback function
545         //
546         // this allocates space for the prov info, so we have to destroy it
547         // when we're done.
548         struct ule_prov_info prov_info;
549         memset(&prov_info, 0, sizeof(prov_info));
550         result = get_next_ule_with_prov_info(indexer, &prov_info);
551 
552         if (result != 0) {
553             invariant(prov_info.ule == NULL);
554             done = true;
555             if (result == DB_NOTFOUND) {
556                 result = 0;  // all done, normal way to exit loop successfully
557             }
558         }
559         else {
560             invariant(prov_info.le);
561             invariant(prov_info.ule);
562             for (int which_db = 0; (which_db < indexer->i->N) && (result == 0); which_db++) {
563                 DB *db = indexer->i->dest_dbs[which_db];
564                 DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db];
565                 DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db];
566                 result = indexer_undo_do(indexer, db, &prov_info, hot_keys, hot_vals);
567                 if ((result != 0) && (indexer->i->error_callback != NULL)) {
568                     // grab the key and call the error callback
569                     DBT key; toku_init_dbt_flags(&key, DB_DBT_REALLOC);
570                     toku_dbt_set(prov_info.keylen, prov_info.key, &key, NULL);
571                     indexer->i->error_callback(db, which_db, result, &key, NULL, indexer->i->error_extra);
572                     toku_destroy_dbt(&key);
573                 }
574             }
575             // the leafentry and ule are not owned by the prov_info,
576             // and are still our responsibility to free
577             toku_free(prov_info.le);
578             toku_free(prov_info.key);
579             toku_ule_free(prov_info.ule);
580         }
581 
582         toku_multi_operation_client_unlock();
583         toku_indexer_unlock(indexer);
584         ule_prov_info_destroy(&prov_info);
585 
586         if (result == 0) {
587             result = maybe_call_poll_func(indexer, loop_count);
588         }
589         if (result != 0) {
590             done = true;
591         }
592     }
593 
594     // post index creation cleanup
595     //  - optimize?
596     //  - garbage collect?
597     //  - unique checks?
598 
599     if ( result == 0 ) {
600         // Perform a checkpoint so that all of the indexing makes it to disk before continuing.
601         // Otherwise indexing would not be crash-safe becasue none of the undo-do messages are in the recovery log.
602         DB_ENV *env = indexer->i->env;
603         CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable);
604         toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, INDEXER_CHECKPOINT);
605         (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD), 1);
606     } else {
607         (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1);
608     }
609 
610     return result;
611 }
612 
613 // Clients must not operate on any of the hot dbs concurrently with close
614 static int
close_indexer(DB_INDEXER * indexer)615 close_indexer(DB_INDEXER *indexer) {
616     int r = 0;
617     (void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1);
618 
619     // Disassociate the indexer from the hot db and free_indexer
620     disassociate_indexer_from_hot_dbs(indexer);
621     free_indexer(indexer);
622 
623     if ( r == 0 ) {
624         (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CLOSE), 1);
625     } else {
626         (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_CLOSE_FAIL), 1);
627     }
628     return r;
629 }
630 
631 // Clients must not operate on any of the hot dbs concurrently with abort
632 static int
abort_indexer(DB_INDEXER * indexer)633 abort_indexer(DB_INDEXER *indexer) {
634     (void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1);
635     (void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_ABORT), 1);
636     // Disassociate the indexer from the hot db and free_indexer
637     disassociate_indexer_from_hot_dbs(indexer);
638     free_indexer(indexer);
639     return 0;
640 }
641 
642 
643 // derived from the handlerton's estimate_num_rows()
644 static int
update_estimated_rows(DB_INDEXER * indexer)645 update_estimated_rows(DB_INDEXER *indexer) {
646     int error;
647     DB_TXN *txn = NULL;
648     DB_ENV *db_env = indexer->i->env;
649     error = db_env->txn_begin(db_env, 0, &txn, DB_READ_UNCOMMITTED);
650     if (error == 0) {
651         DB_BTREE_STAT64 stats;
652         DB *db = indexer->i->src_db;
653         error = db->stat64(db, txn, &stats);
654         if (error == 0) {
655             indexer->i->estimated_rows = stats.bt_ndata;
656         }
657         txn->commit(txn, 0);
658     }
659     return error;
660 }
661 
662 static int
maybe_call_poll_func(DB_INDEXER * indexer,uint64_t loop_count)663 maybe_call_poll_func(DB_INDEXER *indexer, uint64_t loop_count) {
664     int result = 0;
665     if ( indexer->i->poll_func != NULL && ( loop_count % indexer->i->loop_mod ) == 0 ) {
666         int r __attribute__((unused)) = update_estimated_rows(indexer);
667         // what happens if estimate_rows fails?
668         //   - currently does not modify estimate, which is probably sufficient
669         float progress;
670         if ( indexer->i->estimated_rows == 0  || loop_count > indexer->i->estimated_rows)
671             progress = 1.0;
672         else
673             progress = (float)loop_count / (float)indexer->i->estimated_rows;
674         result = indexer->i->poll_func(indexer->i->poll_extra, progress);
675     }
676     return result;
677 }
678 
679 
680 // this allows us to force errors under test.  Flags are defined in indexer.h
681 void
toku_indexer_set_test_only_flags(DB_INDEXER * indexer,int flags)682 toku_indexer_set_test_only_flags(DB_INDEXER *indexer, int flags) {
683     invariant(indexer != NULL);
684     indexer->i->test_only_flags = flags;
685 }
686 
687 // this allows us to call the undo do function in tests using
688 // a convenience wrapper that gets and destroys the ule's prov info
689 static int
test_indexer_undo_do(DB_INDEXER * indexer,DB * hotdb,DBT * key,ULEHANDLE ule)690 test_indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, DBT* key, ULEHANDLE ule) {
691     int which_db;
692     for (which_db = 0; which_db < indexer->i->N; which_db++) {
693         if (indexer->i->dest_dbs[which_db] == hotdb) {
694             break;
695         }
696     }
697     if (which_db == indexer->i->N) {
698         return EINVAL;
699     }
700     struct ule_prov_info prov_info;
701     memset(&prov_info, 0, sizeof(prov_info));
702     // pass null for the leafentry - we don't need it, neither does the info
703     ule_prov_info_init(&prov_info, key->data, key->size, NULL, ule); // mallocs prov_info->key, owned by this function
704     indexer_fill_prov_info(indexer, &prov_info);
705     DBT_ARRAY *hot_keys = &indexer->i->hot_keys[which_db];
706     DBT_ARRAY *hot_vals = &indexer->i->hot_vals[which_db];
707     int r = indexer_undo_do(indexer, hotdb, &prov_info, hot_keys, hot_vals);
708     toku_free(prov_info.key);
709     ule_prov_info_destroy(&prov_info);
710     return r;
711 }
712 
713 DB *
toku_indexer_get_src_db(DB_INDEXER * indexer)714 toku_indexer_get_src_db(DB_INDEXER *indexer) {
715     return indexer->i->src_db;
716 }
717 
718 
719 #undef STATUS_VALUE
720 
721