1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
39 #include <my_global.h>
40 #include <ctype.h>
41 
42 #include <db.h>
43 #include <locktree/locktree.h>
44 #include <ft/ft.h>
45 #include <ft/ft-flusher.h>
46 #include <ft/cachetable/checkpoint.h>
47 
48 #include "ydb_cursor.h"
49 #include "ydb_row_lock.h"
50 #include "ydb_db.h"
51 #include "ydb_write.h"
52 #include "ydb-internal.h"
53 #include "ydb_load.h"
54 #include "indexer.h"
55 #include <portability/toku_atomic.h>
56 #include <util/status.h>
57 #include <ft/le-cursor.h>
58 
59 static YDB_DB_LAYER_STATUS_S ydb_db_layer_status;
60 #ifdef STATUS_VALUE
61 #undef STATUS_VALUE
62 #endif
63 #define STATUS_VALUE(x) ydb_db_layer_status.status[x].value.num
64 
65 #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_db_layer_status, k, c, t, l, inc)
66 
67 static void
ydb_db_layer_status_init(void)68 ydb_db_layer_status_init (void) {
69     // Note, this function initializes the keyname, type, and legend fields.
70     // Value fields are initialized to zero by compiler.
71 
72     STATUS_INIT(YDB_LAYER_DIRECTORY_WRITE_LOCKS,      nullptr, UINT64,   "directory write locks", TOKU_ENGINE_STATUS);
73     STATUS_INIT(YDB_LAYER_DIRECTORY_WRITE_LOCKS_FAIL, nullptr, UINT64,   "directory write locks fail", TOKU_ENGINE_STATUS);
74     STATUS_INIT(YDB_LAYER_LOGSUPPRESS,                nullptr, UINT64,   "log suppress", TOKU_ENGINE_STATUS);
75     STATUS_INIT(YDB_LAYER_LOGSUPPRESS_FAIL,           nullptr, UINT64,   "log suppress fail", TOKU_ENGINE_STATUS);
76     ydb_db_layer_status.initialized = true;
77 }
78 #undef STATUS_INIT
79 
80 void
ydb_db_layer_get_status(YDB_DB_LAYER_STATUS statp)81 ydb_db_layer_get_status(YDB_DB_LAYER_STATUS statp) {
82     if (!ydb_db_layer_status.initialized)
83         ydb_db_layer_status_init();
84     *statp = ydb_db_layer_status;
85 }
86 
create_iname_hint(DB_ENV * env,const char * dname,char * hint)87 void create_iname_hint(DB_ENV *env, const char *dname, char *hint) {
88     //Requires: size of hint array must be > strlen(dname)
89     //Copy alphanumeric characters only.
90     //Replace strings of non-alphanumeric characters with a single underscore.
91     if (env->get_dir_per_db(env) && !toku_os_is_absolute_name(dname)) {
92         assert(dname);
93         if (*dname == '.')
94             ++dname;
95         if (*dname == '/')
96             ++dname;
97         bool underscored = false;
98         bool dbdir_is_parsed = false;
99         // Do not change the first '/' because this is
100         // delimiter which splits name into database dir
101         // and table dir.
102         while (*dname) {
103             if (isalnum(*dname) || (*dname == '/' && !dbdir_is_parsed)) {
104                 char c = *dname++;
105                 *hint++ = c;
106                 if (c == '/')
107                     dbdir_is_parsed = true;
108                 underscored = false;
109             } else if (!dbdir_is_parsed) {
110                 char c = *dname++;
111                 *hint++ = c;
112             } else {
113                 if (!underscored)
114                     *hint++ = '_';
115                 dname++;
116                 underscored = true;
117             }
118         }
119         *hint = '\0';
120     } else {
121         bool underscored = false;
122         while (*dname) {
123             if (isalnum(*dname)) {
124                 char c = *dname++;
125                 *hint++ = c;
126                 underscored = false;
127             }
128             else {
129                 if (!underscored)
130                     *hint++ = '_';
131                 dname++;
132                 underscored = true;
133             }
134         }
135         *hint = '\0';
136     }
137 }
138 
139 // n < 0  means to ignore mark and ignore n
140 // n >= 0 means to include mark ("_B_" or "_P_") with hex value of n in iname
141 // (intended for use by loader, which will create many inames using one txnid).
create_iname(DB_ENV * env,uint64_t id1,uint64_t id2,char * hint,const char * mark,int n)142 char *create_iname(DB_ENV *env,
143                    uint64_t id1,
144                    uint64_t id2,
145                    char *hint,
146                    const char *mark,
147                    int n) {
148     int bytes;
149     char inamebase[strlen(hint) +
150                    8 +  // hex file format version
151                    24 + // hex id (normally the txnid's parent and child)
152                    8  + // hex value of n if non-neg
153                    sizeof("_B___.") + // extra pieces
154                    strlen(toku_product_name)];
155     if (n < 0)
156         bytes = snprintf(inamebase, sizeof(inamebase),
157                          "%s_%" PRIx64 "_%" PRIx64 "_%" PRIx32            ".%s",
158                          hint, id1, id2, FT_LAYOUT_VERSION, toku_product_name);
159     else {
160         invariant(strlen(mark) == 1);
161         bytes = snprintf(inamebase, sizeof(inamebase),
162                          "%s_%" PRIx64 "_%" PRIx64 "_%" PRIx32 "_%s_%" PRIx32 ".%s",
163                          hint, id1, id2, FT_LAYOUT_VERSION, mark, n, toku_product_name);
164     }
165     assert(bytes>0);
166     assert(bytes<=(int)sizeof(inamebase)-1);
167     char *rval;
168     if (env->i->data_dir)
169         rval = toku_construct_full_name(2, env->i->data_dir, inamebase);
170     else
171         rval = toku_construct_full_name(1, inamebase);
172     assert(rval);
173     return rval;
174 }
175 
176 static uint64_t nontransactional_open_id = 0;
177 
generate_iname_for_rename_or_open(DB_ENV * env,DB_TXN * txn,const char * dname,bool is_open)178 std::unique_ptr<char[], decltype(&toku_free)> generate_iname_for_rename_or_open(
179     DB_ENV *env,
180     DB_TXN *txn,
181     const char *dname,
182     bool is_open) {
183     std::unique_ptr<char[], decltype(&toku_free)> result(nullptr, &toku_free);
184     char hint[strlen(dname) + 1];
185     uint64_t id1 = 0;
186     uint64_t id2 = 0;
187 
188     if (txn) {
189         id1 = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn).parent_id64;
190         id2 = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn).child_id64;
191     } else if (is_open)
192         id1 = toku_sync_fetch_and_add(&nontransactional_open_id, 1);
193 
194     create_iname_hint(env, dname, hint);
195 
196     result.reset(create_iname(env, id1, id2, hint, NULL, -1));
197 
198     return result;
199 }
200 
201 static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode);
202 
203 // Effect: Do the work required of DB->close().
204 // requires: the multi_operation client lock is held.
205 int
toku_db_close(DB * db)206 toku_db_close(DB * db) {
207     int r = 0;
208     if (db_opened(db) && db->i->dname) {
209         // internal (non-user) dictionary has no dname
210         env_note_db_closed(db->dbenv, db);  // tell env that this db is no longer in use by the user of this api (user-closed, may still be in use by fractal tree internals)
211     }
212     // close the ft handle, and possibly close the locktree
213     toku_ft_handle_close(db->i->ft_handle);
214     if (db->i->lt) {
215         db->dbenv->i->ltm.release_lt(db->i->lt);
216     }
217     toku_sdbt_cleanup(&db->i->skey);
218     toku_sdbt_cleanup(&db->i->sval);
219     if (db->i->dname) {
220         toku_free(db->i->dname);
221     }
222     toku_free(db->i);
223     toku_free(db);
224     return r;
225 }
226 
227 ///////////
228 //db_getf_XXX is equivalent to c_getf_XXX, without a persistent cursor
229 
230 int
db_getf_set(DB * db,DB_TXN * txn,uint32_t flags,DBT * key,YDB_CALLBACK_FUNCTION f,void * extra)231 db_getf_set(DB *db, DB_TXN *txn, uint32_t flags, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
232     HANDLE_PANICKED_DB(db);
233     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
234     DBC c;
235     uint32_t create_flags = flags & (DB_ISOLATION_FLAGS | DB_RMW);
236     flags &= ~DB_ISOLATION_FLAGS;
237     int r = toku_db_cursor_internal(db, txn, &c, create_flags | DBC_DISABLE_PREFETCHING, 1);
238     if (r==0) {
239         r = toku_c_getf_set(&c, flags, key, f, extra);
240         int r2 = toku_c_close_internal(&c);
241         if (r==0) r = r2;
242     }
243     return r;
244 }
245 
246 static inline int
db_thread_need_flags(DBT * dbt)247 db_thread_need_flags(DBT *dbt) {
248     return (dbt->flags & (DB_DBT_MALLOC+DB_DBT_REALLOC+DB_DBT_USERMEM)) == 0;
249 }
250 
251 int
toku_db_get(DB * db,DB_TXN * txn,DBT * key,DBT * data,uint32_t flags)252 toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, uint32_t flags) {
253     HANDLE_PANICKED_DB(db);
254     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
255     int r;
256     uint32_t iso_flags = flags & DB_ISOLATION_FLAGS;
257 
258     if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data))
259         return EINVAL;
260 
261     uint32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
262     flags &= ~lock_flags;
263     flags &= ~DB_ISOLATION_FLAGS;
264     // And DB_GET_BOTH is no longer supported. #2862.
265     if (flags != 0) return EINVAL;
266 
267     DBC dbc;
268     r = toku_db_cursor_internal(db, txn, &dbc, iso_flags | DBC_DISABLE_PREFETCHING, 1);
269     if (r!=0) return r;
270     uint32_t c_get_flags = DB_SET;
271     r = toku_c_get(&dbc, key, data, c_get_flags | lock_flags);
272     int r2 = toku_c_close_internal(&dbc);
273     return r ? r : r2;
274 }
275 
276 static int
db_open_subdb(DB * db,DB_TXN * txn,const char * fname,const char * dbname,DBTYPE dbtype,uint32_t flags,int mode)277 db_open_subdb(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
278     int r;
279     if (!fname || !dbname) r = EINVAL;
280     else {
281         char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
282         int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
283         assert(bytes==(int)sizeof(subdb_full_name)-1);
284         const char *null_subdbname = NULL;
285         r = toku_db_open(db, txn, subdb_full_name, null_subdbname, dbtype, flags, mode);
286     }
287     return r;
288 }
289 
290 // inames are created here.
291 // algorithm:
292 //  begin txn
293 //  convert dname to iname (possibly creating new iname)
294 //  open file (toku_ft_handle_open() will handle logging)
295 //  close txn
296 //  if created a new iname, take full range lock
297 // Requires: no checkpoint may take place during this function, which is enforced by holding the multi_operation_client_lock.
298 static int
toku_db_open(DB * db,DB_TXN * txn,const char * fname,const char * dbname,DBTYPE dbtype,uint32_t flags,int mode)299 toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
300     HANDLE_PANICKED_DB(db);
301     HANDLE_READ_ONLY_TXN(txn);
302     if (dbname != NULL) {
303         return db_open_subdb(db, txn, fname, dbname, dbtype, flags, mode);
304     }
305 
306     // at this point fname is the dname
307     //This code ONLY supports single-db files.
308     assert(dbname == NULL);
309     const char * dname = fname;  // db_open_subdb() converts (fname, dbname) to dname
310 
311     ////////////////////////////// do some level of parameter checking.
312     uint32_t unused_flags = flags;
313     int r;
314     if (dbtype!=DB_BTREE && dbtype!=DB_UNKNOWN) return EINVAL;
315     int is_db_excl    = flags & DB_EXCL;    unused_flags&=~DB_EXCL;
316     int is_db_create  = flags & DB_CREATE;  unused_flags&=~DB_CREATE;
317     int is_db_hot_index  = flags & DB_IS_HOT_INDEX;  unused_flags&=~DB_IS_HOT_INDEX;
318 
319     //We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
320     unused_flags&=~DB_READ_UNCOMMITTED;
321     unused_flags&=~DB_READ_COMMITTED;
322     unused_flags&=~DB_SERIALIZABLE;
323 
324     // DB_THREAD is implicitly supported and DB_BLACKHOLE is supported at the ft-layer
325     unused_flags &= ~DB_THREAD;
326     unused_flags &= ~DB_BLACKHOLE;
327     unused_flags &= ~DB_RDONLY;
328 
329     // check for unknown or conflicting flags
330     if (unused_flags) return EINVAL; // unknown flags
331     if (is_db_excl && !is_db_create) return EINVAL;
332     if (dbtype==DB_UNKNOWN && is_db_excl) return EINVAL;
333 
334     if (db_opened(db)) {
335         // it was already open
336         return EINVAL;
337     }
338     //////////////////////////////
339 
340     // convert dname to iname
341     //  - look up dname, get iname
342     //  - if dname does not exist, create iname and make entry in directory
343     DBT dname_dbt;  // holds dname
344     DBT iname_dbt;  // holds iname_in_env
345     toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
346     toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
347     r = toku_db_get(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
348     std::unique_ptr<char[], decltype(&toku_free)> iname(
349         static_cast<char *>(iname_dbt.data), &toku_free);
350     if (r == DB_NOTFOUND && !is_db_create) {
351         r = ENOENT;
352     } else if (r==0 && is_db_excl) {
353         r = EEXIST;
354     } else if (r == DB_NOTFOUND) {
355         iname = generate_iname_for_rename_or_open(db->dbenv, txn, dname, true);
356         toku_fill_dbt(&iname_dbt, iname.get(), strlen(iname.get()) + 1);
357         //
358         // put_flags will be 0 for performance only, avoid unnecessary query
359         // if we are creating a hot index, per #3166, we do not want the write lock  in directory grabbed.
360         // directory read lock is grabbed in toku_db_get above
361         //
362         uint32_t put_flags = 0 | ((is_db_hot_index) ? DB_PRELOCKED_WRITE : 0);
363         r = toku_db_put(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, put_flags, true);
364     }
365 
366     // we now have an iname
367     if (r == 0) {
368         r = toku_db_open_iname(db, txn, iname.get(), flags, mode);
369         if (r == 0) {
370             db->i->dname = toku_xstrdup(dname);
371             env_note_db_opened(db->dbenv, db);  // tell env that a new db handle is open (using dname)
372         }
373     }
374 
375     return r;
376 }
377 
378 // set the descriptor and cmp_descriptor to the
379 // descriptors from the given ft, updating the
380 // locktree's descriptor pointer if necessary
381 static void
db_set_descriptors(DB * db,FT_HANDLE ft_handle)382 db_set_descriptors(DB *db, FT_HANDLE ft_handle) {
383     const toku::comparator &cmp = toku_ft_get_comparator(ft_handle);
384     db->descriptor = toku_ft_get_descriptor(ft_handle);
385     db->cmp_descriptor = toku_ft_get_cmp_descriptor(ft_handle);
386     invariant(db->cmp_descriptor == cmp.get_descriptor());
387     if (db->i->lt) {
388         db->i->lt->set_comparator(cmp);
389     }
390 }
391 
392 // callback that sets the descriptors when
393 // a dictionary is redirected at the ft layer
394 static void
db_on_redirect_callback(FT_HANDLE ft_handle,void * extra)395 db_on_redirect_callback(FT_HANDLE ft_handle, void* extra) {
396     DB *db = (DB *) extra;
397     db_set_descriptors(db, ft_handle);
398 }
399 
400 // when a locktree is created, clone a ft handle and store it
401 // as userdata so we can close it later.
toku_db_lt_on_create_callback(toku::locktree * lt,void * extra)402 int toku_db_lt_on_create_callback(toku::locktree *lt, void *extra) {
403     int r;
404     struct lt_on_create_callback_extra *info = (struct lt_on_create_callback_extra *) extra;
405     TOKUTXN ttxn = info->txn ? db_txn_struct_i(info->txn)->tokutxn : NULL;
406     FT_HANDLE ft_handle = info->ft_handle;
407 
408     FT_HANDLE cloned_ft_handle;
409     r = toku_ft_handle_clone(&cloned_ft_handle, ft_handle, ttxn, info->open_rw);
410     if (r == 0) {
411         assert(lt->get_userdata() == NULL);
412         lt->set_userdata(cloned_ft_handle);
413     }
414     return r;
415 }
416 
417 // when a locktree is about to be destroyed,
418 // close the ft handle stored as userdata.
toku_db_lt_on_destroy_callback(toku::locktree * lt)419 void toku_db_lt_on_destroy_callback(toku::locktree *lt) {
420     FT_HANDLE ft_handle = (FT_HANDLE) lt->get_userdata();
421     assert(ft_handle);
422     toku_ft_handle_close(ft_handle);
423 }
424 
425 // Instruct db to use the default (built-in) key comparison function
426 // by setting the flag bits in the db and ft structs
toku_db_use_builtin_key_cmp(DB * db)427 int toku_db_use_builtin_key_cmp(DB *db) {
428     HANDLE_PANICKED_DB(db);
429     int r = 0;
430     if (db_opened(db)) {
431         r = toku_ydb_do_error(db->dbenv, EINVAL, "Comparison functions cannot be set after DB open.\n");
432     } else if (db->i->key_compare_was_set) {
433         r = toku_ydb_do_error(db->dbenv, EINVAL, "Key comparison function already set.\n");
434     } else {
435         uint32_t tflags;
436         toku_ft_get_flags(db->i->ft_handle, &tflags);
437 
438         tflags |= TOKU_DB_KEYCMP_BUILTIN;
439         toku_ft_set_flags(db->i->ft_handle, tflags);
440         db->i->key_compare_was_set = true;
441     }
442     return r;
443 }
444 
toku_db_open_iname(DB * db,DB_TXN * txn,const char * iname_in_env,uint32_t flags,int mode)445 int toku_db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, uint32_t flags, int mode) {
446     //Set comparison functions if not yet set.
447     HANDLE_READ_ONLY_TXN(txn);
448     if (!db->i->key_compare_was_set && db->dbenv->i->bt_compare) {
449         toku_ft_set_bt_compare(db->i->ft_handle, db->dbenv->i->bt_compare);
450         db->i->key_compare_was_set = true;
451     }
452     if (db->dbenv->i->update_function) {
453         toku_ft_set_update(db->i->ft_handle,db->dbenv->i->update_function);
454     }
455     toku_ft_set_redirect_callback(
456         db->i->ft_handle,
457         db_on_redirect_callback,
458         db
459         );
460     bool need_locktree = (bool)((db->dbenv->i->open_flags & DB_INIT_LOCK) &&
461                                 (db->dbenv->i->open_flags & DB_INIT_TXN));
462 
463     int is_db_excl    = flags & DB_EXCL;    flags&=~DB_EXCL;
464     int is_db_create  = flags & DB_CREATE;  flags&=~DB_CREATE;
465     //We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
466                                             flags&=~DB_READ_UNCOMMITTED;
467                                             flags&=~DB_READ_COMMITTED;
468                                             flags&=~DB_SERIALIZABLE;
469                                             flags&=~DB_IS_HOT_INDEX;
470                                             flags&=~DB_RDONLY;
471     // unknown or conflicting flags are bad
472     int unknown_flags = flags & ~DB_THREAD;
473     unknown_flags &= ~DB_BLACKHOLE;
474     if (unknown_flags || (is_db_excl && !is_db_create)) {
475         return EINVAL;
476     }
477 
478     if (db_opened(db)) {
479         return EINVAL;              /* It was already open. */
480     }
481 
482     db->i->open_flags = flags;
483     db->i->open_mode = mode;
484 
485     bool open_rw = mode & (S_IWUSR | S_IWOTH | S_IWGRP);
486     FT_HANDLE ft_handle = db->i->ft_handle;
487     int r = toku_ft_handle_open(ft_handle, iname_in_env,
488                       is_db_create, is_db_excl,
489                       db->dbenv->i->cachetable,
490                       txn ? db_txn_struct_i(txn)->tokutxn : nullptr, open_rw);
491     if (r != 0) {
492         goto out;
493     }
494 
495     // if the dictionary was opened as a blackhole, mark the
496     // fractal tree as blackhole too.
497     if (flags & DB_BLACKHOLE) {
498         toku_ft_set_blackhole(ft_handle);
499     }
500 
501     db->i->opened = 1;
502 
503     // now that the handle has successfully opened, a valid descriptor
504     // is in the ft. we need to set the db's descriptor pointers
505     db_set_descriptors(db, ft_handle);
506 
507     if (need_locktree) {
508         db->i->dict_id = toku_ft_get_dictionary_id(db->i->ft_handle);
509         struct lt_on_create_callback_extra on_create_extra = {
510             .txn = txn,
511             .ft_handle = db->i->ft_handle,
512             .open_rw = false
513         };
514         db->i->lt = db->dbenv->i->ltm.get_lt(db->i->dict_id,
515                                              toku_ft_get_comparator(db->i->ft_handle),
516                                              &on_create_extra);
517         if (db->i->lt == nullptr) {
518             r = errno;
519             if (r == 0) {
520                 r = EINVAL;
521             }
522             goto out;
523         }
524     }
525     r = 0;
526 
527 out:
528     if (r != 0) {
529         db->i->dict_id = DICTIONARY_ID_NONE;
530         db->i->opened = 0;
531         if (db->i->lt) {
532             db->dbenv->i->ltm.release_lt(db->i->lt);
533             db->i->lt = nullptr;
534         }
535     }
536     return r;
537 }
538 
539 // Return the maximum key and val size in
540 // *key_size and *val_size respectively
541 static void
toku_db_get_max_row_size(DB * UU (db),uint32_t * max_key_size,uint32_t * max_val_size)542 toku_db_get_max_row_size(DB * UU(db), uint32_t * max_key_size, uint32_t * max_val_size) {
543     *max_key_size = 0;
544     *max_val_size = 0;
545     toku_ft_get_maximum_advised_key_value_lengths(max_key_size, max_val_size);
546 }
547 
toku_db_pre_acquire_fileops_lock(DB * db,DB_TXN * txn)548 int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
549     // bad hack because some environment dictionaries do not have a dname
550     char *dname = db->i->dname;
551     if (!dname)
552         return 0;
553 
554     DBT key_in_directory = { .data = dname, .size = (uint32_t) strlen(dname)+1 };
555     //Left end of range == right end of range (point lock)
556     int r = toku_db_get_range_lock(db->dbenv->i->directory, txn,
557             &key_in_directory, &key_in_directory,
558             toku::lock_request::type::WRITE);
559     if (r == 0)
560         STATUS_VALUE(YDB_LAYER_DIRECTORY_WRITE_LOCKS)++;  // accountability
561     else
562         STATUS_VALUE(YDB_LAYER_DIRECTORY_WRITE_LOCKS_FAIL)++;  // accountability
563     return r;
564 }
565 
566 //
567 // This function is used both to set an initial descriptor of a DB and to
568 // change a descriptor. (only way to set a descriptor of a DB)
569 //
570 // Requires:
571 //  - The caller must not call put_multiple, del_multiple, or update_multiple concurrently
572 //  - The caller must not have a hot index running concurrently on db
573 //  - If the caller has passed DB_UPDATE_CMP_DESCRIPTOR as a flag, then he is calling this function
574 //     ONLY immediately after creating the dictionary and before doing any actual work on the dictionary.
575 //
576 static int
toku_db_change_descriptor(DB * db,DB_TXN * txn,const DBT * descriptor,uint32_t flags)577 toku_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, uint32_t flags) {
578     HANDLE_PANICKED_DB(db);
579     HANDLE_READ_ONLY_TXN(txn);
580     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
581     int r = 0;
582     TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
583     bool is_db_hot_index  = ((flags & DB_IS_HOT_INDEX) != 0);
584     bool update_cmp_descriptor = ((flags & DB_UPDATE_CMP_DESCRIPTOR) != 0);
585 
586     DBT old_descriptor_dbt;
587     toku_init_dbt(&old_descriptor_dbt);
588 
589     if (!db_opened(db) || !descriptor || (descriptor->size>0 && !descriptor->data)){
590         r = EINVAL;
591         goto cleanup;
592     }
593     // For a hot index, this is an initial descriptor.
594     // We do not support (yet) hcad with hot index concurrently on a single table, which
595     // would require changing a descriptor for a hot index.
596     if (!is_db_hot_index) {
597         r = toku_db_pre_acquire_table_lock(db, txn);
598         if (r != 0) { goto cleanup; }
599     }
600 
601     toku_clone_dbt(&old_descriptor_dbt, db->descriptor->dbt);
602     toku_ft_change_descriptor(db->i->ft_handle, &old_descriptor_dbt, descriptor,
603                               true, ttxn, update_cmp_descriptor);
604 
605 cleanup:
606     toku_destroy_dbt(&old_descriptor_dbt);
607     return r;
608 }
609 
610 static int
toku_db_set_flags(DB * db,uint32_t flags)611 toku_db_set_flags(DB *db, uint32_t flags) {
612     HANDLE_PANICKED_DB(db);
613 
614     /* the following matches BDB */
615     if (db_opened(db) && flags != 0) return EINVAL;
616 
617     return 0;
618 }
619 
620 static int
toku_db_get_flags(DB * db,uint32_t * pflags)621 toku_db_get_flags(DB *db, uint32_t *pflags) {
622     HANDLE_PANICKED_DB(db);
623     if (!pflags) return EINVAL;
624     *pflags = 0;
625     return 0;
626 }
627 
628 static int
toku_db_change_pagesize(DB * db,uint32_t pagesize)629 toku_db_change_pagesize(DB *db, uint32_t pagesize) {
630     HANDLE_PANICKED_DB(db);
631     if (!db_opened(db)) return EINVAL;
632     toku_ft_handle_set_nodesize(db->i->ft_handle, pagesize);
633     return 0;
634 }
635 
636 static int
toku_db_set_pagesize(DB * db,uint32_t pagesize)637 toku_db_set_pagesize(DB *db, uint32_t pagesize) {
638     HANDLE_PANICKED_DB(db);
639     if (db_opened(db)) return EINVAL;
640     toku_ft_handle_set_nodesize(db->i->ft_handle, pagesize);
641     return 0;
642 }
643 
644 static int
toku_db_get_pagesize(DB * db,uint32_t * pagesize_ptr)645 toku_db_get_pagesize(DB *db, uint32_t *pagesize_ptr) {
646     HANDLE_PANICKED_DB(db);
647     toku_ft_handle_get_nodesize(db->i->ft_handle, pagesize_ptr);
648     return 0;
649 }
650 
651 static int
toku_db_change_readpagesize(DB * db,uint32_t readpagesize)652 toku_db_change_readpagesize(DB *db, uint32_t readpagesize) {
653     HANDLE_PANICKED_DB(db);
654     if (!db_opened(db)) return EINVAL;
655     toku_ft_handle_set_basementnodesize(db->i->ft_handle, readpagesize);
656     return 0;
657 }
658 
659 static int
toku_db_set_readpagesize(DB * db,uint32_t readpagesize)660 toku_db_set_readpagesize(DB *db, uint32_t readpagesize) {
661     HANDLE_PANICKED_DB(db);
662     if (db_opened(db)) return EINVAL;
663     toku_ft_handle_set_basementnodesize(db->i->ft_handle, readpagesize);
664     return 0;
665 }
666 
667 static int
toku_db_get_readpagesize(DB * db,uint32_t * readpagesize_ptr)668 toku_db_get_readpagesize(DB *db, uint32_t *readpagesize_ptr) {
669     HANDLE_PANICKED_DB(db);
670     toku_ft_handle_get_basementnodesize(db->i->ft_handle, readpagesize_ptr);
671     return 0;
672 }
673 
674 static int
toku_db_change_compression_method(DB * db,enum toku_compression_method compression_method)675 toku_db_change_compression_method(DB *db, enum toku_compression_method compression_method) {
676     HANDLE_PANICKED_DB(db);
677     if (!db_opened(db)) return EINVAL;
678     toku_ft_handle_set_compression_method(db->i->ft_handle, compression_method);
679     return 0;
680 }
681 
682 static int
toku_db_set_compression_method(DB * db,enum toku_compression_method compression_method)683 toku_db_set_compression_method(DB *db, enum toku_compression_method compression_method) {
684     HANDLE_PANICKED_DB(db);
685     if (db_opened(db)) return EINVAL;
686     toku_ft_handle_set_compression_method(db->i->ft_handle, compression_method);
687     return 0;
688 }
689 
690 static int
toku_db_get_compression_method(DB * db,enum toku_compression_method * compression_method_ptr)691 toku_db_get_compression_method(DB *db, enum toku_compression_method *compression_method_ptr) {
692     HANDLE_PANICKED_DB(db);
693     toku_ft_handle_get_compression_method(db->i->ft_handle, compression_method_ptr);
694     return 0;
695 }
696 
697 static int
toku_db_change_fanout(DB * db,unsigned int fanout)698 toku_db_change_fanout(DB *db, unsigned int fanout) {
699     HANDLE_PANICKED_DB(db);
700     if (!db_opened(db)) return EINVAL;
701     toku_ft_handle_set_fanout(db->i->ft_handle, fanout);
702     return 0;
703 }
704 
705 static int
toku_db_set_fanout(DB * db,unsigned int fanout)706 toku_db_set_fanout(DB *db, unsigned int fanout) {
707     HANDLE_PANICKED_DB(db);
708     if (db_opened(db)) return EINVAL;
709     toku_ft_handle_set_fanout(db->i->ft_handle, fanout);
710     return 0;
711 }
712 
713 static int
toku_db_get_fanout(DB * db,unsigned int * fanout)714 toku_db_get_fanout(DB *db, unsigned int *fanout) {
715     HANDLE_PANICKED_DB(db);
716     toku_ft_handle_get_fanout(db->i->ft_handle, fanout);
717     return 0;
718 }
719 
720 static int
toku_db_set_memcmp_magic(DB * db,uint8_t magic)721 toku_db_set_memcmp_magic(DB *db, uint8_t magic) {
722     HANDLE_PANICKED_DB(db);
723     if (db_opened(db)) {
724         return EINVAL;
725     }
726     return toku_ft_handle_set_memcmp_magic(db->i->ft_handle, magic);
727 }
728 
729 static int
toku_db_get_fractal_tree_info64(DB * db,uint64_t * num_blocks_allocated,uint64_t * num_blocks_in_use,uint64_t * size_allocated,uint64_t * size_in_use)730 toku_db_get_fractal_tree_info64(DB *db, uint64_t *num_blocks_allocated, uint64_t *num_blocks_in_use, uint64_t *size_allocated, uint64_t *size_in_use) {
731     HANDLE_PANICKED_DB(db);
732     struct ftinfo64 ftinfo;
733     toku_ft_handle_get_fractal_tree_info64(db->i->ft_handle, &ftinfo);
734     *num_blocks_allocated = ftinfo.num_blocks_allocated;
735     *num_blocks_in_use = ftinfo.num_blocks_in_use;
736     *size_allocated = ftinfo.size_allocated;
737     *size_in_use = ftinfo.size_in_use;
738     return 0;
739 }
740 
741 static int
toku_db_iterate_fractal_tree_block_map(DB * db,int (* iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void *),void * iter_extra)742 toku_db_iterate_fractal_tree_block_map(DB *db, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
743     HANDLE_PANICKED_DB(db);
744     return toku_ft_handle_iterate_fractal_tree_block_map(db->i->ft_handle, iter, iter_extra);
745 }
746 
747 static int
toku_db_stat64(DB * db,DB_TXN * txn,DB_BTREE_STAT64 * s)748 toku_db_stat64(DB * db, DB_TXN *txn, DB_BTREE_STAT64 *s) {
749     HANDLE_PANICKED_DB(db);
750     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
751     struct ftstat64_s ftstat;
752     TOKUTXN tokutxn = NULL;
753     if (txn != NULL) {
754         tokutxn = db_txn_struct_i(txn)->tokutxn;
755     }
756     toku_ft_handle_stat64(db->i->ft_handle, tokutxn, &ftstat);
757     s->bt_nkeys = ftstat.nkeys;
758     s->bt_ndata = ftstat.ndata;
759     s->bt_dsize = ftstat.dsize;
760     s->bt_fsize = ftstat.fsize;
761     s->bt_create_time_sec = ftstat.create_time_sec;
762     s->bt_modify_time_sec = ftstat.modify_time_sec;
763     s->bt_verify_time_sec = ftstat.verify_time_sec;
764     return 0;
765 }
766 
767 static const char *
toku_db_get_dname(DB * db)768 toku_db_get_dname(DB *db) {
769     if (!db_opened(db)) {
770         return nullptr;
771     }
772     if (db->i->dname == nullptr) {
773         return "";
774     }
775     return db->i->dname;
776 }
777 
778 static int
toku_db_keys_range64(DB * db,DB_TXN * txn,DBT * keyleft,DBT * keyright,uint64_t * less,uint64_t * left,uint64_t * between,uint64_t * right,uint64_t * greater,bool * middle_3_exact)779 toku_db_keys_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* keyleft, DBT* keyright, uint64_t* less, uint64_t* left, uint64_t* between, uint64_t *right, uint64_t *greater, bool* middle_3_exact) {
780     HANDLE_PANICKED_DB(db);
781     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
782 
783     // note that we ignore the txn param.  It would be more complicated to support it.
784     // TODO(yoni): Maybe add support for txns later?  How would we do this?  ydb lock comment about db_keyrange64 is obsolete.
785     toku_ft_keysrange(db->i->ft_handle, keyleft, keyright, less, left, between, right, greater, middle_3_exact);
786     return 0;
787 }
788 
789 static int
toku_db_key_range64(DB * db,DB_TXN * txn,DBT * key,uint64_t * less_p,uint64_t * equal_p,uint64_t * greater_p,int * is_exact)790 toku_db_key_range64(DB* db, DB_TXN* txn, DBT* key, uint64_t* less_p, uint64_t* equal_p, uint64_t* greater_p, int* is_exact) {
791     uint64_t less, equal_left, middle, equal_right, greater;
792     bool ignore;
793     int r = toku_db_keys_range64(db, txn, key, NULL, &less, &equal_left, &middle, &equal_right, &greater, &ignore);
794     if (r == 0) {
795         *less_p = less;
796         *equal_p = equal_left;
797         *greater_p = middle;
798         paranoid_invariant_zero(greater);  // no keys are greater than positive infinity
799         paranoid_invariant_zero(equal_right);  // no keys are equal to positive infinity
800         // toku_ft_keysrange does not know when all 3 are exact, so set is_exact to false
801         *is_exact = false;
802     }
803     return 0;
804 }
805 
toku_db_get_key_after_bytes(DB * db,DB_TXN * txn,const DBT * start_key,uint64_t skip_len,void (* callback)(const DBT * end_key,uint64_t actually_skipped,void * extra),void * cb_extra,uint32_t UU (flags))806 static int toku_db_get_key_after_bytes(DB *db, DB_TXN *txn, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *end_key, uint64_t actually_skipped, void *extra), void *cb_extra, uint32_t UU(flags)) {
807     HANDLE_PANICKED_DB(db);
808     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
809     return toku_ft_get_key_after_bytes(db->i->ft_handle, start_key, skip_len, callback, cb_extra);
810 }
811 
812 // needed by loader.c
813 int
toku_db_pre_acquire_table_lock(DB * db,DB_TXN * txn)814 toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
815     HANDLE_PANICKED_DB(db);
816     if (!db->i->lt || !txn) return 0;
817     int r;
818     r = toku_db_get_range_lock(db, txn,
819             toku_dbt_negative_infinity(), toku_dbt_positive_infinity(),
820             toku::lock_request::type::WRITE);
821     return r;
822 }
823 
824 static int
locked_db_close(DB * db,uint32_t UU (flags))825 locked_db_close(DB * db, uint32_t UU(flags)) {
826     // cannot begin a checkpoint
827     toku_multi_operation_client_lock();
828     int r = toku_db_close(db);
829     toku_multi_operation_client_unlock();
830     return r;
831 }
832 
833 int
autotxn_db_get(DB * db,DB_TXN * txn,DBT * key,DBT * data,uint32_t flags)834 autotxn_db_get(DB* db, DB_TXN* txn, DBT* key, DBT* data, uint32_t flags) {
835     bool changed; int r;
836     r = toku_db_construct_autotxn(db, &txn, &changed, false);
837     if (r!=0) return r;
838     r = toku_db_get(db, txn, key, data, flags);
839     return toku_db_destruct_autotxn(txn, r, changed);
840 }
841 
842 static inline int
autotxn_db_getf_set(DB * db,DB_TXN * txn,uint32_t flags,DBT * key,YDB_CALLBACK_FUNCTION f,void * extra)843 autotxn_db_getf_set (DB *db, DB_TXN *txn, uint32_t flags, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
844     bool changed; int r;
845     r = toku_db_construct_autotxn(db, &txn, &changed, false);
846     if (r!=0) return r;
847     r = db_getf_set(db, txn, flags, key, f, extra);
848     return toku_db_destruct_autotxn(txn, r, changed);
849 }
850 
851 static int
locked_db_open(DB * db,DB_TXN * txn,const char * fname,const char * dbname,DBTYPE dbtype,uint32_t flags,int mode)852 locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
853     int ret, r;
854     HANDLE_READ_ONLY_TXN(txn);
855     HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
856 
857     //
858     // Note that this function opens a db with a transaction. Should
859     // the transaction abort, the user is responsible for closing the DB
860     // before aborting the transaction. Not doing so results in undefined
861     // behavior.
862     //
863     DB_ENV *env = db->dbenv;
864     DB_TXN *child_txn = NULL;
865     int using_txns = env->i->open_flags & DB_INIT_TXN;
866     if (using_txns) {
867         ret = toku_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
868         invariant_zero(ret);
869     }
870 
871     // cannot begin a checkpoint
872     toku_multi_operation_client_lock();
873     r = toku_db_open(db, child_txn, fname, dbname, dbtype, flags & ~DB_AUTO_COMMIT, mode);
874     toku_multi_operation_client_unlock();
875 
876     if (using_txns) {
877         if (r == 0) {
878             ret = locked_txn_commit(child_txn, DB_TXN_NOSYNC);
879             invariant_zero(ret);
880         } else {
881             ret = locked_txn_abort(child_txn);
882             invariant_zero(ret);
883         }
884     }
885     return r;
886 }
887 
888 static int
locked_db_change_descriptor(DB * db,DB_TXN * txn,const DBT * descriptor,uint32_t flags)889 locked_db_change_descriptor(DB *db, DB_TXN *txn, const DBT *descriptor, uint32_t flags) {
890     // cannot begin a checkpoint
891     toku_multi_operation_client_lock();
892     int r = toku_db_change_descriptor(db, txn, descriptor, flags);
893     toku_multi_operation_client_unlock();
894     return r;
895 }
896 
897 static int
autotxn_db_change_descriptor(DB * db,DB_TXN * txn,const DBT * descriptor,uint32_t flags)898 autotxn_db_change_descriptor(DB *db, DB_TXN *txn, const DBT *descriptor, uint32_t flags) {
899     bool changed; int r;
900     r = toku_db_construct_autotxn(db, &txn, &changed, false);
901     if (r != 0) { return r; }
902     r = locked_db_change_descriptor(db, txn, descriptor, flags);
903     return toku_db_destruct_autotxn(txn, r, changed);
904 }
905 
906 static void
toku_db_set_errfile(DB * db,FILE * errfile)907 toku_db_set_errfile (DB *db, FILE *errfile) {
908     db->dbenv->set_errfile(db->dbenv, errfile);
909 }
910 
911 // TODO 2216 delete this
912 static int
toku_db_fd(DB * UU (db),int * UU (fdp))913 toku_db_fd(DB * UU(db), int * UU(fdp)) {
914     return 0;
915 }
916 
917 static const DBT* toku_db_dbt_pos_infty(void) __attribute__((pure));
918 static const DBT*
toku_db_dbt_pos_infty(void)919 toku_db_dbt_pos_infty(void) {
920     return toku_dbt_positive_infinity();
921 }
922 
923 static const DBT* toku_db_dbt_neg_infty(void) __attribute__((pure));
924 static const DBT*
toku_db_dbt_neg_infty(void)925 toku_db_dbt_neg_infty(void) {
926     return toku_dbt_negative_infinity();
927 }
928 
929 static int
toku_db_optimize(DB * db)930 toku_db_optimize(DB *db) {
931     HANDLE_PANICKED_DB(db);
932     toku_ft_optimize(db->i->ft_handle);
933     return 0;
934 }
935 
936 static int
toku_db_hot_optimize(DB * db,DBT * left,DBT * right,int (* progress_callback)(void * extra,float progress),void * progress_extra,uint64_t * loops_run)937 toku_db_hot_optimize(DB *db, DBT* left, DBT* right,
938                      int (*progress_callback)(void *extra, float progress),
939                      void *progress_extra, uint64_t* loops_run)
940 {
941     HANDLE_PANICKED_DB(db);
942     int r = 0;
943     r = toku_ft_hot_optimize(db->i->ft_handle, left, right,
944                               progress_callback,
945                               progress_extra, loops_run);
946 
947     return r;
948 }
949 
950 static int
locked_db_optimize(DB * db)951 locked_db_optimize(DB *db) {
952     // need to protect from checkpointing because
953     // toku_db_optimize does a message injection
954     toku_multi_operation_client_lock(); //Cannot begin checkpoint
955     int r = toku_db_optimize(db);
956     toku_multi_operation_client_unlock();
957     return r;
958 }
959 
960 
961 struct last_key_extra {
962     YDB_CALLBACK_FUNCTION func;
963     void* extra;
964 };
965 
966 static int
db_get_last_key_callback(uint32_t keylen,const void * key,uint32_t vallen UU (),const void * val UU (),void * extra,bool lock_only)967 db_get_last_key_callback(uint32_t keylen, const void *key, uint32_t vallen UU(), const void *val UU(), void *extra, bool lock_only) {
968     if (!lock_only) {
969         DBT keydbt;
970         toku_fill_dbt(&keydbt, key, keylen);
971         struct last_key_extra * CAST_FROM_VOIDP(info, extra);
972         info->func(&keydbt, NULL, info->extra);
973     }
974     return 0;
975 }
976 
977 static int
toku_db_get_last_key(DB * db,DB_TXN * txn,YDB_CALLBACK_FUNCTION func,void * extra)978 toku_db_get_last_key(DB * db, DB_TXN *txn, YDB_CALLBACK_FUNCTION func, void* extra) {
979     int r;
980     LE_CURSOR cursor = nullptr;
981     struct last_key_extra last_extra = { .func = func, .extra = extra };
982 
983     r = toku_le_cursor_create(&cursor, db->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
984     if (r != 0) { goto cleanup; }
985 
986     // Goes in reverse order.  First key returned is last in dictionary.
987     r = toku_le_cursor_next(cursor, db_get_last_key_callback, &last_extra);
988     if (r != 0) { goto cleanup; }
989 
990 cleanup:
991     if (cursor) {
992         toku_le_cursor_close(cursor);
993     }
994     return r;
995 }
996 
997 static int
autotxn_db_get_last_key(DB * db,YDB_CALLBACK_FUNCTION func,void * extra)998 autotxn_db_get_last_key(DB* db, YDB_CALLBACK_FUNCTION func, void* extra) {
999     bool changed; int r;
1000     DB_TXN *txn = nullptr;
1001     // Cursors inside require transactions, but this is _not_ a transactional function.
1002     // Create transaction in a wrapper and then later close it.
1003     r = toku_db_construct_autotxn(db, &txn, &changed, false);
1004     if (r!=0) return r;
1005     r = toku_db_get_last_key(db, txn, func, extra);
1006     return toku_db_destruct_autotxn(txn, r, changed);
1007 }
1008 
1009 static int
toku_db_get_fragmentation(DB * db,TOKU_DB_FRAGMENTATION report)1010 toku_db_get_fragmentation(DB * db, TOKU_DB_FRAGMENTATION report) {
1011     HANDLE_PANICKED_DB(db);
1012     int r;
1013     if (!db_opened(db))
1014         r = toku_ydb_do_error(db->dbenv, EINVAL, "Fragmentation report available only on open DBs.\n");
1015     else
1016         r = toku_ft_get_fragmentation(db->i->ft_handle, report);
1017     return r;
1018 }
1019 
1020 int
toku_db_set_indexer(DB * db,DB_INDEXER * indexer)1021 toku_db_set_indexer(DB *db, DB_INDEXER * indexer) {
1022     int r = 0;
1023     if ( db->i->indexer != NULL && indexer != NULL ) {
1024         // you are trying to overwrite a valid indexer
1025         r = EINVAL;
1026     }
1027     else {
1028         db->i->indexer = indexer;
1029     }
1030     return r;
1031 }
1032 
1033 DB_INDEXER *
toku_db_get_indexer(DB * db)1034 toku_db_get_indexer(DB *db) {
1035     return db->i->indexer;
1036 }
1037 
1038 static void
db_get_indexer(DB * db,DB_INDEXER ** indexer_ptr)1039 db_get_indexer(DB *db, DB_INDEXER **indexer_ptr) {
1040     *indexer_ptr = toku_db_get_indexer(db);
1041 }
1042 
// Context passed (as void*) to ydb_verify_progress_callback(): the user's
// optional progress callback plus the opaque argument to hand back to it.
struct ydb_verify_context {
    int (*progress_callback)(void *extra, float progress);
    void *progress_extra;
};

// Progress adapter for verification.  `extra` points at a
// struct ydb_verify_context.  When the context carries a user callback, it is
// invoked with the user's own argument and its return value is returned;
// when no callback was supplied, 0 is returned.
static int
ydb_verify_progress_callback(void *extra, float progress) {
    struct ydb_verify_context *context = (struct ydb_verify_context *) extra;
    if (!context->progress_callback) {
        return 0;
    }
    return context->progress_callback(context->progress_extra, progress);
}
1057 
1058 static int
toku_db_verify_with_progress(DB * db,int (* progress_callback)(void * extra,float progress),void * progress_extra,int verbose,int keep_going)1059 toku_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float progress), void *progress_extra, int verbose, int keep_going) {
1060     struct ydb_verify_context context = { progress_callback, progress_extra };
1061     int r = toku_verify_ft_with_progress(db->i->ft_handle, ydb_verify_progress_callback, &context, verbose, keep_going);
1062     return r;
1063 }
1064 
1065 
1066 static int
toku_db_recount_rows(DB * db,int (* progress_callback)(uint64_t count,uint64_t deleted,void * progress_extra),void * progress_extra)1067 toku_db_recount_rows(DB* db, int (*progress_callback)(uint64_t count,
1068                                                       uint64_t deleted,
1069                                                       void* progress_extra),
1070                      void* progress_extra) {
1071 
1072     HANDLE_PANICKED_DB(db);
1073     int r = 0;
1074     r =
1075         toku_ft_recount_rows(
1076             db->i->ft_handle,
1077             progress_callback,
1078             progress_extra);
1079 
1080     return r;
1081 }
1082 
1083 
toku_setup_db_internal(DB ** dbp,DB_ENV * env,uint32_t flags,FT_HANDLE ft_handle,bool is_open)1084 int toku_setup_db_internal (DB **dbp, DB_ENV *env, uint32_t flags, FT_HANDLE ft_handle, bool is_open) {
1085     if (flags || env == NULL)
1086         return EINVAL;
1087 
1088     if (!env_opened(env))
1089         return EINVAL;
1090 
1091     DB *MALLOC(result);
1092     if (result == 0) {
1093         return ENOMEM;
1094     }
1095     memset(result, 0, sizeof *result);
1096     result->dbenv = env;
1097     MALLOC(result->i);
1098     if (result->i == 0) {
1099         toku_free(result);
1100         return ENOMEM;
1101     }
1102     memset(result->i, 0, sizeof *result->i);
1103     result->i->ft_handle = ft_handle;
1104     result->i->opened = is_open;
1105     *dbp = result;
1106     return 0;
1107 }
1108 
1109 int
toku_db_create(DB ** db,DB_ENV * env,uint32_t flags)1110 toku_db_create(DB ** db, DB_ENV * env, uint32_t flags) {
1111     if (flags || env == NULL)
1112         return EINVAL;
1113 
1114     if (!env_opened(env))
1115         return EINVAL;
1116 
1117 
1118     FT_HANDLE ft_handle;
1119     toku_ft_handle_create(&ft_handle);
1120 
1121     int r = toku_setup_db_internal(db, env, flags, ft_handle, false);
1122     if (r != 0) return r;
1123 
1124     DB *result=*db;
1125     // methods that grab the ydb lock
1126 #define SDB(name) result->name = locked_db_ ## name
1127     SDB(close);
1128     SDB(open);
1129     SDB(optimize);
1130 #undef SDB
1131     // methods that do not take the ydb lock
1132 #define USDB(name) result->name = toku_db_ ## name
1133     USDB(set_errfile);
1134     USDB(set_pagesize);
1135     USDB(get_pagesize);
1136     USDB(change_pagesize);
1137     USDB(set_readpagesize);
1138     USDB(get_readpagesize);
1139     USDB(change_readpagesize);
1140     USDB(set_compression_method);
1141     USDB(get_compression_method);
1142     USDB(change_compression_method);
1143     USDB(set_fanout);
1144     USDB(get_fanout);
1145     USDB(set_memcmp_magic);
1146     USDB(change_fanout);
1147     USDB(set_flags);
1148     USDB(get_flags);
1149     USDB(fd);
1150     USDB(get_max_row_size);
1151     USDB(set_indexer);
1152     USDB(pre_acquire_table_lock);
1153     USDB(pre_acquire_fileops_lock);
1154     USDB(key_range64);
1155     USDB(keys_range64);
1156     USDB(get_key_after_bytes);
1157     USDB(hot_optimize);
1158     USDB(stat64);
1159     USDB(get_fractal_tree_info64);
1160     USDB(iterate_fractal_tree_block_map);
1161     USDB(get_dname);
1162     USDB(verify_with_progress);
1163     USDB(cursor);
1164     USDB(dbt_pos_infty);
1165     USDB(dbt_neg_infty);
1166     USDB(get_fragmentation);
1167     USDB(recount_rows);
1168 #undef USDB
1169     result->get_indexer = db_get_indexer;
1170     result->del = autotxn_db_del;
1171     result->put = autotxn_db_put;
1172     result->update = autotxn_db_update;
1173     result->update_broadcast = autotxn_db_update_broadcast;
1174     result->change_descriptor = autotxn_db_change_descriptor;
1175     result->get_last_key = autotxn_db_get_last_key;
1176 
1177     // unlocked methods
1178     result->get = autotxn_db_get;
1179     result->getf_set = autotxn_db_getf_set;
1180 
1181     result->i->dict_id = DICTIONARY_ID_NONE;
1182     result->i->opened = 0;
1183     result->i->open_flags = 0;
1184     result->i->open_mode = 0;
1185     result->i->indexer = NULL;
1186     *db = result;
1187     return 0;
1188 }
1189 
1190 // When the loader is created, it makes this call (toku_env_load_inames).
1191 // For each dictionary to be loaded, replace old iname in directory
1192 // with a newly generated iname.  This will also take a write lock
1193 // on the directory entries.  The write lock will be released when
1194 // the transaction of the loader is completed.
1195 // If the transaction commits, the new inames are in place.
1196 // If the transaction aborts, the old inames will be restored.
1197 // The new inames are returned to the caller.
1198 // It is the caller's responsibility to free them.
1199 // If "mark_as_loader" is true, then include a mark in the iname
1200 // to indicate that the file is created by the ft loader.
1201 // Return 0 on success (could fail if write lock not available).
1202 static int
load_inames(DB_ENV * env,DB_TXN * txn,int N,DB * dbs[],const char * new_inames_in_env[],LSN * load_lsn,bool mark_as_loader)1203 load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], const char * new_inames_in_env[/*N*/], LSN *load_lsn, bool mark_as_loader) {
1204     int rval = 0;
1205     int i;
1206 
1207     TXNID_PAIR xid = TXNID_PAIR_NONE;
1208     DBT dname_dbt;  // holds dname
1209     DBT iname_dbt;  // holds new iname
1210 
1211     const char *mark;
1212 
1213     if (mark_as_loader) {
1214         mark = "B";
1215     } else {
1216         mark = "P";
1217     }
1218 
1219     for (i=0; i<N; i++) {
1220         new_inames_in_env[i] = NULL;
1221     }
1222 
1223     if (txn) {
1224         xid = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn);
1225     }
1226     for (i = 0; i < N; i++) {
1227         char * dname = dbs[i]->i->dname;
1228         toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
1229 
1230         // now create new iname
1231         char hint[strlen(dname) + 1];
1232         create_iname_hint(env, dname, hint);
1233 
1234         // allocates memory for iname_in_env
1235         const char *new_iname =
1236             create_iname(env, xid.parent_id64, xid.child_id64, hint, mark, i);
1237         new_inames_in_env[i] = new_iname;
1238 
1239         // iname_in_env goes in directory
1240         toku_fill_dbt(&iname_dbt, new_iname, strlen(new_iname) + 1);
1241         rval = toku_db_put(env->i->directory, txn, &dname_dbt, &iname_dbt, 0, true);
1242         if (rval) break;
1243     }
1244 
1245     // Generate load log entries.
1246     if (!rval && txn) {
1247         TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
1248         int do_fsync = 0;
1249         LSN *get_lsn = NULL;
1250         for (i = 0; i < N; i++) {
1251             FT_HANDLE ft_handle  = dbs[i]->i->ft_handle;
1252             //Fsync is necessary for the last one only.
1253             if (i==N-1) {
1254                 do_fsync = 1; //We only need a single fsync of logs.
1255                 get_lsn  = load_lsn; //Set pointer to capture the last lsn.
1256             }
1257             toku_ft_load(ft_handle, ttxn, new_inames_in_env[i], do_fsync, get_lsn);
1258         }
1259     }
1260     return rval;
1261 }
1262 
1263 int
locked_load_inames(DB_ENV * env,DB_TXN * txn,int N,DB * dbs[],char * new_inames_in_env[],LSN * load_lsn,bool mark_as_loader)1264 locked_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], char * new_inames_in_env[/*N*/], LSN *load_lsn, bool mark_as_loader) {
1265     int r;
1266     HANDLE_READ_ONLY_TXN(txn);
1267 
1268     // cannot begin a checkpoint
1269     toku_multi_operation_client_lock();
1270     r = load_inames(env, txn, N, dbs, (const char **) new_inames_in_env, load_lsn, mark_as_loader);
1271     toku_multi_operation_client_unlock();
1272 
1273     return r;
1274 
1275 }
1276 
1277 #undef STATUS_VALUE
1278 
1279 #include <toku_race_tools.h>
1280 void __attribute__((constructor)) toku_ydb_db_helgrind_ignore(void);
1281 void
toku_ydb_db_helgrind_ignore(void)1282 toku_ydb_db_helgrind_ignore(void) {
1283     TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_db_layer_status, sizeof ydb_db_layer_status);
1284 }
1285