1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 #include <my_global.h>
40 #include <ctype.h>
41
42 #include <db.h>
43 #include <locktree/locktree.h>
44 #include <ft/ft.h>
45 #include <ft/ft-flusher.h>
46 #include <ft/cachetable/checkpoint.h>
47
48 #include "ydb_cursor.h"
49 #include "ydb_row_lock.h"
50 #include "ydb_db.h"
51 #include "ydb_write.h"
52 #include "ydb-internal.h"
53 #include "ydb_load.h"
54 #include "indexer.h"
55 #include <portability/toku_atomic.h>
56 #include <util/status.h>
57 #include <ft/le-cursor.h>
58
59 static YDB_DB_LAYER_STATUS_S ydb_db_layer_status;
60 #ifdef STATUS_VALUE
61 #undef STATUS_VALUE
62 #endif
63 #define STATUS_VALUE(x) ydb_db_layer_status.status[x].value.num
64
65 #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_db_layer_status, k, c, t, l, inc)
66
67 static void
ydb_db_layer_status_init(void)68 ydb_db_layer_status_init (void) {
69 // Note, this function initializes the keyname, type, and legend fields.
70 // Value fields are initialized to zero by compiler.
71
72 STATUS_INIT(YDB_LAYER_DIRECTORY_WRITE_LOCKS, nullptr, UINT64, "directory write locks", TOKU_ENGINE_STATUS);
73 STATUS_INIT(YDB_LAYER_DIRECTORY_WRITE_LOCKS_FAIL, nullptr, UINT64, "directory write locks fail", TOKU_ENGINE_STATUS);
74 STATUS_INIT(YDB_LAYER_LOGSUPPRESS, nullptr, UINT64, "log suppress", TOKU_ENGINE_STATUS);
75 STATUS_INIT(YDB_LAYER_LOGSUPPRESS_FAIL, nullptr, UINT64, "log suppress fail", TOKU_ENGINE_STATUS);
76 ydb_db_layer_status.initialized = true;
77 }
78 #undef STATUS_INIT
79
80 void
ydb_db_layer_get_status(YDB_DB_LAYER_STATUS statp)81 ydb_db_layer_get_status(YDB_DB_LAYER_STATUS statp) {
82 if (!ydb_db_layer_status.initialized)
83 ydb_db_layer_status_init();
84 *statp = ydb_db_layer_status;
85 }
86
create_iname_hint(DB_ENV * env,const char * dname,char * hint)87 void create_iname_hint(DB_ENV *env, const char *dname, char *hint) {
88 //Requires: size of hint array must be > strlen(dname)
89 //Copy alphanumeric characters only.
90 //Replace strings of non-alphanumeric characters with a single underscore.
91 if (env->get_dir_per_db(env) && !toku_os_is_absolute_name(dname)) {
92 assert(dname);
93 if (*dname == '.')
94 ++dname;
95 if (*dname == '/')
96 ++dname;
97 bool underscored = false;
98 bool dbdir_is_parsed = false;
99 // Do not change the first '/' because this is
100 // delimiter which splits name into database dir
101 // and table dir.
102 while (*dname) {
103 if (isalnum(*dname) || (*dname == '/' && !dbdir_is_parsed)) {
104 char c = *dname++;
105 *hint++ = c;
106 if (c == '/')
107 dbdir_is_parsed = true;
108 underscored = false;
109 } else if (!dbdir_is_parsed) {
110 char c = *dname++;
111 *hint++ = c;
112 } else {
113 if (!underscored)
114 *hint++ = '_';
115 dname++;
116 underscored = true;
117 }
118 }
119 *hint = '\0';
120 } else {
121 bool underscored = false;
122 while (*dname) {
123 if (isalnum(*dname)) {
124 char c = *dname++;
125 *hint++ = c;
126 underscored = false;
127 }
128 else {
129 if (!underscored)
130 *hint++ = '_';
131 dname++;
132 underscored = true;
133 }
134 }
135 *hint = '\0';
136 }
137 }
138
139 // n < 0 means to ignore mark and ignore n
140 // n >= 0 means to include mark ("_B_" or "_P_") with hex value of n in iname
141 // (intended for use by loader, which will create many inames using one txnid).
create_iname(DB_ENV * env,uint64_t id1,uint64_t id2,char * hint,const char * mark,int n)142 char *create_iname(DB_ENV *env,
143 uint64_t id1,
144 uint64_t id2,
145 char *hint,
146 const char *mark,
147 int n) {
148 int bytes;
149 char inamebase[strlen(hint) +
150 8 + // hex file format version
151 24 + // hex id (normally the txnid's parent and child)
152 8 + // hex value of n if non-neg
153 sizeof("_B___.") + // extra pieces
154 strlen(toku_product_name)];
155 if (n < 0)
156 bytes = snprintf(inamebase, sizeof(inamebase),
157 "%s_%" PRIx64 "_%" PRIx64 "_%" PRIx32 ".%s",
158 hint, id1, id2, FT_LAYOUT_VERSION, toku_product_name);
159 else {
160 invariant(strlen(mark) == 1);
161 bytes = snprintf(inamebase, sizeof(inamebase),
162 "%s_%" PRIx64 "_%" PRIx64 "_%" PRIx32 "_%s_%" PRIx32 ".%s",
163 hint, id1, id2, FT_LAYOUT_VERSION, mark, n, toku_product_name);
164 }
165 assert(bytes>0);
166 assert(bytes<=(int)sizeof(inamebase)-1);
167 char *rval;
168 if (env->i->data_dir)
169 rval = toku_construct_full_name(2, env->i->data_dir, inamebase);
170 else
171 rval = toku_construct_full_name(1, inamebase);
172 assert(rval);
173 return rval;
174 }
175
176 static uint64_t nontransactional_open_id = 0;
177
generate_iname_for_rename_or_open(DB_ENV * env,DB_TXN * txn,const char * dname,bool is_open)178 std::unique_ptr<char[], decltype(&toku_free)> generate_iname_for_rename_or_open(
179 DB_ENV *env,
180 DB_TXN *txn,
181 const char *dname,
182 bool is_open) {
183 std::unique_ptr<char[], decltype(&toku_free)> result(nullptr, &toku_free);
184 char hint[strlen(dname) + 1];
185 uint64_t id1 = 0;
186 uint64_t id2 = 0;
187
188 if (txn) {
189 id1 = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn).parent_id64;
190 id2 = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn).child_id64;
191 } else if (is_open)
192 id1 = toku_sync_fetch_and_add(&nontransactional_open_id, 1);
193
194 create_iname_hint(env, dname, hint);
195
196 result.reset(create_iname(env, id1, id2, hint, NULL, -1));
197
198 return result;
199 }
200
201 static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode);
202
203 // Effect: Do the work required of DB->close().
204 // requires: the multi_operation client lock is held.
205 int
toku_db_close(DB * db)206 toku_db_close(DB * db) {
207 int r = 0;
208 if (db_opened(db) && db->i->dname) {
209 // internal (non-user) dictionary has no dname
210 env_note_db_closed(db->dbenv, db); // tell env that this db is no longer in use by the user of this api (user-closed, may still be in use by fractal tree internals)
211 }
212 // close the ft handle, and possibly close the locktree
213 toku_ft_handle_close(db->i->ft_handle);
214 if (db->i->lt) {
215 db->dbenv->i->ltm.release_lt(db->i->lt);
216 }
217 toku_sdbt_cleanup(&db->i->skey);
218 toku_sdbt_cleanup(&db->i->sval);
219 if (db->i->dname) {
220 toku_free(db->i->dname);
221 }
222 toku_free(db->i);
223 toku_free(db);
224 return r;
225 }
226
227 ///////////
228 //db_getf_XXX is equivalent to c_getf_XXX, without a persistent cursor
229
230 int
db_getf_set(DB * db,DB_TXN * txn,uint32_t flags,DBT * key,YDB_CALLBACK_FUNCTION f,void * extra)231 db_getf_set(DB *db, DB_TXN *txn, uint32_t flags, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
232 HANDLE_PANICKED_DB(db);
233 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
234 DBC c;
235 uint32_t create_flags = flags & (DB_ISOLATION_FLAGS | DB_RMW);
236 flags &= ~DB_ISOLATION_FLAGS;
237 int r = toku_db_cursor_internal(db, txn, &c, create_flags | DBC_DISABLE_PREFETCHING, 1);
238 if (r==0) {
239 r = toku_c_getf_set(&c, flags, key, f, extra);
240 int r2 = toku_c_close_internal(&c);
241 if (r==0) r = r2;
242 }
243 return r;
244 }
245
246 static inline int
db_thread_need_flags(DBT * dbt)247 db_thread_need_flags(DBT *dbt) {
248 return (dbt->flags & (DB_DBT_MALLOC+DB_DBT_REALLOC+DB_DBT_USERMEM)) == 0;
249 }
250
251 int
toku_db_get(DB * db,DB_TXN * txn,DBT * key,DBT * data,uint32_t flags)252 toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, uint32_t flags) {
253 HANDLE_PANICKED_DB(db);
254 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
255 int r;
256 uint32_t iso_flags = flags & DB_ISOLATION_FLAGS;
257
258 if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data))
259 return EINVAL;
260
261 uint32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
262 flags &= ~lock_flags;
263 flags &= ~DB_ISOLATION_FLAGS;
264 // And DB_GET_BOTH is no longer supported. #2862.
265 if (flags != 0) return EINVAL;
266
267 DBC dbc;
268 r = toku_db_cursor_internal(db, txn, &dbc, iso_flags | DBC_DISABLE_PREFETCHING, 1);
269 if (r!=0) return r;
270 uint32_t c_get_flags = DB_SET;
271 r = toku_c_get(&dbc, key, data, c_get_flags | lock_flags);
272 int r2 = toku_c_close_internal(&dbc);
273 return r ? r : r2;
274 }
275
276 static int
db_open_subdb(DB * db,DB_TXN * txn,const char * fname,const char * dbname,DBTYPE dbtype,uint32_t flags,int mode)277 db_open_subdb(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
278 int r;
279 if (!fname || !dbname) r = EINVAL;
280 else {
281 char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
282 int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
283 assert(bytes==(int)sizeof(subdb_full_name)-1);
284 const char *null_subdbname = NULL;
285 r = toku_db_open(db, txn, subdb_full_name, null_subdbname, dbtype, flags, mode);
286 }
287 return r;
288 }
289
290 // inames are created here.
291 // algorithm:
292 // begin txn
293 // convert dname to iname (possibly creating new iname)
294 // open file (toku_ft_handle_open() will handle logging)
295 // close txn
296 // if created a new iname, take full range lock
297 // Requires: no checkpoint may take place during this function, which is enforced by holding the multi_operation_client_lock.
298 static int
toku_db_open(DB * db,DB_TXN * txn,const char * fname,const char * dbname,DBTYPE dbtype,uint32_t flags,int mode)299 toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
300 HANDLE_PANICKED_DB(db);
301 HANDLE_READ_ONLY_TXN(txn);
302 if (dbname != NULL) {
303 return db_open_subdb(db, txn, fname, dbname, dbtype, flags, mode);
304 }
305
306 // at this point fname is the dname
307 //This code ONLY supports single-db files.
308 assert(dbname == NULL);
309 const char * dname = fname; // db_open_subdb() converts (fname, dbname) to dname
310
311 ////////////////////////////// do some level of parameter checking.
312 uint32_t unused_flags = flags;
313 int r;
314 if (dbtype!=DB_BTREE && dbtype!=DB_UNKNOWN) return EINVAL;
315 int is_db_excl = flags & DB_EXCL; unused_flags&=~DB_EXCL;
316 int is_db_create = flags & DB_CREATE; unused_flags&=~DB_CREATE;
317 int is_db_hot_index = flags & DB_IS_HOT_INDEX; unused_flags&=~DB_IS_HOT_INDEX;
318
319 //We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
320 unused_flags&=~DB_READ_UNCOMMITTED;
321 unused_flags&=~DB_READ_COMMITTED;
322 unused_flags&=~DB_SERIALIZABLE;
323
324 // DB_THREAD is implicitly supported and DB_BLACKHOLE is supported at the ft-layer
325 unused_flags &= ~DB_THREAD;
326 unused_flags &= ~DB_BLACKHOLE;
327 unused_flags &= ~DB_RDONLY;
328
329 // check for unknown or conflicting flags
330 if (unused_flags) return EINVAL; // unknown flags
331 if (is_db_excl && !is_db_create) return EINVAL;
332 if (dbtype==DB_UNKNOWN && is_db_excl) return EINVAL;
333
334 if (db_opened(db)) {
335 // it was already open
336 return EINVAL;
337 }
338 //////////////////////////////
339
340 // convert dname to iname
341 // - look up dname, get iname
342 // - if dname does not exist, create iname and make entry in directory
343 DBT dname_dbt; // holds dname
344 DBT iname_dbt; // holds iname_in_env
345 toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
346 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
347 r = toku_db_get(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
348 std::unique_ptr<char[], decltype(&toku_free)> iname(
349 static_cast<char *>(iname_dbt.data), &toku_free);
350 if (r == DB_NOTFOUND && !is_db_create) {
351 r = ENOENT;
352 } else if (r==0 && is_db_excl) {
353 r = EEXIST;
354 } else if (r == DB_NOTFOUND) {
355 iname = generate_iname_for_rename_or_open(db->dbenv, txn, dname, true);
356 toku_fill_dbt(&iname_dbt, iname.get(), strlen(iname.get()) + 1);
357 //
358 // put_flags will be 0 for performance only, avoid unnecessary query
359 // if we are creating a hot index, per #3166, we do not want the write lock in directory grabbed.
360 // directory read lock is grabbed in toku_db_get above
361 //
362 uint32_t put_flags = 0 | ((is_db_hot_index) ? DB_PRELOCKED_WRITE : 0);
363 r = toku_db_put(db->dbenv->i->directory, txn, &dname_dbt, &iname_dbt, put_flags, true);
364 }
365
366 // we now have an iname
367 if (r == 0) {
368 r = toku_db_open_iname(db, txn, iname.get(), flags, mode);
369 if (r == 0) {
370 db->i->dname = toku_xstrdup(dname);
371 env_note_db_opened(db->dbenv, db); // tell env that a new db handle is open (using dname)
372 }
373 }
374
375 return r;
376 }
377
378 // set the descriptor and cmp_descriptor to the
379 // descriptors from the given ft, updating the
380 // locktree's descriptor pointer if necessary
381 static void
db_set_descriptors(DB * db,FT_HANDLE ft_handle)382 db_set_descriptors(DB *db, FT_HANDLE ft_handle) {
383 const toku::comparator &cmp = toku_ft_get_comparator(ft_handle);
384 db->descriptor = toku_ft_get_descriptor(ft_handle);
385 db->cmp_descriptor = toku_ft_get_cmp_descriptor(ft_handle);
386 invariant(db->cmp_descriptor == cmp.get_descriptor());
387 if (db->i->lt) {
388 db->i->lt->set_comparator(cmp);
389 }
390 }
391
392 // callback that sets the descriptors when
393 // a dictionary is redirected at the ft layer
394 static void
db_on_redirect_callback(FT_HANDLE ft_handle,void * extra)395 db_on_redirect_callback(FT_HANDLE ft_handle, void* extra) {
396 DB *db = (DB *) extra;
397 db_set_descriptors(db, ft_handle);
398 }
399
400 // when a locktree is created, clone a ft handle and store it
401 // as userdata so we can close it later.
toku_db_lt_on_create_callback(toku::locktree * lt,void * extra)402 int toku_db_lt_on_create_callback(toku::locktree *lt, void *extra) {
403 int r;
404 struct lt_on_create_callback_extra *info = (struct lt_on_create_callback_extra *) extra;
405 TOKUTXN ttxn = info->txn ? db_txn_struct_i(info->txn)->tokutxn : NULL;
406 FT_HANDLE ft_handle = info->ft_handle;
407
408 FT_HANDLE cloned_ft_handle;
409 r = toku_ft_handle_clone(&cloned_ft_handle, ft_handle, ttxn, info->open_rw);
410 if (r == 0) {
411 assert(lt->get_userdata() == NULL);
412 lt->set_userdata(cloned_ft_handle);
413 }
414 return r;
415 }
416
417 // when a locktree is about to be destroyed,
418 // close the ft handle stored as userdata.
toku_db_lt_on_destroy_callback(toku::locktree * lt)419 void toku_db_lt_on_destroy_callback(toku::locktree *lt) {
420 FT_HANDLE ft_handle = (FT_HANDLE) lt->get_userdata();
421 assert(ft_handle);
422 toku_ft_handle_close(ft_handle);
423 }
424
425 // Instruct db to use the default (built-in) key comparison function
426 // by setting the flag bits in the db and ft structs
toku_db_use_builtin_key_cmp(DB * db)427 int toku_db_use_builtin_key_cmp(DB *db) {
428 HANDLE_PANICKED_DB(db);
429 int r = 0;
430 if (db_opened(db)) {
431 r = toku_ydb_do_error(db->dbenv, EINVAL, "Comparison functions cannot be set after DB open.\n");
432 } else if (db->i->key_compare_was_set) {
433 r = toku_ydb_do_error(db->dbenv, EINVAL, "Key comparison function already set.\n");
434 } else {
435 uint32_t tflags;
436 toku_ft_get_flags(db->i->ft_handle, &tflags);
437
438 tflags |= TOKU_DB_KEYCMP_BUILTIN;
439 toku_ft_set_flags(db->i->ft_handle, tflags);
440 db->i->key_compare_was_set = true;
441 }
442 return r;
443 }
444
toku_db_open_iname(DB * db,DB_TXN * txn,const char * iname_in_env,uint32_t flags,int mode)445 int toku_db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, uint32_t flags, int mode) {
446 //Set comparison functions if not yet set.
447 HANDLE_READ_ONLY_TXN(txn);
448 if (!db->i->key_compare_was_set && db->dbenv->i->bt_compare) {
449 toku_ft_set_bt_compare(db->i->ft_handle, db->dbenv->i->bt_compare);
450 db->i->key_compare_was_set = true;
451 }
452 if (db->dbenv->i->update_function) {
453 toku_ft_set_update(db->i->ft_handle,db->dbenv->i->update_function);
454 }
455 toku_ft_set_redirect_callback(
456 db->i->ft_handle,
457 db_on_redirect_callback,
458 db
459 );
460 bool need_locktree = (bool)((db->dbenv->i->open_flags & DB_INIT_LOCK) &&
461 (db->dbenv->i->open_flags & DB_INIT_TXN));
462
463 int is_db_excl = flags & DB_EXCL; flags&=~DB_EXCL;
464 int is_db_create = flags & DB_CREATE; flags&=~DB_CREATE;
465 //We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
466 flags&=~DB_READ_UNCOMMITTED;
467 flags&=~DB_READ_COMMITTED;
468 flags&=~DB_SERIALIZABLE;
469 flags&=~DB_IS_HOT_INDEX;
470 flags&=~DB_RDONLY;
471 // unknown or conflicting flags are bad
472 int unknown_flags = flags & ~DB_THREAD;
473 unknown_flags &= ~DB_BLACKHOLE;
474 if (unknown_flags || (is_db_excl && !is_db_create)) {
475 return EINVAL;
476 }
477
478 if (db_opened(db)) {
479 return EINVAL; /* It was already open. */
480 }
481
482 db->i->open_flags = flags;
483 db->i->open_mode = mode;
484
485 bool open_rw = mode & (S_IWUSR | S_IWOTH | S_IWGRP);
486 FT_HANDLE ft_handle = db->i->ft_handle;
487 int r = toku_ft_handle_open(ft_handle, iname_in_env,
488 is_db_create, is_db_excl,
489 db->dbenv->i->cachetable,
490 txn ? db_txn_struct_i(txn)->tokutxn : nullptr, open_rw);
491 if (r != 0) {
492 goto out;
493 }
494
495 // if the dictionary was opened as a blackhole, mark the
496 // fractal tree as blackhole too.
497 if (flags & DB_BLACKHOLE) {
498 toku_ft_set_blackhole(ft_handle);
499 }
500
501 db->i->opened = 1;
502
503 // now that the handle has successfully opened, a valid descriptor
504 // is in the ft. we need to set the db's descriptor pointers
505 db_set_descriptors(db, ft_handle);
506
507 if (need_locktree) {
508 db->i->dict_id = toku_ft_get_dictionary_id(db->i->ft_handle);
509 struct lt_on_create_callback_extra on_create_extra = {
510 .txn = txn,
511 .ft_handle = db->i->ft_handle,
512 .open_rw = false
513 };
514 db->i->lt = db->dbenv->i->ltm.get_lt(db->i->dict_id,
515 toku_ft_get_comparator(db->i->ft_handle),
516 &on_create_extra);
517 if (db->i->lt == nullptr) {
518 r = errno;
519 if (r == 0) {
520 r = EINVAL;
521 }
522 goto out;
523 }
524 }
525 r = 0;
526
527 out:
528 if (r != 0) {
529 db->i->dict_id = DICTIONARY_ID_NONE;
530 db->i->opened = 0;
531 if (db->i->lt) {
532 db->dbenv->i->ltm.release_lt(db->i->lt);
533 db->i->lt = nullptr;
534 }
535 }
536 return r;
537 }
538
539 // Return the maximum key and val size in
540 // *key_size and *val_size respectively
541 static void
toku_db_get_max_row_size(DB * UU (db),uint32_t * max_key_size,uint32_t * max_val_size)542 toku_db_get_max_row_size(DB * UU(db), uint32_t * max_key_size, uint32_t * max_val_size) {
543 *max_key_size = 0;
544 *max_val_size = 0;
545 toku_ft_get_maximum_advised_key_value_lengths(max_key_size, max_val_size);
546 }
547
toku_db_pre_acquire_fileops_lock(DB * db,DB_TXN * txn)548 int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
549 // bad hack because some environment dictionaries do not have a dname
550 char *dname = db->i->dname;
551 if (!dname)
552 return 0;
553
554 DBT key_in_directory = { .data = dname, .size = (uint32_t) strlen(dname)+1 };
555 //Left end of range == right end of range (point lock)
556 int r = toku_db_get_range_lock(db->dbenv->i->directory, txn,
557 &key_in_directory, &key_in_directory,
558 toku::lock_request::type::WRITE);
559 if (r == 0)
560 STATUS_VALUE(YDB_LAYER_DIRECTORY_WRITE_LOCKS)++; // accountability
561 else
562 STATUS_VALUE(YDB_LAYER_DIRECTORY_WRITE_LOCKS_FAIL)++; // accountability
563 return r;
564 }
565
566 //
567 // This function is used both to set an initial descriptor of a DB and to
568 // change a descriptor. (only way to set a descriptor of a DB)
569 //
570 // Requires:
571 // - The caller must not call put_multiple, del_multiple, or update_multiple concurrently
572 // - The caller must not have a hot index running concurrently on db
573 // - If the caller has passed DB_UPDATE_CMP_DESCRIPTOR as a flag, then he is calling this function
574 // ONLY immediately after creating the dictionary and before doing any actual work on the dictionary.
575 //
576 static int
toku_db_change_descriptor(DB * db,DB_TXN * txn,const DBT * descriptor,uint32_t flags)577 toku_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, uint32_t flags) {
578 HANDLE_PANICKED_DB(db);
579 HANDLE_READ_ONLY_TXN(txn);
580 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
581 int r = 0;
582 TOKUTXN ttxn = txn ? db_txn_struct_i(txn)->tokutxn : NULL;
583 bool is_db_hot_index = ((flags & DB_IS_HOT_INDEX) != 0);
584 bool update_cmp_descriptor = ((flags & DB_UPDATE_CMP_DESCRIPTOR) != 0);
585
586 DBT old_descriptor_dbt;
587 toku_init_dbt(&old_descriptor_dbt);
588
589 if (!db_opened(db) || !descriptor || (descriptor->size>0 && !descriptor->data)){
590 r = EINVAL;
591 goto cleanup;
592 }
593 // For a hot index, this is an initial descriptor.
594 // We do not support (yet) hcad with hot index concurrently on a single table, which
595 // would require changing a descriptor for a hot index.
596 if (!is_db_hot_index) {
597 r = toku_db_pre_acquire_table_lock(db, txn);
598 if (r != 0) { goto cleanup; }
599 }
600
601 toku_clone_dbt(&old_descriptor_dbt, db->descriptor->dbt);
602 toku_ft_change_descriptor(db->i->ft_handle, &old_descriptor_dbt, descriptor,
603 true, ttxn, update_cmp_descriptor);
604
605 cleanup:
606 toku_destroy_dbt(&old_descriptor_dbt);
607 return r;
608 }
609
610 static int
toku_db_set_flags(DB * db,uint32_t flags)611 toku_db_set_flags(DB *db, uint32_t flags) {
612 HANDLE_PANICKED_DB(db);
613
614 /* the following matches BDB */
615 if (db_opened(db) && flags != 0) return EINVAL;
616
617 return 0;
618 }
619
620 static int
toku_db_get_flags(DB * db,uint32_t * pflags)621 toku_db_get_flags(DB *db, uint32_t *pflags) {
622 HANDLE_PANICKED_DB(db);
623 if (!pflags) return EINVAL;
624 *pflags = 0;
625 return 0;
626 }
627
628 static int
toku_db_change_pagesize(DB * db,uint32_t pagesize)629 toku_db_change_pagesize(DB *db, uint32_t pagesize) {
630 HANDLE_PANICKED_DB(db);
631 if (!db_opened(db)) return EINVAL;
632 toku_ft_handle_set_nodesize(db->i->ft_handle, pagesize);
633 return 0;
634 }
635
636 static int
toku_db_set_pagesize(DB * db,uint32_t pagesize)637 toku_db_set_pagesize(DB *db, uint32_t pagesize) {
638 HANDLE_PANICKED_DB(db);
639 if (db_opened(db)) return EINVAL;
640 toku_ft_handle_set_nodesize(db->i->ft_handle, pagesize);
641 return 0;
642 }
643
644 static int
toku_db_get_pagesize(DB * db,uint32_t * pagesize_ptr)645 toku_db_get_pagesize(DB *db, uint32_t *pagesize_ptr) {
646 HANDLE_PANICKED_DB(db);
647 toku_ft_handle_get_nodesize(db->i->ft_handle, pagesize_ptr);
648 return 0;
649 }
650
651 static int
toku_db_change_readpagesize(DB * db,uint32_t readpagesize)652 toku_db_change_readpagesize(DB *db, uint32_t readpagesize) {
653 HANDLE_PANICKED_DB(db);
654 if (!db_opened(db)) return EINVAL;
655 toku_ft_handle_set_basementnodesize(db->i->ft_handle, readpagesize);
656 return 0;
657 }
658
659 static int
toku_db_set_readpagesize(DB * db,uint32_t readpagesize)660 toku_db_set_readpagesize(DB *db, uint32_t readpagesize) {
661 HANDLE_PANICKED_DB(db);
662 if (db_opened(db)) return EINVAL;
663 toku_ft_handle_set_basementnodesize(db->i->ft_handle, readpagesize);
664 return 0;
665 }
666
667 static int
toku_db_get_readpagesize(DB * db,uint32_t * readpagesize_ptr)668 toku_db_get_readpagesize(DB *db, uint32_t *readpagesize_ptr) {
669 HANDLE_PANICKED_DB(db);
670 toku_ft_handle_get_basementnodesize(db->i->ft_handle, readpagesize_ptr);
671 return 0;
672 }
673
674 static int
toku_db_change_compression_method(DB * db,enum toku_compression_method compression_method)675 toku_db_change_compression_method(DB *db, enum toku_compression_method compression_method) {
676 HANDLE_PANICKED_DB(db);
677 if (!db_opened(db)) return EINVAL;
678 toku_ft_handle_set_compression_method(db->i->ft_handle, compression_method);
679 return 0;
680 }
681
682 static int
toku_db_set_compression_method(DB * db,enum toku_compression_method compression_method)683 toku_db_set_compression_method(DB *db, enum toku_compression_method compression_method) {
684 HANDLE_PANICKED_DB(db);
685 if (db_opened(db)) return EINVAL;
686 toku_ft_handle_set_compression_method(db->i->ft_handle, compression_method);
687 return 0;
688 }
689
690 static int
toku_db_get_compression_method(DB * db,enum toku_compression_method * compression_method_ptr)691 toku_db_get_compression_method(DB *db, enum toku_compression_method *compression_method_ptr) {
692 HANDLE_PANICKED_DB(db);
693 toku_ft_handle_get_compression_method(db->i->ft_handle, compression_method_ptr);
694 return 0;
695 }
696
697 static int
toku_db_change_fanout(DB * db,unsigned int fanout)698 toku_db_change_fanout(DB *db, unsigned int fanout) {
699 HANDLE_PANICKED_DB(db);
700 if (!db_opened(db)) return EINVAL;
701 toku_ft_handle_set_fanout(db->i->ft_handle, fanout);
702 return 0;
703 }
704
705 static int
toku_db_set_fanout(DB * db,unsigned int fanout)706 toku_db_set_fanout(DB *db, unsigned int fanout) {
707 HANDLE_PANICKED_DB(db);
708 if (db_opened(db)) return EINVAL;
709 toku_ft_handle_set_fanout(db->i->ft_handle, fanout);
710 return 0;
711 }
712
713 static int
toku_db_get_fanout(DB * db,unsigned int * fanout)714 toku_db_get_fanout(DB *db, unsigned int *fanout) {
715 HANDLE_PANICKED_DB(db);
716 toku_ft_handle_get_fanout(db->i->ft_handle, fanout);
717 return 0;
718 }
719
720 static int
toku_db_set_memcmp_magic(DB * db,uint8_t magic)721 toku_db_set_memcmp_magic(DB *db, uint8_t magic) {
722 HANDLE_PANICKED_DB(db);
723 if (db_opened(db)) {
724 return EINVAL;
725 }
726 return toku_ft_handle_set_memcmp_magic(db->i->ft_handle, magic);
727 }
728
729 static int
toku_db_get_fractal_tree_info64(DB * db,uint64_t * num_blocks_allocated,uint64_t * num_blocks_in_use,uint64_t * size_allocated,uint64_t * size_in_use)730 toku_db_get_fractal_tree_info64(DB *db, uint64_t *num_blocks_allocated, uint64_t *num_blocks_in_use, uint64_t *size_allocated, uint64_t *size_in_use) {
731 HANDLE_PANICKED_DB(db);
732 struct ftinfo64 ftinfo;
733 toku_ft_handle_get_fractal_tree_info64(db->i->ft_handle, &ftinfo);
734 *num_blocks_allocated = ftinfo.num_blocks_allocated;
735 *num_blocks_in_use = ftinfo.num_blocks_in_use;
736 *size_allocated = ftinfo.size_allocated;
737 *size_in_use = ftinfo.size_in_use;
738 return 0;
739 }
740
741 static int
toku_db_iterate_fractal_tree_block_map(DB * db,int (* iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void *),void * iter_extra)742 toku_db_iterate_fractal_tree_block_map(DB *db, int (*iter)(uint64_t,int64_t,int64_t,int64_t,int64_t,void*), void *iter_extra) {
743 HANDLE_PANICKED_DB(db);
744 return toku_ft_handle_iterate_fractal_tree_block_map(db->i->ft_handle, iter, iter_extra);
745 }
746
747 static int
toku_db_stat64(DB * db,DB_TXN * txn,DB_BTREE_STAT64 * s)748 toku_db_stat64(DB * db, DB_TXN *txn, DB_BTREE_STAT64 *s) {
749 HANDLE_PANICKED_DB(db);
750 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
751 struct ftstat64_s ftstat;
752 TOKUTXN tokutxn = NULL;
753 if (txn != NULL) {
754 tokutxn = db_txn_struct_i(txn)->tokutxn;
755 }
756 toku_ft_handle_stat64(db->i->ft_handle, tokutxn, &ftstat);
757 s->bt_nkeys = ftstat.nkeys;
758 s->bt_ndata = ftstat.ndata;
759 s->bt_dsize = ftstat.dsize;
760 s->bt_fsize = ftstat.fsize;
761 s->bt_create_time_sec = ftstat.create_time_sec;
762 s->bt_modify_time_sec = ftstat.modify_time_sec;
763 s->bt_verify_time_sec = ftstat.verify_time_sec;
764 return 0;
765 }
766
767 static const char *
toku_db_get_dname(DB * db)768 toku_db_get_dname(DB *db) {
769 if (!db_opened(db)) {
770 return nullptr;
771 }
772 if (db->i->dname == nullptr) {
773 return "";
774 }
775 return db->i->dname;
776 }
777
778 static int
toku_db_keys_range64(DB * db,DB_TXN * txn,DBT * keyleft,DBT * keyright,uint64_t * less,uint64_t * left,uint64_t * between,uint64_t * right,uint64_t * greater,bool * middle_3_exact)779 toku_db_keys_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* keyleft, DBT* keyright, uint64_t* less, uint64_t* left, uint64_t* between, uint64_t *right, uint64_t *greater, bool* middle_3_exact) {
780 HANDLE_PANICKED_DB(db);
781 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
782
783 // note that we ignore the txn param. It would be more complicated to support it.
784 // TODO(yoni): Maybe add support for txns later? How would we do this? ydb lock comment about db_keyrange64 is obsolete.
785 toku_ft_keysrange(db->i->ft_handle, keyleft, keyright, less, left, between, right, greater, middle_3_exact);
786 return 0;
787 }
788
789 static int
toku_db_key_range64(DB * db,DB_TXN * txn,DBT * key,uint64_t * less_p,uint64_t * equal_p,uint64_t * greater_p,int * is_exact)790 toku_db_key_range64(DB* db, DB_TXN* txn, DBT* key, uint64_t* less_p, uint64_t* equal_p, uint64_t* greater_p, int* is_exact) {
791 uint64_t less, equal_left, middle, equal_right, greater;
792 bool ignore;
793 int r = toku_db_keys_range64(db, txn, key, NULL, &less, &equal_left, &middle, &equal_right, &greater, &ignore);
794 if (r == 0) {
795 *less_p = less;
796 *equal_p = equal_left;
797 *greater_p = middle;
798 paranoid_invariant_zero(greater); // no keys are greater than positive infinity
799 paranoid_invariant_zero(equal_right); // no keys are equal to positive infinity
800 // toku_ft_keysrange does not know when all 3 are exact, so set is_exact to false
801 *is_exact = false;
802 }
803 return 0;
804 }
805
toku_db_get_key_after_bytes(DB * db,DB_TXN * txn,const DBT * start_key,uint64_t skip_len,void (* callback)(const DBT * end_key,uint64_t actually_skipped,void * extra),void * cb_extra,uint32_t UU (flags))806 static int toku_db_get_key_after_bytes(DB *db, DB_TXN *txn, const DBT *start_key, uint64_t skip_len, void (*callback)(const DBT *end_key, uint64_t actually_skipped, void *extra), void *cb_extra, uint32_t UU(flags)) {
807 HANDLE_PANICKED_DB(db);
808 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
809 return toku_ft_get_key_after_bytes(db->i->ft_handle, start_key, skip_len, callback, cb_extra);
810 }
811
812 // needed by loader.c
813 int
toku_db_pre_acquire_table_lock(DB * db,DB_TXN * txn)814 toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
815 HANDLE_PANICKED_DB(db);
816 if (!db->i->lt || !txn) return 0;
817 int r;
818 r = toku_db_get_range_lock(db, txn,
819 toku_dbt_negative_infinity(), toku_dbt_positive_infinity(),
820 toku::lock_request::type::WRITE);
821 return r;
822 }
823
824 static int
locked_db_close(DB * db,uint32_t UU (flags))825 locked_db_close(DB * db, uint32_t UU(flags)) {
826 // cannot begin a checkpoint
827 toku_multi_operation_client_lock();
828 int r = toku_db_close(db);
829 toku_multi_operation_client_unlock();
830 return r;
831 }
832
833 int
autotxn_db_get(DB * db,DB_TXN * txn,DBT * key,DBT * data,uint32_t flags)834 autotxn_db_get(DB* db, DB_TXN* txn, DBT* key, DBT* data, uint32_t flags) {
835 bool changed; int r;
836 r = toku_db_construct_autotxn(db, &txn, &changed, false);
837 if (r!=0) return r;
838 r = toku_db_get(db, txn, key, data, flags);
839 return toku_db_destruct_autotxn(txn, r, changed);
840 }
841
842 static inline int
autotxn_db_getf_set(DB * db,DB_TXN * txn,uint32_t flags,DBT * key,YDB_CALLBACK_FUNCTION f,void * extra)843 autotxn_db_getf_set (DB *db, DB_TXN *txn, uint32_t flags, DBT *key, YDB_CALLBACK_FUNCTION f, void *extra) {
844 bool changed; int r;
845 r = toku_db_construct_autotxn(db, &txn, &changed, false);
846 if (r!=0) return r;
847 r = db_getf_set(db, txn, flags, key, f, extra);
848 return toku_db_destruct_autotxn(txn, r, changed);
849 }
850
851 static int
locked_db_open(DB * db,DB_TXN * txn,const char * fname,const char * dbname,DBTYPE dbtype,uint32_t flags,int mode)852 locked_db_open(DB *db, DB_TXN *txn, const char *fname, const char *dbname, DBTYPE dbtype, uint32_t flags, int mode) {
853 int ret, r;
854 HANDLE_READ_ONLY_TXN(txn);
855 HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
856
857 //
858 // Note that this function opens a db with a transaction. Should
859 // the transaction abort, the user is responsible for closing the DB
860 // before aborting the transaction. Not doing so results in undefined
861 // behavior.
862 //
863 DB_ENV *env = db->dbenv;
864 DB_TXN *child_txn = NULL;
865 int using_txns = env->i->open_flags & DB_INIT_TXN;
866 if (using_txns) {
867 ret = toku_txn_begin(env, txn, &child_txn, DB_TXN_NOSYNC);
868 invariant_zero(ret);
869 }
870
871 // cannot begin a checkpoint
872 toku_multi_operation_client_lock();
873 r = toku_db_open(db, child_txn, fname, dbname, dbtype, flags & ~DB_AUTO_COMMIT, mode);
874 toku_multi_operation_client_unlock();
875
876 if (using_txns) {
877 if (r == 0) {
878 ret = locked_txn_commit(child_txn, DB_TXN_NOSYNC);
879 invariant_zero(ret);
880 } else {
881 ret = locked_txn_abort(child_txn);
882 invariant_zero(ret);
883 }
884 }
885 return r;
886 }
887
888 static int
locked_db_change_descriptor(DB * db,DB_TXN * txn,const DBT * descriptor,uint32_t flags)889 locked_db_change_descriptor(DB *db, DB_TXN *txn, const DBT *descriptor, uint32_t flags) {
890 // cannot begin a checkpoint
891 toku_multi_operation_client_lock();
892 int r = toku_db_change_descriptor(db, txn, descriptor, flags);
893 toku_multi_operation_client_unlock();
894 return r;
895 }
896
897 static int
autotxn_db_change_descriptor(DB * db,DB_TXN * txn,const DBT * descriptor,uint32_t flags)898 autotxn_db_change_descriptor(DB *db, DB_TXN *txn, const DBT *descriptor, uint32_t flags) {
899 bool changed; int r;
900 r = toku_db_construct_autotxn(db, &txn, &changed, false);
901 if (r != 0) { return r; }
902 r = locked_db_change_descriptor(db, txn, descriptor, flags);
903 return toku_db_destruct_autotxn(txn, r, changed);
904 }
905
906 static void
toku_db_set_errfile(DB * db,FILE * errfile)907 toku_db_set_errfile (DB *db, FILE *errfile) {
908 db->dbenv->set_errfile(db->dbenv, errfile);
909 }
910
911 // TODO 2216 delete this
912 static int
toku_db_fd(DB * UU (db),int * UU (fdp))913 toku_db_fd(DB * UU(db), int * UU(fdp)) {
914 return 0;
915 }
916
917 static const DBT* toku_db_dbt_pos_infty(void) __attribute__((pure));
918 static const DBT*
toku_db_dbt_pos_infty(void)919 toku_db_dbt_pos_infty(void) {
920 return toku_dbt_positive_infinity();
921 }
922
923 static const DBT* toku_db_dbt_neg_infty(void) __attribute__((pure));
924 static const DBT*
toku_db_dbt_neg_infty(void)925 toku_db_dbt_neg_infty(void) {
926 return toku_dbt_negative_infinity();
927 }
928
929 static int
toku_db_optimize(DB * db)930 toku_db_optimize(DB *db) {
931 HANDLE_PANICKED_DB(db);
932 toku_ft_optimize(db->i->ft_handle);
933 return 0;
934 }
935
936 static int
toku_db_hot_optimize(DB * db,DBT * left,DBT * right,int (* progress_callback)(void * extra,float progress),void * progress_extra,uint64_t * loops_run)937 toku_db_hot_optimize(DB *db, DBT* left, DBT* right,
938 int (*progress_callback)(void *extra, float progress),
939 void *progress_extra, uint64_t* loops_run)
940 {
941 HANDLE_PANICKED_DB(db);
942 int r = 0;
943 r = toku_ft_hot_optimize(db->i->ft_handle, left, right,
944 progress_callback,
945 progress_extra, loops_run);
946
947 return r;
948 }
949
950 static int
locked_db_optimize(DB * db)951 locked_db_optimize(DB *db) {
952 // need to protect from checkpointing because
953 // toku_db_optimize does a message injection
954 toku_multi_operation_client_lock(); //Cannot begin checkpoint
955 int r = toku_db_optimize(db);
956 toku_multi_operation_client_unlock();
957 return r;
958 }
959
960
961 struct last_key_extra {
962 YDB_CALLBACK_FUNCTION func;
963 void* extra;
964 };
965
966 static int
db_get_last_key_callback(uint32_t keylen,const void * key,uint32_t vallen UU (),const void * val UU (),void * extra,bool lock_only)967 db_get_last_key_callback(uint32_t keylen, const void *key, uint32_t vallen UU(), const void *val UU(), void *extra, bool lock_only) {
968 if (!lock_only) {
969 DBT keydbt;
970 toku_fill_dbt(&keydbt, key, keylen);
971 struct last_key_extra * CAST_FROM_VOIDP(info, extra);
972 info->func(&keydbt, NULL, info->extra);
973 }
974 return 0;
975 }
976
977 static int
toku_db_get_last_key(DB * db,DB_TXN * txn,YDB_CALLBACK_FUNCTION func,void * extra)978 toku_db_get_last_key(DB * db, DB_TXN *txn, YDB_CALLBACK_FUNCTION func, void* extra) {
979 int r;
980 LE_CURSOR cursor = nullptr;
981 struct last_key_extra last_extra = { .func = func, .extra = extra };
982
983 r = toku_le_cursor_create(&cursor, db->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
984 if (r != 0) { goto cleanup; }
985
986 // Goes in reverse order. First key returned is last in dictionary.
987 r = toku_le_cursor_next(cursor, db_get_last_key_callback, &last_extra);
988 if (r != 0) { goto cleanup; }
989
990 cleanup:
991 if (cursor) {
992 toku_le_cursor_close(cursor);
993 }
994 return r;
995 }
996
997 static int
autotxn_db_get_last_key(DB * db,YDB_CALLBACK_FUNCTION func,void * extra)998 autotxn_db_get_last_key(DB* db, YDB_CALLBACK_FUNCTION func, void* extra) {
999 bool changed; int r;
1000 DB_TXN *txn = nullptr;
1001 // Cursors inside require transactions, but this is _not_ a transactional function.
1002 // Create transaction in a wrapper and then later close it.
1003 r = toku_db_construct_autotxn(db, &txn, &changed, false);
1004 if (r!=0) return r;
1005 r = toku_db_get_last_key(db, txn, func, extra);
1006 return toku_db_destruct_autotxn(txn, r, changed);
1007 }
1008
1009 static int
toku_db_get_fragmentation(DB * db,TOKU_DB_FRAGMENTATION report)1010 toku_db_get_fragmentation(DB * db, TOKU_DB_FRAGMENTATION report) {
1011 HANDLE_PANICKED_DB(db);
1012 int r;
1013 if (!db_opened(db))
1014 r = toku_ydb_do_error(db->dbenv, EINVAL, "Fragmentation report available only on open DBs.\n");
1015 else
1016 r = toku_ft_get_fragmentation(db->i->ft_handle, report);
1017 return r;
1018 }
1019
1020 int
toku_db_set_indexer(DB * db,DB_INDEXER * indexer)1021 toku_db_set_indexer(DB *db, DB_INDEXER * indexer) {
1022 int r = 0;
1023 if ( db->i->indexer != NULL && indexer != NULL ) {
1024 // you are trying to overwrite a valid indexer
1025 r = EINVAL;
1026 }
1027 else {
1028 db->i->indexer = indexer;
1029 }
1030 return r;
1031 }
1032
1033 DB_INDEXER *
toku_db_get_indexer(DB * db)1034 toku_db_get_indexer(DB *db) {
1035 return db->i->indexer;
1036 }
1037
1038 static void
db_get_indexer(DB * db,DB_INDEXER ** indexer_ptr)1039 db_get_indexer(DB *db, DB_INDEXER **indexer_ptr) {
1040 *indexer_ptr = toku_db_get_indexer(db);
1041 }
1042
// Context handed to ydb_verify_progress_callback: the caller's optional
// progress callback and the opaque argument to pass back to it.
struct ydb_verify_context {
    int (*progress_callback)(void *extra, float progress);
    void *progress_extra;
};

// Progress adapter: `extra` must point to a struct ydb_verify_context.
// If a user callback is set, it is invoked with the stored opaque argument
// and `progress`, and its return value is returned; if none is set, returns 0.
static int
ydb_verify_progress_callback(void *extra, float progress) {
    struct ydb_verify_context *ctx = (struct ydb_verify_context *) extra;
    if (!ctx->progress_callback) {
        return 0;
    }
    return ctx->progress_callback(ctx->progress_extra, progress);
}
1057
1058 static int
toku_db_verify_with_progress(DB * db,int (* progress_callback)(void * extra,float progress),void * progress_extra,int verbose,int keep_going)1059 toku_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float progress), void *progress_extra, int verbose, int keep_going) {
1060 struct ydb_verify_context context = { progress_callback, progress_extra };
1061 int r = toku_verify_ft_with_progress(db->i->ft_handle, ydb_verify_progress_callback, &context, verbose, keep_going);
1062 return r;
1063 }
1064
1065
1066 static int
toku_db_recount_rows(DB * db,int (* progress_callback)(uint64_t count,uint64_t deleted,void * progress_extra),void * progress_extra)1067 toku_db_recount_rows(DB* db, int (*progress_callback)(uint64_t count,
1068 uint64_t deleted,
1069 void* progress_extra),
1070 void* progress_extra) {
1071
1072 HANDLE_PANICKED_DB(db);
1073 int r = 0;
1074 r =
1075 toku_ft_recount_rows(
1076 db->i->ft_handle,
1077 progress_callback,
1078 progress_extra);
1079
1080 return r;
1081 }
1082
1083
toku_setup_db_internal(DB ** dbp,DB_ENV * env,uint32_t flags,FT_HANDLE ft_handle,bool is_open)1084 int toku_setup_db_internal (DB **dbp, DB_ENV *env, uint32_t flags, FT_HANDLE ft_handle, bool is_open) {
1085 if (flags || env == NULL)
1086 return EINVAL;
1087
1088 if (!env_opened(env))
1089 return EINVAL;
1090
1091 DB *MALLOC(result);
1092 if (result == 0) {
1093 return ENOMEM;
1094 }
1095 memset(result, 0, sizeof *result);
1096 result->dbenv = env;
1097 MALLOC(result->i);
1098 if (result->i == 0) {
1099 toku_free(result);
1100 return ENOMEM;
1101 }
1102 memset(result->i, 0, sizeof *result->i);
1103 result->i->ft_handle = ft_handle;
1104 result->i->opened = is_open;
1105 *dbp = result;
1106 return 0;
1107 }
1108
1109 int
toku_db_create(DB ** db,DB_ENV * env,uint32_t flags)1110 toku_db_create(DB ** db, DB_ENV * env, uint32_t flags) {
1111 if (flags || env == NULL)
1112 return EINVAL;
1113
1114 if (!env_opened(env))
1115 return EINVAL;
1116
1117
1118 FT_HANDLE ft_handle;
1119 toku_ft_handle_create(&ft_handle);
1120
1121 int r = toku_setup_db_internal(db, env, flags, ft_handle, false);
1122 if (r != 0) return r;
1123
1124 DB *result=*db;
1125 // methods that grab the ydb lock
1126 #define SDB(name) result->name = locked_db_ ## name
1127 SDB(close);
1128 SDB(open);
1129 SDB(optimize);
1130 #undef SDB
1131 // methods that do not take the ydb lock
1132 #define USDB(name) result->name = toku_db_ ## name
1133 USDB(set_errfile);
1134 USDB(set_pagesize);
1135 USDB(get_pagesize);
1136 USDB(change_pagesize);
1137 USDB(set_readpagesize);
1138 USDB(get_readpagesize);
1139 USDB(change_readpagesize);
1140 USDB(set_compression_method);
1141 USDB(get_compression_method);
1142 USDB(change_compression_method);
1143 USDB(set_fanout);
1144 USDB(get_fanout);
1145 USDB(set_memcmp_magic);
1146 USDB(change_fanout);
1147 USDB(set_flags);
1148 USDB(get_flags);
1149 USDB(fd);
1150 USDB(get_max_row_size);
1151 USDB(set_indexer);
1152 USDB(pre_acquire_table_lock);
1153 USDB(pre_acquire_fileops_lock);
1154 USDB(key_range64);
1155 USDB(keys_range64);
1156 USDB(get_key_after_bytes);
1157 USDB(hot_optimize);
1158 USDB(stat64);
1159 USDB(get_fractal_tree_info64);
1160 USDB(iterate_fractal_tree_block_map);
1161 USDB(get_dname);
1162 USDB(verify_with_progress);
1163 USDB(cursor);
1164 USDB(dbt_pos_infty);
1165 USDB(dbt_neg_infty);
1166 USDB(get_fragmentation);
1167 USDB(recount_rows);
1168 #undef USDB
1169 result->get_indexer = db_get_indexer;
1170 result->del = autotxn_db_del;
1171 result->put = autotxn_db_put;
1172 result->update = autotxn_db_update;
1173 result->update_broadcast = autotxn_db_update_broadcast;
1174 result->change_descriptor = autotxn_db_change_descriptor;
1175 result->get_last_key = autotxn_db_get_last_key;
1176
1177 // unlocked methods
1178 result->get = autotxn_db_get;
1179 result->getf_set = autotxn_db_getf_set;
1180
1181 result->i->dict_id = DICTIONARY_ID_NONE;
1182 result->i->opened = 0;
1183 result->i->open_flags = 0;
1184 result->i->open_mode = 0;
1185 result->i->indexer = NULL;
1186 *db = result;
1187 return 0;
1188 }
1189
1190 // When the loader is created, it makes this call (toku_env_load_inames).
1191 // For each dictionary to be loaded, replace old iname in directory
1192 // with a newly generated iname. This will also take a write lock
1193 // on the directory entries. The write lock will be released when
1194 // the transaction of the loader is completed.
1195 // If the transaction commits, the new inames are in place.
1196 // If the transaction aborts, the old inames will be restored.
1197 // The new inames are returned to the caller.
1198 // It is the caller's responsibility to free them.
1199 // If "mark_as_loader" is true, then include a mark in the iname
1200 // to indicate that the file is created by the ft loader.
1201 // Return 0 on success (could fail if write lock not available).
1202 static int
load_inames(DB_ENV * env,DB_TXN * txn,int N,DB * dbs[],const char * new_inames_in_env[],LSN * load_lsn,bool mark_as_loader)1203 load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], const char * new_inames_in_env[/*N*/], LSN *load_lsn, bool mark_as_loader) {
1204 int rval = 0;
1205 int i;
1206
1207 TXNID_PAIR xid = TXNID_PAIR_NONE;
1208 DBT dname_dbt; // holds dname
1209 DBT iname_dbt; // holds new iname
1210
1211 const char *mark;
1212
1213 if (mark_as_loader) {
1214 mark = "B";
1215 } else {
1216 mark = "P";
1217 }
1218
1219 for (i=0; i<N; i++) {
1220 new_inames_in_env[i] = NULL;
1221 }
1222
1223 if (txn) {
1224 xid = toku_txn_get_txnid(db_txn_struct_i(txn)->tokutxn);
1225 }
1226 for (i = 0; i < N; i++) {
1227 char * dname = dbs[i]->i->dname;
1228 toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
1229
1230 // now create new iname
1231 char hint[strlen(dname) + 1];
1232 create_iname_hint(env, dname, hint);
1233
1234 // allocates memory for iname_in_env
1235 const char *new_iname =
1236 create_iname(env, xid.parent_id64, xid.child_id64, hint, mark, i);
1237 new_inames_in_env[i] = new_iname;
1238
1239 // iname_in_env goes in directory
1240 toku_fill_dbt(&iname_dbt, new_iname, strlen(new_iname) + 1);
1241 rval = toku_db_put(env->i->directory, txn, &dname_dbt, &iname_dbt, 0, true);
1242 if (rval) break;
1243 }
1244
1245 // Generate load log entries.
1246 if (!rval && txn) {
1247 TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
1248 int do_fsync = 0;
1249 LSN *get_lsn = NULL;
1250 for (i = 0; i < N; i++) {
1251 FT_HANDLE ft_handle = dbs[i]->i->ft_handle;
1252 //Fsync is necessary for the last one only.
1253 if (i==N-1) {
1254 do_fsync = 1; //We only need a single fsync of logs.
1255 get_lsn = load_lsn; //Set pointer to capture the last lsn.
1256 }
1257 toku_ft_load(ft_handle, ttxn, new_inames_in_env[i], do_fsync, get_lsn);
1258 }
1259 }
1260 return rval;
1261 }
1262
1263 int
locked_load_inames(DB_ENV * env,DB_TXN * txn,int N,DB * dbs[],char * new_inames_in_env[],LSN * load_lsn,bool mark_as_loader)1264 locked_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[/*N*/], char * new_inames_in_env[/*N*/], LSN *load_lsn, bool mark_as_loader) {
1265 int r;
1266 HANDLE_READ_ONLY_TXN(txn);
1267
1268 // cannot begin a checkpoint
1269 toku_multi_operation_client_lock();
1270 r = load_inames(env, txn, N, dbs, (const char **) new_inames_in_env, load_lsn, mark_as_loader);
1271 toku_multi_operation_client_unlock();
1272
1273 return r;
1274
1275 }
1276
1277 #undef STATUS_VALUE
1278
1279 #include <toku_race_tools.h>
1280 void __attribute__((constructor)) toku_ydb_db_helgrind_ignore(void);
1281 void
toku_ydb_db_helgrind_ignore(void)1282 toku_ydb_db_helgrind_ignore(void) {
1283 TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_db_layer_status, sizeof ydb_db_layer_status);
1284 }
1285