1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <toku_portability.h>
40 #include <toku_assert.h>
41 
42 #include <stdio.h>
43 #include <string.h>
44 
45 #include <ft/le-cursor.h>
46 #include <ft/ft-ops.h>
47 #include <ft/leafentry.h>
48 #include <ft/ule.h>
49 #include <ft/txn/txn_manager.h>
50 #include <ft/txn/xids.h>
51 #include <ft/cachetable/checkpoint.h>
52 
53 #include "ydb-internal.h"
54 #include "ydb_row_lock.h"
55 #include "indexer.h"
56 #include "indexer-internal.h"
57 
58 // initialize the commit keys
59 static void
indexer_commit_keys_init(struct indexer_commit_keys * keys)60 indexer_commit_keys_init(struct indexer_commit_keys *keys) {
61     keys->max_keys = keys->current_keys = 0;
62     keys->keys = NULL;
63 }
64 
65 // destroy the commit keys
66 static void
indexer_commit_keys_destroy(struct indexer_commit_keys * keys)67 indexer_commit_keys_destroy(struct indexer_commit_keys *keys) {
68     for (int i = 0; i < keys->max_keys; i++)
69         toku_destroy_dbt(&keys->keys[i]);
70     toku_free(keys->keys);
71 }
72 
73 // return the number of keys in the ordered set
74 static int
indexer_commit_keys_valid(struct indexer_commit_keys * keys)75 indexer_commit_keys_valid(struct indexer_commit_keys *keys) {
76     return keys->current_keys;
77 }
78 
79 // add a key to the commit keys
80 static void
indexer_commit_keys_add(struct indexer_commit_keys * keys,size_t length,void * ptr)81 indexer_commit_keys_add(struct indexer_commit_keys *keys, size_t length, void *ptr) {
82     if (keys->current_keys >= keys->max_keys) {
83         int new_max_keys = keys->max_keys == 0 ? 256 : keys->max_keys * 2;
84         keys->keys = (DBT *) toku_xrealloc(keys->keys, new_max_keys * sizeof (DBT));
85         for (int i = keys->current_keys; i < new_max_keys; i++)
86             toku_init_dbt_flags(&keys->keys[i], DB_DBT_REALLOC);
87         keys->max_keys = new_max_keys;
88     }
89     DBT *key = &keys->keys[keys->current_keys];
90     toku_dbt_set(length, ptr, key, NULL);
91     keys->current_keys++;
92 }
93 
94 // set the ordered set to empty
95 static void
indexer_commit_keys_set_empty(struct indexer_commit_keys * keys)96 indexer_commit_keys_set_empty(struct indexer_commit_keys *keys) {
97     keys->current_keys = 0;
98 }
99 
100 // internal functions
101 static int indexer_set_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
102 static int indexer_append_xid(DB_INDEXER *indexer, TXNID xid, XIDS *xids_result);
103 
104 static bool indexer_find_prev_xr(DB_INDEXER *indexer, ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex);
105 
106 static int indexer_generate_hot_keys_vals(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info* prov_info, UXRHANDLE uxr, DBT_ARRAY *hotkeys, DBT_ARRAY *hotvals);
107 static int indexer_ft_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids, TOKUTXN txn);
108 static int indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
109 static int indexer_ft_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids, TOKUTXN txn);
110 static int indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids);
111 static int indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
112 static void indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid, TOKUTXN txn);
113 
114 
115 // initialize undo globals located in the indexer private object
116 void
indexer_undo_do_init(DB_INDEXER * indexer)117 indexer_undo_do_init(DB_INDEXER *indexer) {
118     indexer_commit_keys_init(&indexer->i->commit_keys);
119     XMALLOC_N(indexer->i->N, indexer->i->hot_keys);
120     XMALLOC_N(indexer->i->N, indexer->i->hot_vals);
121     for (int which = 0; which < indexer->i->N; which++) {
122         toku_dbt_array_init(&indexer->i->hot_keys[which], 1);
123         toku_dbt_array_init(&indexer->i->hot_vals[which], 1);
124     }
125 }
126 
127 // destroy the undo globals
128 void
indexer_undo_do_destroy(DB_INDEXER * indexer)129 indexer_undo_do_destroy(DB_INDEXER *indexer) {
130     indexer_commit_keys_destroy(&indexer->i->commit_keys);
131     if (indexer->i->hot_keys) {
132         invariant(indexer->i->hot_vals);
133         for (int which = 0; which < indexer->i->N; which++) {
134             toku_dbt_array_destroy(&indexer->i->hot_keys[which]);
135             toku_dbt_array_destroy(&indexer->i->hot_vals[which]);
136         }
137         toku_free(indexer->i->hot_keys);
138         toku_free(indexer->i->hot_vals);
139     }
140 }
141 
142 static int
indexer_undo_do_committed(DB_INDEXER * indexer,DB * hotdb,struct ule_prov_info * prov_info,DBT_ARRAY * hot_keys,DBT_ARRAY * hot_vals)143 indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) {
144     int result = 0;
145     ULEHANDLE ule = prov_info->ule;
146 
147     // init the xids to the root xid
148     XIDS xids = toku_xids_get_root_xids();
149 
150     // scan the committed stack from bottom to top
151     uint32_t num_committed = ule_get_num_committed(ule);
152     for (uint64_t xrindex = 0; xrindex < num_committed; xrindex++) {
153 
154         indexer_commit_keys_set_empty(&indexer->i->commit_keys);
155 
156         // get the transaction record
157         UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
158 
159         // setup up the xids
160         TXNID this_xid = uxr_get_txnid(uxr);
161         result = indexer_set_xid(indexer, this_xid, &xids);
162         if (result != 0)
163             break;
164 
165         // placeholders in the committed stack are not allowed
166         invariant(!uxr_is_placeholder(uxr));
167 
168         // undo
169         if (xrindex > 0) {
170             uint64_t prev_xrindex = xrindex - 1;
171             UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex);
172             if (uxr_is_delete(prevuxr)) {
173                 ; // do nothing
174             } else if (uxr_is_insert(prevuxr)) {
175                 // generate the hot delete key
176                 result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, prevuxr, hot_keys, NULL);
177                 if (result == 0) {
178                     paranoid_invariant(hot_keys->size <= hot_keys->capacity);
179                     for (uint32_t i = 0; i < hot_keys->size; i++) {
180                         DBT *hotkey = &hot_keys->dbts[i];
181 
182                         // send the delete message
183                         result = indexer_ft_delete_committed(indexer, hotdb, hotkey, xids);
184                         if (result == 0) {
185                             indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
186                         }
187                     }
188                 }
189             } else {
190                 assert(0);
191             }
192         }
193         if (result != 0) {
194             break;
195         }
196 
197         // do
198         if (uxr_is_delete(uxr)) {
199             ; // do nothing
200         } else if (uxr_is_insert(uxr)) {
201             // generate the hot insert key and val
202             result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, uxr, hot_keys, hot_vals);
203             if (result == 0) {
204                 paranoid_invariant(hot_keys->size == hot_vals->size);
205                 paranoid_invariant(hot_keys->size <= hot_keys->capacity);
206                 paranoid_invariant(hot_vals->size <= hot_vals->capacity);
207                 for (uint32_t i = 0; i < hot_keys->size; i++) {
208                     DBT *hotkey = &hot_keys->dbts[i];
209                     DBT *hotval = &hot_vals->dbts[i];
210 
211                     // send the insert message
212                     result = indexer_ft_insert_committed(indexer, hotdb, hotkey, hotval, xids);
213                     if (result == 0) {
214                         indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
215                     }
216                 }
217             }
218         } else
219             assert(0);
220 
221         // send commit messages if needed
222         for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++)
223             result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids);
224 
225         if (result != 0)
226             break;
227     }
228 
229     toku_xids_destroy(&xids);
230 
231     return result;
232 }
233 
release_txns(ULEHANDLE ule,TOKUTXN_STATE * prov_states,TOKUTXN * prov_txns,DB_INDEXER * indexer)234 static void release_txns(
235     ULEHANDLE ule,
236     TOKUTXN_STATE* prov_states,
237     TOKUTXN* prov_txns,
238     DB_INDEXER *indexer
239     )
240 {
241     uint32_t num_provisional = ule_get_num_provisional(ule);
242     if (indexer->i->test_xid_state) {
243         goto exit;
244     }
245     for (uint32_t i = 0; i < num_provisional; i++) {
246         if (prov_states[i] == TOKUTXN_LIVE || prov_states[i] == TOKUTXN_PREPARING) {
247             toku_txn_unpin_live_txn(prov_txns[i]);
248         }
249     }
250 exit:
251     return;
252 }
253 
254 static int
indexer_undo_do_provisional(DB_INDEXER * indexer,DB * hotdb,struct ule_prov_info * prov_info,DBT_ARRAY * hot_keys,DBT_ARRAY * hot_vals)255 indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) {
256     int result = 0;
257     indexer_commit_keys_set_empty(&indexer->i->commit_keys);
258     ULEHANDLE ule = prov_info->ule;
259 
260     // init the xids to the root xid
261     XIDS xids = toku_xids_get_root_xids();
262 
263     uint32_t num_provisional = prov_info->num_provisional;
264     uint32_t num_committed = prov_info->num_committed;
265     TXNID *prov_ids = prov_info->prov_ids;
266     TOKUTXN *prov_txns = prov_info->prov_txns;
267     TOKUTXN_STATE *prov_states = prov_info->prov_states;
268 
269     // nothing to do if there's nothing provisional
270     if (num_provisional == 0) {
271         goto exit;
272     }
273 
274     TXNID outermost_xid_state;
275     outermost_xid_state = prov_states[0];
276 
277     // scan the provisional stack from the outermost to the innermost transaction record
278     TOKUTXN curr_txn;
279     curr_txn = NULL;
280     for (uint64_t xrindex = num_committed; xrindex < num_committed + num_provisional; xrindex++) {
281 
282         // get the ith transaction record
283         UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
284 
285         TXNID this_xid = uxr_get_txnid(uxr);
286         TOKUTXN_STATE this_xid_state = prov_states[xrindex - num_committed];
287 
288         if (this_xid_state == TOKUTXN_ABORTING) {
289             break;         // nothing to do once we reach a transaction that is aborting
290         }
291 
292         if (xrindex == num_committed) { // if this is the outermost xr
293             result = indexer_set_xid(indexer, this_xid, &xids);    // always add the outermost xid to the XIDS list
294             curr_txn = prov_txns[xrindex - num_committed];
295         } else {
296             switch (this_xid_state) {
297             case TOKUTXN_LIVE:
298                 result = indexer_append_xid(indexer, this_xid, &xids); // append a live xid to the XIDS list
299                 curr_txn = prov_txns[xrindex - num_committed];
300                 if (!indexer->i->test_xid_state) {
301                     assert(curr_txn);
302                 }
303                 break;
304             case TOKUTXN_PREPARING:
305                 assert(0); // not allowed
306             case TOKUTXN_COMMITTING:
307             case TOKUTXN_ABORTING:
308             case TOKUTXN_RETIRED:
309                 break; // nothing to do
310             }
311         }
312         if (result != 0)
313             break;
314 
315         if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed) {
316             // If the outermost is not live, then the inner state must be retired.  That's the way that the txn API works.
317             assert(this_xid_state == TOKUTXN_RETIRED);
318         }
319 
320         if (uxr_is_placeholder(uxr)) {
321             continue;         // skip placeholders
322         }
323         // undo
324         uint64_t prev_xrindex;
325         bool prev_xrindex_found = indexer_find_prev_xr(indexer, ule, xrindex, &prev_xrindex);
326         if (prev_xrindex_found) {
327             UXRHANDLE prevuxr = ule_get_uxr(ule, prev_xrindex);
328             if (uxr_is_delete(prevuxr)) {
329                 ; // do nothing
330             } else if (uxr_is_insert(prevuxr)) {
331                 // generate the hot delete key
332                 result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, prevuxr, hot_keys, NULL);
333                 if (result == 0) {
334                     paranoid_invariant(hot_keys->size <= hot_keys->capacity);
335                     for (uint32_t i = 0; i < hot_keys->size; i++) {
336                         DBT *hotkey = &hot_keys->dbts[i];
337 
338                         // send the delete message
339                         switch (outermost_xid_state) {
340                         case TOKUTXN_LIVE:
341                         case TOKUTXN_PREPARING:
342                             invariant(this_xid_state != TOKUTXN_ABORTING);
343                             invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING);
344                             result = indexer_ft_delete_provisional(indexer, hotdb, hotkey, xids, curr_txn);
345                             if (result == 0) {
346                                 indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], curr_txn);
347                             }
348                             break;
349                         case TOKUTXN_COMMITTING:
350                         case TOKUTXN_RETIRED:
351                             result = indexer_ft_delete_committed(indexer, hotdb, hotkey, xids);
352                             if (result == 0)
353                                 indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
354                             break;
355                         case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
356                             assert(0);
357                         }
358                     }
359                 }
360             } else
361                 assert(0);
362         }
363         if (result != 0)
364             break;
365 
366         // do
367         if (uxr_is_delete(uxr)) {
368             ; // do nothing
369         } else if (uxr_is_insert(uxr)) {
370             // generate the hot insert key and val
371             result = indexer_generate_hot_keys_vals(indexer, hotdb, prov_info, uxr, hot_keys, hot_vals);
372             if (result == 0) {
373                 paranoid_invariant(hot_keys->size == hot_vals->size);
374                 paranoid_invariant(hot_keys->size <= hot_keys->capacity);
375                 paranoid_invariant(hot_vals->size <= hot_vals->capacity);
376                 for (uint32_t i = 0; i < hot_keys->size; i++) {
377                     DBT *hotkey = &hot_keys->dbts[i];
378                     DBT *hotval = &hot_vals->dbts[i];
379 
380                     // send the insert message
381                     switch (outermost_xid_state) {
382                     case TOKUTXN_LIVE:
383                     case TOKUTXN_PREPARING:
384                         assert(this_xid_state != TOKUTXN_ABORTING);
385                         invariant(!curr_txn || toku_txn_get_state(curr_txn) == TOKUTXN_LIVE || toku_txn_get_state(curr_txn) == TOKUTXN_PREPARING);
386                         result = indexer_ft_insert_provisional(indexer, hotdb, hotkey, hotval, xids, curr_txn);
387                         if (result == 0) {
388                             indexer_lock_key(indexer, hotdb, hotkey, prov_ids[0], prov_txns[0]);
389                         }
390                         break;
391                     case TOKUTXN_COMMITTING:
392                     case TOKUTXN_RETIRED:
393                         result = indexer_ft_insert_committed(indexer, hotdb, hotkey, hotval, xids);
394                         // no need to do this because we do implicit commits on inserts
395                         if (0 && result == 0)
396                             indexer_commit_keys_add(&indexer->i->commit_keys, hotkey->size, hotkey->data);
397                         break;
398                     case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
399                         assert(0);
400                     }
401                 }
402             }
403         } else
404             assert(0);
405 
406         if (result != 0)
407             break;
408     }
409 
410     // send commits if the outermost provisional transaction is committed
411     for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++) {
412         result = indexer_ft_commit(indexer, hotdb, &indexer->i->commit_keys.keys[i], xids);
413     }
414 
415     // be careful with this in the future. Right now, only exit path
416     // is BEFORE we call fill_prov_info, so this happens before exit
417     // If in the future we add a way to exit after fill_prov_info,
418     // then this will need to be handled below exit
419     release_txns(ule, prov_states, prov_txns, indexer);
420 exit:
421     toku_xids_destroy(&xids);
422     return result;
423 }
424 
425 int
indexer_undo_do(DB_INDEXER * indexer,DB * hotdb,struct ule_prov_info * prov_info,DBT_ARRAY * hot_keys,DBT_ARRAY * hot_vals)426 indexer_undo_do(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, DBT_ARRAY *hot_keys, DBT_ARRAY *hot_vals) {
427     int result = indexer_undo_do_committed(indexer, hotdb, prov_info, hot_keys, hot_vals);
428     if (result == 0) {
429         result = indexer_undo_do_provisional(indexer, hotdb, prov_info, hot_keys, hot_vals);
430     }
431     if (indexer->i->test_only_flags == INDEXER_TEST_ONLY_ERROR_CALLBACK)  {
432         result = EINVAL;
433     }
434 
435     return result;
436 }
437 
438 // set xids_result = [root_xid, this_xid]
439 // Note that this could be sped up by adding a new xids constructor that constructs the stack with
440 // exactly one xid.
441 static int
indexer_set_xid(DB_INDEXER * UU (indexer),TXNID this_xid,XIDS * xids_result)442 indexer_set_xid(DB_INDEXER *UU(indexer), TXNID this_xid, XIDS *xids_result) {
443     int result = 0;
444     XIDS old_xids = *xids_result;
445     XIDS new_xids = toku_xids_get_root_xids();
446     if (this_xid != TXNID_NONE) {
447         XIDS child_xids;
448         result = toku_xids_create_child(new_xids, &child_xids, this_xid);
449         toku_xids_destroy(&new_xids);
450         if (result == 0)
451             new_xids = child_xids;
452     }
453     if (result == 0) {
454         toku_xids_destroy(&old_xids);
455         *xids_result = new_xids;
456     }
457 
458     return result;
459 }
460 
461 // append xid to xids_result
462 static int
indexer_append_xid(DB_INDEXER * UU (indexer),TXNID xid,XIDS * xids_result)463 indexer_append_xid(DB_INDEXER *UU(indexer), TXNID xid, XIDS *xids_result) {
464     XIDS old_xids = *xids_result;
465     XIDS new_xids;
466     int result = toku_xids_create_child(old_xids, &new_xids, xid);
467     if (result == 0) {
468         toku_xids_destroy(&old_xids);
469         *xids_result = new_xids;
470     }
471     return result;
472 }
473 
474 static int
indexer_generate_hot_keys_vals(DB_INDEXER * indexer,DB * hotdb,struct ule_prov_info * prov_info,UXRHANDLE uxr,DBT_ARRAY * hotkeys,DBT_ARRAY * hotvals)475 indexer_generate_hot_keys_vals(DB_INDEXER *indexer, DB *hotdb, struct ule_prov_info *prov_info, UXRHANDLE uxr, DBT_ARRAY *hotkeys, DBT_ARRAY *hotvals) {
476     int result = 0;
477 
478     // setup the source key
479     DBT srckey;
480     toku_fill_dbt(&srckey, prov_info->key, prov_info->keylen);
481 
482     // setup the source val
483     DBT srcval;
484     toku_fill_dbt(&srcval, uxr_get_val(uxr), uxr_get_vallen(uxr));
485 
486     // generate the secondary row
487     DB_ENV *env = indexer->i->env;
488     if (hotvals) {
489         result = env->i->generate_row_for_put(hotdb, indexer->i->src_db, hotkeys, hotvals, &srckey, &srcval);
490     }
491     else {
492         result = env->i->generate_row_for_del(hotdb, indexer->i->src_db, hotkeys, &srckey, &srcval);
493     }
494     toku_destroy_dbt(&srckey);
495     toku_destroy_dbt(&srcval);
496 
497     return result;
498 }
499 
500 // Take a write lock on the given key for the outermost xid in the xids list.
501 static void
indexer_lock_key(DB_INDEXER * indexer,DB * hotdb,DBT * key,TXNID outermost_live_xid,TOKUTXN txn)502 indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_xid, TOKUTXN txn) {
503     // TEST
504     if (indexer->i->test_lock_key) {
505         indexer->i->test_lock_key(indexer, outermost_live_xid, hotdb, key);
506     } else {
507         toku_db_grab_write_lock(hotdb, key, txn);
508     }
509 }
510 
511 // find the index of a non-placeholder transaction record that is previous to the transaction record
512 // found at xrindex.  return true if one is found and return its index in prev_xrindex.  otherwise,
513 // return false.
514 static bool
indexer_find_prev_xr(DB_INDEXER * UU (indexer),ULEHANDLE ule,uint64_t xrindex,uint64_t * prev_xrindex)515 indexer_find_prev_xr(DB_INDEXER *UU(indexer), ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex) {
516     assert(xrindex < ule_num_uxrs(ule));
517     bool  prev_found = false;
518     while (xrindex > 0) {
519         xrindex -= 1;
520         UXRHANDLE uxr = ule_get_uxr(ule, xrindex);
521         if (!uxr_is_placeholder(uxr)) {
522             *prev_xrindex = xrindex;
523             prev_found = true;
524             break;
525         }
526     }
527     return prev_found;
528 }
529 
530 // inject "delete" message into ft with logging in recovery and rollback logs,
531 // and making association between txn and ft
532 static int
indexer_ft_delete_provisional(DB_INDEXER * indexer,DB * hotdb,DBT * hotkey,XIDS xids,TOKUTXN txn)533 indexer_ft_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids, TOKUTXN txn) {
534     int result = 0;
535     // TEST
536     if (indexer->i->test_delete_provisional) {
537         result = indexer->i->test_delete_provisional(indexer, hotdb, hotkey, xids);
538     } else {
539         result = toku_ydb_check_avail_fs_space(indexer->i->env);
540         if (result == 0) {
541             assert(txn != NULL);
542             // Not sure if this is really necessary, as
543             // the hot index DB should have to be checkpointed
544             // upon commit of the hot index transaction, but
545             // it is safe to do this
546             // this question apples to delete_committed, insert_provisional
547             // and insert_committed
548             toku_ft_maybe_delete (hotdb->i->ft_handle, hotkey, txn, false, ZERO_LSN, true);
549         }
550     }
551     return result;
552 }
553 
554 // send a delete message into the tree without rollback or recovery logging
555 static int
indexer_ft_delete_committed(DB_INDEXER * indexer,DB * hotdb,DBT * hotkey,XIDS xids)556 indexer_ft_delete_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) {
557     int result = 0;
558     // TEST
559     if (indexer->i->test_delete_committed) {
560         result = indexer->i->test_delete_committed(indexer, hotdb, hotkey, xids);
561     } else {
562         result = toku_ydb_check_avail_fs_space(indexer->i->env);
563         if (result == 0) {
564             FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle;
565             TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
566             txn_manager_state txn_state_for_gc(txn_manager);
567 
568             TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
569             txn_gc_info gc_info(&txn_state_for_gc,
570                                 oldest_referenced_xid_estimate,
571                                 oldest_referenced_xid_estimate,
572                                 true);
573             toku_ft_send_delete(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info);
574             toku_ft_adjust_logical_row_count(db_struct_i(hotdb)->ft_handle->ft, -1);
575         }
576     }
577     return result;
578 }
579 
580 // inject "insert" message into ft with logging in recovery and rollback logs,
581 // and making association between txn and ft
582 static int
indexer_ft_insert_provisional(DB_INDEXER * indexer,DB * hotdb,DBT * hotkey,DBT * hotval,XIDS xids,TOKUTXN txn)583 indexer_ft_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids, TOKUTXN txn) {
584     int result = 0;
585     // TEST
586     if (indexer->i->test_insert_provisional) {
587         result = indexer->i->test_insert_provisional(indexer, hotdb, hotkey, hotval, xids);
588     } else {
589         result = toku_ydb_check_avail_fs_space(indexer->i->env);
590         if (result == 0) {
591             assert(txn != NULL);
592             // comment/question in indexer_ft_delete_provisional applies
593             toku_ft_maybe_insert (hotdb->i->ft_handle, hotkey, hotval, txn, false, ZERO_LSN, true, FT_INSERT);
594         }
595     }
596     return result;
597 }
598 
599 // send an insert message into the tree without rollback or recovery logging
600 // and without associating the txn and the ft
601 static int
indexer_ft_insert_committed(DB_INDEXER * indexer,DB * hotdb,DBT * hotkey,DBT * hotval,XIDS xids)602 indexer_ft_insert_committed(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT *hotval, XIDS xids) {
603     int result = 0;
604     // TEST
605     if (indexer->i->test_insert_committed) {
606         result = indexer->i->test_insert_committed(indexer, hotdb, hotkey, hotval, xids);
607     } else {
608         result = toku_ydb_check_avail_fs_space(indexer->i->env);
609         if (result == 0) {
610             FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle;
611             TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
612             txn_manager_state txn_state_for_gc(txn_manager);
613 
614             TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
615             txn_gc_info gc_info(&txn_state_for_gc,
616                                 oldest_referenced_xid_estimate,
617                                 oldest_referenced_xid_estimate,
618                                 true);
619             toku_ft_send_insert(db_struct_i(hotdb)->ft_handle, hotkey, hotval, xids, FT_INSERT, &gc_info);
620             toku_ft_adjust_logical_row_count(db_struct_i(hotdb)->ft_handle->ft, 1);
621         }
622     }
623     return result;
624 }
625 
626 // send a commit message into the tree
627 // Note: If the xid is zero, then the leafentry will already have a committed transaction
628 //       record and no commit message is needed.  (A commit message with xid of zero is
629 //       illegal anyway.)
630 static int
indexer_ft_commit(DB_INDEXER * indexer,DB * hotdb,DBT * hotkey,XIDS xids)631 indexer_ft_commit(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids) {
632     int result = 0;
633     if (toku_xids_get_num_xids(xids) > 0) {// send commit only when not the root xid
634         // TEST
635         if (indexer->i->test_commit_any) {
636             result = indexer->i->test_commit_any(indexer, hotdb, hotkey, xids);
637         } else {
638             result = toku_ydb_check_avail_fs_space(indexer->i->env);
639             if (result == 0) {
640                 FT_HANDLE ft_h = db_struct_i(hotdb)->ft_handle;
641                 TXN_MANAGER txn_manager = toku_ft_get_txn_manager(ft_h);
642                 txn_manager_state txn_state_for_gc(txn_manager);
643 
644                 TXNID oldest_referenced_xid_estimate = toku_ft_get_oldest_referenced_xid_estimate(ft_h);
645                 txn_gc_info gc_info(&txn_state_for_gc,
646                                     oldest_referenced_xid_estimate,
647                                     oldest_referenced_xid_estimate,
648                                     true);
649                 toku_ft_send_commit_any(db_struct_i(hotdb)->ft_handle, hotkey, xids, &gc_info);
650             }
651         }
652     }
653     return result;
654 }
655