1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
// Only declared here; the definition is not in the visible part of this file.
extern const char *toku_patent_string;
// Copyright notice made available to the rest of the program as a global string.
const char *toku_copyright_string = "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.";
41
42 #include <my_global.h>
43
44 extern int writing_rollback;
45
46 #include <db.h>
47 #include <errno.h>
48 #include <string.h>
49
50 #include "portability/memory.h"
51 #include "portability/toku_assert.h"
52 #include "portability/toku_portability.h"
53 #include "portability/toku_pthread.h"
54 #include "portability/toku_stdlib.h"
55
56 #include "ft/ft-flusher.h"
57 #include "ft/cachetable/cachetable.h"
58 #include "ft/cachetable/checkpoint.h"
59 #include "ft/logger/log.h"
60 #include "ft/loader/loader.h"
61 #include "ft/log_header.h"
62 #include "ft/ft.h"
63 #include "ft/txn/txn_manager.h"
64 #include "src/ydb.h"
65 #include "src/ydb-internal.h"
66 #include "src/ydb_cursor.h"
67 #include "src/ydb_row_lock.h"
68 #include "src/ydb_env_func.h"
69 #include "src/ydb_db.h"
70 #include "src/ydb_write.h"
71 #include "src/ydb_txn.h"
72 #include "src/loader.h"
73 #include "src/indexer.h"
74 #include "util/status.h"
75 #include "util/context.h"
76
77 #include <functional>
78
79 // Include ydb_lib.cc here so that its constructor/destructor gets put into
80 // ydb.o, to make sure they don't get erased at link time (when linking to
81 // a static libtokufractaltree.a that was compiled with gcc). See #5094.
82 #include "ydb_lib.cc"
83
// Names of the environment/database creation entry points.
// With TOKUTRACE defined they map to the *_toku10 names; otherwise they map
// to the plain names and the two trace-file functions below are no-op stubs.
#if defined(TOKUTRACE)
#define DB_ENV_CREATE_FUN db_env_create_toku10
#define DB_CREATE_FUN db_create_toku10
#else
#define DB_ENV_CREATE_FUN db_env_create
#define DB_CREATE_FUN db_create
// Stub used when tracing is not compiled in: fname is ignored, returns 0.
int toku_set_trace_file (const char *fname) {
    (void) fname;
    return 0;
}
// Stub used when tracing is not compiled in: returns 0.
int toku_close_trace_file (void) {
    return 0;
}
#endif
93
// Defined outside this file. In the code visible here the value 6 disables
// the file system space poller and makes validate_env() skip its fileops
// directory and recovery log checks.
extern uint force_recovery;

// Set when env is panicked, never cleared.
// Holds the (non-zero) cause stored by env_panic(); toku_ydb_destroy() skips
// the ft layer teardown while it is non-zero.
static int env_is_panicked = 0;
98
99 void
env_panic(DB_ENV * env,int cause,const char * msg)100 env_panic(DB_ENV * env, int cause, const char * msg) {
101 if (cause == 0)
102 cause = -1; // if unknown cause, at least guarantee panic
103 if (msg == NULL)
104 msg = "Unknown cause in env_panic\n";
105 env_is_panicked = cause;
106 env->i->is_panicked = cause;
107 env->i->panic_string = toku_strdup(msg);
108 }
109
110 static int env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp);
111
112 /********************************************************************************
113 * Status is intended for display to humans to help understand system behavior.
114 * It does not need to be perfectly thread-safe.
115 */
116
// Row indices into ydb_layer_status.status[]. The final enumerator is not a
// row; it is the number of rows and sizes the array.
typedef enum {
    YDB_LAYER_TIME_CREATION = 0,            /* timestamp of environment creation, read from persistent environment */
    YDB_LAYER_TIME_STARTUP,                 /* timestamp of system startup */
    YDB_LAYER_TIME_NOW,                     /* timestamp of engine status query */
    YDB_LAYER_NUM_DB_OPEN,                  /* legend: "db opens" */
    YDB_LAYER_NUM_DB_CLOSE,                 /* legend: "db closes" */
    YDB_LAYER_NUM_OPEN_DBS,                 /* legend: "num open dbs now" */
    YDB_LAYER_MAX_OPEN_DBS,                 /* legend: "max open dbs" */
    YDB_LAYER_FSYNC_LOG_PERIOD,             /* period, in ms, that recovery log is automatically fsynced */
#if 0
    YDB_LAYER_ORIGINAL_ENV_VERSION,         /* version of original environment, read from persistent environment */
    YDB_LAYER_STARTUP_ENV_VERSION,          /* version of environment at this startup, read from persistent environment (curr_env_ver_key) */
    YDB_LAYER_LAST_LSN_OF_V13,              /* read from persistent environment */
    YDB_LAYER_UPGRADE_V14_TIME,             /* timestamp of upgrade to version 14, read from persistent environment */
    YDB_LAYER_UPGRADE_V14_FOOTPRINT,        /* footprint of upgrade to version 14, read from persistent environment */
#endif
    YDB_LAYER_STATUS_NUM_ROWS               /* number of rows in this status array */
} ydb_layer_status_entry;
135
// Status table for the ydb layer: one row per ydb_layer_status_entry value,
// plus a flag that ydb_layer_status_init() sets once the rows are described.
typedef struct {
    bool initialized;
    TOKU_ENGINE_STATUS_ROW_S status[YDB_LAYER_STATUS_NUM_ROWS];
} YDB_LAYER_STATUS_S, *YDB_LAYER_STATUS;

// The single, file-local instance of the table.
static YDB_LAYER_STATUS_S ydb_layer_status;
// Numeric value of row x of the table above (usable as an lvalue).
#define STATUS_VALUE(x) ydb_layer_status.status[x].value.num

// Describe one row of ydb_layer_status; forwards to TOKUFT_STATUS_INIT.
#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_layer_status, k, c, t, l, inc)
145
// Describe every row of ydb_layer_status, record the startup time in the
// YDB_LAYER_TIME_STARTUP row, and mark the table as initialized.
static void
ydb_layer_status_init (void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.

    STATUS_INIT(YDB_LAYER_TIME_CREATION,              nullptr, UNIXTIME, "time of environment creation", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_TIME_STARTUP,               nullptr, UNIXTIME, "time of engine startup", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_TIME_NOW,                   nullptr, UNIXTIME, "time now", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DB_OPEN,                DB_OPENS, UINT64, "db opens", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DB_CLOSE,               DB_CLOSES, UINT64, "db closes", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_OPEN_DBS,               DB_OPEN_CURRENT, UINT64, "num open dbs now", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_MAX_OPEN_DBS,               DB_OPEN_MAX, UINT64, "max open dbs", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_FSYNC_LOG_PERIOD,           nullptr, UINT64, "period, in ms, that recovery log is automatically fsynced", TOKU_ENGINE_STATUS);

    // The startup timestamp is taken once, here; the "now" row is refreshed
    // on every status query by ydb_layer_get_status().
    STATUS_VALUE(YDB_LAYER_TIME_STARTUP) = time(NULL);
    ydb_layer_status.initialized = true;
}
// STATUS_INIT is not available past this point.
#undef STATUS_INIT
164
165 static void
ydb_layer_get_status(DB_ENV * env,YDB_LAYER_STATUS statp)166 ydb_layer_get_status(DB_ENV* env, YDB_LAYER_STATUS statp) {
167 STATUS_VALUE(YDB_LAYER_TIME_NOW) = time(NULL);
168 STATUS_VALUE(YDB_LAYER_FSYNC_LOG_PERIOD) = toku_minicron_get_period_in_ms_unlocked(&env->i->fsync_log_cron);
169 *statp = ydb_layer_status;
170 }
171
172 /********************************************************************************
173 * End of ydb_layer local status section.
174 */
175
// Most recently opened env, used for engine status on crash.
// Note there are likely to be races on this if you have multiple threads
// creating and closing environments in parallel. It is declared volatile
// since at least that helps make sure the compiler doesn't optimize away
// certain code (e.g., if while debugging you write code that spins on
// most_recent_env, you'd like the compiler not to optimize your code away).
static DB_ENV * volatile most_recent_env;

// Prototypes for functions whose definitions are not in the visible part of this file.
static int env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt);
static int toku_maybe_get_engine_status_text (char* buff, int buffsize);  // for use by toku_assert
static int toku_maybe_err_engine_status (void);
static void toku_maybe_set_env_panic(int code, const char * msg);         // for use by toku_assert
182
183 int
toku_ydb_init(void)184 toku_ydb_init(void) {
185 int r = 0;
186 //Lower level must be initialized first.
187 r = toku_ft_layer_init();
188 return r;
189 }
190
191 // Do not clean up resources if env is panicked, just exit ugly
192 void
toku_ydb_destroy(void)193 toku_ydb_destroy(void) {
194 if (!ydb_layer_status.initialized)
195 return;
196 if (env_is_panicked == 0) {
197 toku_ft_layer_destroy();
198 }
199 ydb_layer_status.initialized = false;
200 }
201
202 static int
ydb_getf_do_nothing(DBT const * UU (key),DBT const * UU (val),void * UU (extra))203 ydb_getf_do_nothing(DBT const* UU(key), DBT const* UU(val), void* UU(extra)) {
204 return 0;
205 }
206
207 /* env methods */
208
209 static void
env_fs_report_in_yellow(DB_ENV * UU (env))210 env_fs_report_in_yellow(DB_ENV *UU(env)) {
211 char tbuf[26];
212 time_t tnow = time(NULL);
213 fprintf(stderr, "%.24s PerconaFT file system space is low\n", ctime_r(&tnow, tbuf)); fflush(stderr);
214 }
215
216 static void
env_fs_report_in_red(DB_ENV * UU (env))217 env_fs_report_in_red(DB_ENV *UU(env)) {
218 char tbuf[26];
219 time_t tnow = time(NULL);
220 fprintf(stderr, "%.24s PerconaFT file system space is really low and access is restricted\n", ctime_r(&tnow, tbuf)); fflush(stderr);
221 }
222
223 static inline uint64_t
env_fs_redzone(DB_ENV * env,uint64_t total)224 env_fs_redzone(DB_ENV *env, uint64_t total) {
225 return total * env->i->redzone / 100;
226 }
227
228 #define ZONEREPORTLIMIT 12
229 // Check the available space in the file systems used by tokuft and erect barriers when available space gets low.
230 static int
env_fs_poller(void * arg)231 env_fs_poller(void *arg) {
232 if(force_recovery == 6) {
233 return 0;
234 }
235 DB_ENV *env = (DB_ENV *) arg;
236 int r;
237
238 int in_yellow; // set true to issue warning to user
239 int in_red; // set true to prevent certain operations (returning ENOSPC)
240
241 // get the fs sizes for the home dir
242 uint64_t avail_size = 0, total_size = 0;
243 r = toku_get_filesystem_sizes(env->i->dir, &avail_size, NULL, &total_size);
244 assert(r == 0);
245 in_yellow = (avail_size < 2 * env_fs_redzone(env, total_size));
246 in_red = (avail_size < env_fs_redzone(env, total_size));
247
248 // get the fs sizes for the data dir if different than the home dir
249 if (strcmp(env->i->dir, env->i->real_data_dir) != 0) {
250 r = toku_get_filesystem_sizes(env->i->real_data_dir, &avail_size, NULL, &total_size);
251 assert(r == 0);
252 in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
253 in_red += (avail_size < env_fs_redzone(env, total_size));
254 }
255
256 // get the fs sizes for the log dir if different than the home dir and data dir
257 if (strcmp(env->i->dir, env->i->real_log_dir) != 0 && strcmp(env->i->real_data_dir, env->i->real_log_dir) != 0) {
258 r = toku_get_filesystem_sizes(env->i->real_log_dir, &avail_size, NULL, &total_size);
259 assert(r == 0);
260 in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
261 in_red += (avail_size < env_fs_redzone(env, total_size));
262 }
263
264 env->i->fs_seq++; // how many times through this polling loop?
265 uint64_t now = env->i->fs_seq;
266
267 // Don't issue report if we have not been out of this fs_state for a while, unless we're at system startup
268 switch (env->i->fs_state) {
269 case FS_RED:
270 if (!in_red) {
271 if (in_yellow) {
272 env->i->fs_state = FS_YELLOW;
273 } else {
274 env->i->fs_state = FS_GREEN;
275 }
276 }
277 break;
278 case FS_YELLOW:
279 if (in_red) {
280 if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
281 env_fs_report_in_red(env);
282 env->i->fs_state = FS_RED;
283 env->i->last_seq_entered_red = now;
284 } else if (!in_yellow) {
285 env->i->fs_state = FS_GREEN;
286 }
287 break;
288 case FS_GREEN:
289 if (in_red) {
290 if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
291 env_fs_report_in_red(env);
292 env->i->fs_state = FS_RED;
293 env->i->last_seq_entered_red = now;
294 } else if (in_yellow) {
295 if ((now - env->i->last_seq_entered_yellow > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
296 env_fs_report_in_yellow(env);
297 env->i->fs_state = FS_YELLOW;
298 env->i->last_seq_entered_yellow = now;
299 }
300 break;
301 default:
302 assert(0);
303 }
304 return 0;
305 }
306 #undef ZONEREPORTLIMIT
307
308 static void
env_fs_init(DB_ENV * env)309 env_fs_init(DB_ENV *env) {
310 env->i->fs_state = FS_GREEN;
311 env->i->fs_poll_time = 5; // seconds
312 env->i->redzone = 5; // percent of total space
313 env->i->fs_poller_is_init = false;
314 }
315
316 // Initialize the minicron that polls file system space
317 static int
env_fs_init_minicron(DB_ENV * env)318 env_fs_init_minicron(DB_ENV *env) {
319 if(force_recovery == 6) {
320 return 0;
321 }
322 int r = toku_minicron_setup(&env->i->fs_poller, env->i->fs_poll_time*1000, env_fs_poller, env);
323 if (r == 0)
324 env->i->fs_poller_is_init = true;
325 return r;
326 }
327
328 // Destroy the file system space minicron
329 static void
env_fs_destroy(DB_ENV * env)330 env_fs_destroy(DB_ENV *env) {
331 if (env->i->fs_poller_is_init) {
332 int r = toku_minicron_shutdown(&env->i->fs_poller);
333 assert(r == 0);
334 env->i->fs_poller_is_init = false;
335 }
336 }
337
338 static int
env_fsync_log_on_minicron(void * arg)339 env_fsync_log_on_minicron(void *arg) {
340 DB_ENV *env = (DB_ENV *) arg;
341 int r = env->log_flush(env, 0);
342 assert(r == 0);
343 return 0;
344 }
345
346 static void
env_fsync_log_init(DB_ENV * env)347 env_fsync_log_init(DB_ENV *env) {
348 env->i->fsync_log_period_ms = 0;
349 env->i->fsync_log_cron_is_init = false;
350 }
351
UU()352 static void UU()
353 env_change_fsync_log_period(DB_ENV* env, uint32_t period_ms) {
354 env->i->fsync_log_period_ms = period_ms;
355 if (env->i->fsync_log_cron_is_init) {
356 toku_minicron_change_period(&env->i->fsync_log_cron, period_ms);
357 }
358 }
359
360 static int
env_fsync_log_cron_init(DB_ENV * env)361 env_fsync_log_cron_init(DB_ENV *env) {
362 int r = toku_minicron_setup(&env->i->fsync_log_cron, env->i->fsync_log_period_ms, env_fsync_log_on_minicron, env);
363 if (r == 0)
364 env->i->fsync_log_cron_is_init = true;
365 return r;
366 }
367
368 static void
env_fsync_log_cron_destroy(DB_ENV * env)369 env_fsync_log_cron_destroy(DB_ENV *env) {
370 if (env->i->fsync_log_cron_is_init) {
371 int r = toku_minicron_shutdown(&env->i->fsync_log_cron);
372 assert(r == 0);
373 env->i->fsync_log_cron_is_init = false;
374 }
375 }
376
377 static void
env_setup_real_dir(DB_ENV * env,char ** real_dir,const char * nominal_dir)378 env_setup_real_dir(DB_ENV *env, char **real_dir, const char *nominal_dir) {
379 toku_free(*real_dir);
380 *real_dir = NULL;
381
382 assert(env->i->dir);
383 if (nominal_dir)
384 *real_dir = toku_construct_full_name(2, env->i->dir, nominal_dir);
385 else
386 *real_dir = toku_strdup(env->i->dir);
387 }
388
389 static void
env_setup_real_data_dir(DB_ENV * env)390 env_setup_real_data_dir(DB_ENV *env) {
391 env_setup_real_dir(env, &env->i->real_data_dir, env->i->data_dir);
392 }
393
394 static void
env_setup_real_log_dir(DB_ENV * env)395 env_setup_real_log_dir(DB_ENV *env) {
396 env_setup_real_dir(env, &env->i->real_log_dir, env->i->lg_dir);
397 }
398
399 static void
env_setup_real_tmp_dir(DB_ENV * env)400 env_setup_real_tmp_dir(DB_ENV *env) {
401 env_setup_real_dir(env, &env->i->real_tmp_dir, env->i->tmp_dir);
402 }
403
keep_cachetable_callback(DB_ENV * env,CACHETABLE cachetable)404 static void keep_cachetable_callback (DB_ENV *env, CACHETABLE cachetable)
405 {
406 env->i->cachetable = cachetable;
407 }
408
409 static int
ydb_do_recovery(DB_ENV * env)410 ydb_do_recovery (DB_ENV *env) {
411 assert(env->i->real_log_dir);
412 int r = tokuft_recover(env,
413 toku_keep_prepared_txn_callback,
414 keep_cachetable_callback,
415 env->i->logger,
416 env->i->dir, env->i->real_log_dir, env->i->bt_compare,
417 env->i->update_function,
418 env->i->generate_row_for_put, env->i->generate_row_for_del,
419 env->i->cachetable_size);
420 return r;
421 }
422
423 static int
needs_recovery(DB_ENV * env)424 needs_recovery (DB_ENV *env) {
425 assert(env->i->real_log_dir);
426 int recovery_needed = tokuft_needs_recovery(env->i->real_log_dir, true);
427 return recovery_needed ? DB_RUNRECOVERY : 0;
428 }
429
430 static int toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte, uint32_t min, uint32_t flags);
431
// Keys used in persistent environment dictionary:
// Following keys added in version 12
// (both values are read back as 32-bit integers by the code below)
static const char * orig_env_ver_key = "original_version";
static const char * curr_env_ver_key = "current_version";
// Following keys added in version 14, add more keys for future versions
// (the value is read back as a time_t by capture_persistent_env_contents)
static const char * creation_time_key = "creation_time";
438
// Build the persistent-environment key "upgrade_v<version>_time".
// The string lives in a static buffer owned by this function: every call
// overwrites the previous result and returns the same pointer.
static char * get_upgrade_time_key(int version) {
    static char upgrade_time_key[sizeof("upgrade_v_time") + 12];
    const int written = snprintf(upgrade_time_key, sizeof(upgrade_time_key), "upgrade_v%d_time", version);
    // the formatted key must have fit, terminator included
    assert(written >= 0 && written < (int)sizeof(upgrade_time_key));
    return upgrade_time_key;
}
448
// Build the persistent-environment key "upgrade_v<version>_footprint".
// The string lives in a static buffer owned by this function: every call
// overwrites the previous result and returns the same pointer.
static char * get_upgrade_footprint_key(int version) {
    static char upgrade_footprint_key[sizeof("upgrade_v_footprint") + 12];
    const int written = snprintf(upgrade_footprint_key, sizeof(upgrade_footprint_key), "upgrade_v%d_footprint", version);
    // the formatted key must have fit, terminator included
    assert(written >= 0 && written < (int)sizeof(upgrade_footprint_key));
    return upgrade_footprint_key;
}
458
// Build the persistent-environment key "upgrade_v<version>_last_lsn".
// The string lives in a static buffer owned by this function: every call
// overwrites the previous result and returns the same pointer.
static char * get_upgrade_last_lsn_key(int version) {
    static char upgrade_last_lsn_key[sizeof("upgrade_v_last_lsn") + 12];
    const int written = snprintf(upgrade_last_lsn_key, sizeof(upgrade_last_lsn_key), "upgrade_v%d_last_lsn", version);
    // the formatted key must have fit, terminator included
    assert(written >= 0 && written < (int)sizeof(upgrade_last_lsn_key));
    return upgrade_last_lsn_key;
}
468
// Values read from (or written into) persistent environment,
// kept here for read-only access from engine status.
// Note, persistent_upgrade_status info is separate in part to simplify its exclusion from engine status until relevant.
// The enumerators index persistent_upgrade_status.status[]; the last one is the row count.
typedef enum {
    PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION = 0,        // read from orig_env_ver_key
    PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP,   // read from curr_env_ver_key, prev version as of this startup
    PERSISTENT_UPGRADE_LAST_LSN_OF_V13,                 // filled from the "last_lsn" upgrade key of the current version
    PERSISTENT_UPGRADE_V14_TIME,                        // filled from the "time" upgrade key of the current version
    PERSISTENT_UPGRADE_V14_FOOTPRINT,                   // filled from the "footprint" upgrade key of the current version
    PERSISTENT_UPGRADE_STATUS_NUM_ROWS
} persistent_upgrade_status_entry;
480
// Status table for upgrade information: one row per
// persistent_upgrade_status_entry value, plus a flag that
// persistent_upgrade_status_init() sets once the rows are described.
typedef struct {
    bool initialized;
    TOKU_ENGINE_STATUS_ROW_S status[PERSISTENT_UPGRADE_STATUS_NUM_ROWS];
} PERSISTENT_UPGRADE_STATUS_S, *PERSISTENT_UPGRADE_STATUS;

// The single, file-local instance of the table.
static PERSISTENT_UPGRADE_STATUS_S persistent_upgrade_status;

// Describe one row of persistent_upgrade_status; forwards to
// TOKUFT_STATUS_INIT and prefixes the legend literal with "upgrade: ".
#define PERSISTENT_UPGRADE_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(persistent_upgrade_status, k, c, t, "upgrade: " l, inc)
489
// Describe every row of persistent_upgrade_status and mark the table as
// initialized. Called lazily by maybe_upgrade_persistent_environment_dictionary().
static void
persistent_upgrade_status_init (void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.

    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION,           nullptr, UINT64,   "original version (at time of environment creation)", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP,  nullptr, UINT64,   "version at time of startup", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_LAST_LSN_OF_V13,                nullptr, UINT64,   "last LSN of version 13", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_TIME,                       nullptr, UNIXTIME, "time of upgrade to version 14", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_FOOTPRINT,                  nullptr, UINT64,   "footprint from version 13 to 14", TOKU_ENGINE_STATUS);
    persistent_upgrade_status.initialized = true;
}
502
// Numeric value of row x of persistent_upgrade_status (usable as an lvalue).
#define PERSISTENT_UPGRADE_STATUS_VALUE(x) persistent_upgrade_status.status[x].value.num
504
// Requires: persistent environment dictionary is already open.
// Input arg is lsn of clean shutdown of previous version,
// or ZERO_LSN if no upgrade or if crash between log upgrade and here.
// NOTE: To maintain compatibility with previous versions, do not change the
// format of any information stored in the persistent environment dictionary.
// For example, some values are stored as 32 bits, even though they are immediately
// converted to 64 bits when read.  Do not change them to be stored as 64 bits.
//
// Reads the stored environment version and, if it is older than
// FT_LAYOUT_VERSION (but still supported), rewrites it to FT_LAYOUT_VERSION
// and records upgrade time, footprint and last LSN for every version stepped
// over.
// Returns 0 when nothing had to be done or the upgrade records were written,
// TOKUDB_DICTIONARY_TOO_NEW / TOKUDB_DICTIONARY_TOO_OLD when the stored
// version is outside the supported range. Failed reads or writes trip an assert.
static int
maybe_upgrade_persistent_environment_dictionary(DB_ENV * env, DB_TXN * txn, LSN last_lsn_of_clean_shutdown_read_from_log) {
    int r;
    DBT key, val;
    DB *persistent_environment = env->i->persistent_environment;

    // the upgrade status table is set up on first use
    if (!persistent_upgrade_status.initialized)
        persistent_upgrade_status_init();

    // fetch the version currently stored in the dictionary
    toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
    toku_init_dbt(&val);
    r = toku_db_get(persistent_environment, txn, &key, &val, 0);
    assert(r == 0);
    uint32_t stored_env_version = toku_dtoh32(*(uint32_t*)val.data);
    PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP) = stored_env_version;
    if (stored_env_version > FT_LAYOUT_VERSION)
        r = TOKUDB_DICTIONARY_TOO_NEW;
    else if (stored_env_version < FT_LAYOUT_MIN_SUPPORTED_VERSION)
        r = TOKUDB_DICTIONARY_TOO_OLD;
    else if (stored_env_version < FT_LAYOUT_VERSION) {
        // overwrite the stored version with the current layout version
        const uint32_t curr_env_ver_d = toku_htod32(FT_LAYOUT_VERSION);
        toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
        toku_fill_dbt(&val, &curr_env_ver_d, sizeof(curr_env_ver_d));
        r = toku_db_put(persistent_environment, txn, &key, &val, 0, false);
        assert_zero(r);

        // the same three values are recorded for every version stepped over
        time_t upgrade_time_d = toku_htod64(time(NULL));
        uint64_t upgrade_footprint_d = toku_htod64(toku_log_upgrade_get_footprint());
        uint64_t upgrade_last_lsn_d = toku_htod64(last_lsn_of_clean_shutdown_read_from_log.lsn);
        for (int version = stored_env_version+1; version <= FT_LAYOUT_VERSION; version++) {
            uint32_t put_flag = DB_NOOVERWRITE;
            if (version <= FT_LAYOUT_VERSION_19) {
                // See #5902.
                // To prevent a crash (and any higher complexity code) we'll simply
                // silently not overwrite anything if it exists.
                // The keys existing for version <= 19 is not necessarily an error.
                // If this happens for versions > 19 it IS an error and we'll use DB_NOOVERWRITE.
                put_flag = DB_NOOVERWRITE_NO_ERROR;
            }


            // each key getter returns a static buffer, so the key is used
            // immediately after it is built
            char* upgrade_time_key = get_upgrade_time_key(version);
            toku_fill_dbt(&key, upgrade_time_key, strlen(upgrade_time_key));
            toku_fill_dbt(&val, &upgrade_time_d, sizeof(upgrade_time_d));
            r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
            assert_zero(r);

            char* upgrade_footprint_key = get_upgrade_footprint_key(version);
            toku_fill_dbt(&key, upgrade_footprint_key, strlen(upgrade_footprint_key));
            toku_fill_dbt(&val, &upgrade_footprint_d, sizeof(upgrade_footprint_d));
            r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
            assert_zero(r);

            char* upgrade_last_lsn_key = get_upgrade_last_lsn_key(version);
            toku_fill_dbt(&key, upgrade_last_lsn_key, strlen(upgrade_last_lsn_key));
            toku_fill_dbt(&val, &upgrade_last_lsn_d, sizeof(upgrade_last_lsn_d));
            r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
            assert_zero(r);
        }

    }
    // when the stored version already equals FT_LAYOUT_VERSION, r is still
    // the 0 from the initial lookup
    return r;
}
576
577 // Capture contents of persistent_environment dictionary so that it can be read by engine status
578 static void
capture_persistent_env_contents(DB_ENV * env,DB_TXN * txn)579 capture_persistent_env_contents (DB_ENV * env, DB_TXN * txn) {
580 int r;
581 DBT key, val;
582 DB *persistent_environment = env->i->persistent_environment;
583
584 toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
585 toku_init_dbt(&val);
586 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
587 assert_zero(r);
588 uint32_t curr_env_version = toku_dtoh32(*(uint32_t*)val.data);
589 assert(curr_env_version == FT_LAYOUT_VERSION);
590
591 toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
592 toku_init_dbt(&val);
593 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
594 assert_zero(r);
595 uint64_t persistent_original_env_version = toku_dtoh32(*(uint32_t*)val.data);
596 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION) = persistent_original_env_version;
597 assert(persistent_original_env_version <= curr_env_version);
598
599 // make no assertions about timestamps, clock may have been reset
600 if (persistent_original_env_version >= FT_LAYOUT_VERSION_14) {
601 toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
602 toku_init_dbt(&val);
603 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
604 assert_zero(r);
605 STATUS_VALUE(YDB_LAYER_TIME_CREATION) = toku_dtoh64((*(time_t*)val.data));
606 }
607
608 if (persistent_original_env_version != curr_env_version) {
609 // an upgrade was performed at some time, capture info about the upgrade
610
611 char * last_lsn_key = get_upgrade_last_lsn_key(curr_env_version);
612 toku_fill_dbt(&key, last_lsn_key, strlen(last_lsn_key));
613 toku_init_dbt(&val);
614 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
615 assert_zero(r);
616 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_LAST_LSN_OF_V13) = toku_dtoh64(*(uint64_t*)val.data);
617
618 char * time_key = get_upgrade_time_key(curr_env_version);
619 toku_fill_dbt(&key, time_key, strlen(time_key));
620 toku_init_dbt(&val);
621 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
622 assert_zero(r);
623 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_TIME) = toku_dtoh64(*(time_t*)val.data);
624
625 char * footprint_key = get_upgrade_footprint_key(curr_env_version);
626 toku_fill_dbt(&key, footprint_key, strlen(footprint_key));
627 toku_init_dbt(&val);
628 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
629 assert_zero(r);
630 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_FOOTPRINT) = toku_dtoh64(*(uint64_t*)val.data);
631 }
632
633 }
634
635 // return 0 if log exists or ENOENT if log does not exist
636 static int
ydb_recover_log_exists(DB_ENV * env)637 ydb_recover_log_exists(DB_ENV *env) {
638 int r = tokuft_recover_log_exists(env->i->real_log_dir);
639 return r;
640 }
641
// Validate that all required files are present, no side effects.
// Return 0 if all is well, ENOENT if some files are present but at least one
// is missing, other non-zero value if some other error occurs.
// Set *valid_newenv if creating a new environment (all files missing).
// (Note, if special dictionaries exist, then they were created transactionally
// and log should exist.)
//   env                     - environment to check; its home dir, real log dir
//                             and open_flags are consulted
//   valid_newenv            - out: true only when 0 is returned and the
//                             persistent environment file does not exist
//   need_rollback_cachefile - whether the rollback cachefile is part of the check
// The fileops directory and recovery log checks are skipped when
// force_recovery == 6.
static int validate_env(DB_ENV *env,
                        bool *valid_newenv,
                        bool need_rollback_cachefile) {
    int r;
    bool expect_newenv = false;  // set true if we expect to create a new env
    toku_struct_stat buf;
    char *path = NULL;

    // Test for persistent environment
    path = toku_construct_full_name(
        2, env->i->dir, toku_product_name_strings.environmentdictionary);
    assert(path);
    r = toku_stat(path, &buf, toku_uninstrumented);
    if (r == 0) {
        expect_newenv = false;  // persistent info exists
    } else {
        int stat_errno = get_error_errno();
        if (stat_errno == ENOENT) {
            // a missing persistent environment is what defines a new env
            expect_newenv = true;
            r = 0;
        } else {
            r = toku_ydb_do_error(
                env,
                stat_errno,
                "Unable to access persistent environment [%s] in [%s]\n",
                toku_product_name_strings.environmentdictionary,
                env->i->dir);
            assert(r);
        }
    }
    toku_free(path);

    // Test for existence of rollback cachefile if it is expected to exist
    if (r == 0 && need_rollback_cachefile) {
        path = toku_construct_full_name(
            2, env->i->dir, toku_product_name_strings.rollback_cachefile);
        assert(path);
        r = toku_stat(path, &buf, toku_uninstrumented);
        if (r == 0) {
            if (expect_newenv)  // rollback cachefile exists, but persistent env
                                // is missing
                r = toku_ydb_do_error(
                    env,
                    ENOENT,
                    "Persistent environment is missing while looking for "
                    "rollback cachefile [%s] in [%s]\n",
                    toku_product_name_strings.rollback_cachefile, env->i->dir);
        } else {
            int stat_errno = get_error_errno();
            if (stat_errno == ENOENT) {
                if (!expect_newenv)  // rollback cachefile is missing but
                                     // persistent env exists
                    r = toku_ydb_do_error(
                        env,
                        ENOENT,
                        "rollback cachefile [%s] is missing from [%s]\n",
                        toku_product_name_strings.rollback_cachefile,
                        env->i->dir);
                else
                    r = 0;  // both rollback cachefile and persistent env are
                            // missing
            } else {
                r = toku_ydb_do_error(
                    env,
                    stat_errno,
                    "Unable to access rollback cachefile [%s] in [%s]\n",
                    toku_product_name_strings.rollback_cachefile,
                    env->i->dir);
                assert(r);
            }
        }
        toku_free(path);
    }

    // Test for fileops directory
    if (r == 0 && force_recovery != 6) {
        path = toku_construct_full_name(
            2, env->i->dir, toku_product_name_strings.fileopsdirectory);
        assert(path);
        r = toku_stat(path, &buf, toku_uninstrumented);
        if (r == 0) {
            if (expect_newenv)  // fileops directory exists, but persistent env
                                // is missing
                r = toku_ydb_do_error(
                    env,
                    ENOENT,
                    "Persistent environment is missing while looking for "
                    "fileops directory [%s] in [%s]\n",
                    toku_product_name_strings.fileopsdirectory,
                    env->i->dir);
        } else {
            int stat_errno = get_error_errno();
            if (stat_errno == ENOENT) {
                if (!expect_newenv)  // fileops directory is missing but
                                     // persistent env exists
                    r = toku_ydb_do_error(
                        env,
                        ENOENT,
                        "Fileops directory [%s] is missing from [%s]\n",
                        toku_product_name_strings.fileopsdirectory,
                        env->i->dir);
                else
                    r = 0;  // both fileops directory and persistent env are
                            // missing
            } else {
                r = toku_ydb_do_error(
                    env,
                    stat_errno,
                    "Unable to access fileops directory [%s] in [%s]\n",
                    toku_product_name_strings.fileopsdirectory,
                    env->i->dir);
                assert(r);
            }
        }
        toku_free(path);
    }

    // Test for recovery log
    if ((r == 0) && (env->i->open_flags & DB_INIT_LOG) && force_recovery != 6) {
        // if using transactions, test for existence of log
        r = ydb_recover_log_exists(env);  // return 0 or ENOENT
        if (expect_newenv && (r != ENOENT))
            r = toku_ydb_do_error(env,
                                  ENOENT,
                                  "Persistent environment information is "
                                  "missing (but log exists) while looking for "
                                  "recovery log files in [%s]\n",
                                  env->i->real_log_dir);
        else if (!expect_newenv && r == ENOENT)
            r = toku_ydb_do_error(env,
                                  ENOENT,
                                  "Recovery log is missing (persistent "
                                  "environment information is present) while "
                                  "looking for recovery log files in [%s]\n",
                                  env->i->real_log_dir);
        else
            r = 0;  // log presence agrees with expect_newenv
    }

    // only a fully consistent result may report a new environment
    if (r == 0)
        *valid_newenv = expect_newenv;
    else
        *valid_newenv = false;
    return r;
}
794
795 // The version of the environment (on disk) is the version of the recovery log.
796 // If the recovery log is of the current version, then there is no upgrade to be done.
797 // If the recovery log is of an old version, then replacing it with a new recovery log
798 // of the current version is how the upgrade is done.
799 // Note, the upgrade procedure takes a checkpoint, so we must release the ydb lock.
800 static int
ydb_maybe_upgrade_env(DB_ENV * env,LSN * last_lsn_of_clean_shutdown_read_from_log,bool * upgrade_in_progress)801 ydb_maybe_upgrade_env (DB_ENV *env, LSN * last_lsn_of_clean_shutdown_read_from_log, bool * upgrade_in_progress) {
802 int r = 0;
803 if (env->i->open_flags & DB_INIT_TXN && env->i->open_flags & DB_INIT_LOG) {
804 r = toku_maybe_upgrade_log(env->i->dir, env->i->real_log_dir, last_lsn_of_clean_shutdown_read_from_log, upgrade_in_progress);
805 }
806 return r;
807 }
808
809 static void
unlock_single_process(DB_ENV * env)810 unlock_single_process(DB_ENV *env) {
811 int r;
812 r = toku_single_process_unlock(&env->i->envdir_lockfd);
813 lazy_assert_zero(r);
814 r = toku_single_process_unlock(&env->i->datadir_lockfd);
815 lazy_assert_zero(r);
816 r = toku_single_process_unlock(&env->i->logdir_lockfd);
817 lazy_assert_zero(r);
818 r = toku_single_process_unlock(&env->i->tmpdir_lockfd);
819 lazy_assert_zero(r);
820 }
821
// Open the environment.
// If this is a new environment, then create the necessary files.
// Return 0 on success, ENOENT if any of the expected necessary files are missing.
// (The set of necessary files is defined in the function validate_env() above.)
//
// Parameters:
//   env   - environment handle being opened
//   home  - environment directory; NULL is treated as "."
//   flags - DB_* open flags; DB_PRIVATE and DB_CREATE are required, and
//           DB_INIT_LOG requires DB_INIT_TXN
//   mode  - stored in env->i->open_mode and passed to toku_db_open_iname()
//
// Outline of the steps performed below:
//   1. validate state and flags, check that `home` exists
//   2. record dir/flags/mode, resolve the real data/log/tmp directories and
//      take the four single-process locks
//   3. possibly upgrade the log, validate the environment, run recovery
//   4. open (or close) the logger, create the cachetable, open the rollback
//   5. open the persistent-environment and directory dictionaries inside a
//      transaction (when transactions are in use), then commit
//   6. take a startup checkpoint (skipped when force_recovery is nonzero)
//      and start the fs and fsync-log background jobs
// On failure (r != 0 at `cleanup`) any open transaction is aborted and the
// single-process locks are released.
static int
env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {

    // force_recovery == 6: append '2' in place to three global product-name
    // strings, so that a different rollback file, lock file and environment
    // dictionary name are used.
    // NOTE(review): the writes at [len] and [len+1] assume each buffer has at
    // least two spare bytes, and every call with force_recovery == 6 appends
    // another '2' -- confirm buffer sizes and that this runs once per process.
    if(force_recovery == 6) {
        {
            const int len = strlen(toku_product_name_strings.rollback_cachefile);
            toku_product_name_strings.rollback_cachefile[len] = '2';
            toku_product_name_strings.rollback_cachefile[len+1] = 0;
        }

        {
            const int len = strlen(toku_product_name_strings.single_process_lock);
            toku_product_name_strings.single_process_lock[len] = '2';
            toku_product_name_strings.single_process_lock[len+1] = 0;
        }

        {
            const int len = strlen(toku_product_name_strings.environmentdictionary);
            toku_product_name_strings.environmentdictionary[len] = '2';
            toku_product_name_strings.environmentdictionary[len+1] = 0;
        }
    }

    HANDLE_PANICKED_ENV(env);
    int r;
    bool newenv; // true iff creating a new environment
    uint32_t unused_flags=flags; // bits are cleared as each flag is handled; leftovers are rejected below
    CHECKPOINTER cp;
    DB_TXN *txn = NULL;

    // NOTE(review): this and the following early failures jump to `cleanup`,
    // which calls unlock_single_process() even though this call has not taken
    // the locks yet -- confirm that is harmless (in particular for the
    // "already open" case).
    if (env_opened(env)) {
        r = toku_ydb_do_error(env, EINVAL, "The environment is already open\n");
        goto cleanup;
    }

    if (env->get_check_thp(env) && toku_os_huge_pages_enabled()) {
        r = toku_ydb_do_error(env, TOKUDB_HUGE_PAGES_ENABLED,
                              "Huge pages are enabled, disable them before continuing\n");
        goto cleanup;
    }

    most_recent_env = NULL;

    // The creation time is stored below as a 64-bit value taken from a time_t.
    assert(sizeof(time_t) == sizeof(uint64_t));

    HANDLE_EXTRA_FLAGS(env, flags,
                       DB_CREATE|DB_PRIVATE|DB_INIT_LOG|DB_INIT_TXN|DB_RECOVER|DB_INIT_MPOOL|DB_INIT_LOCK|DB_THREAD);

    // DB_CREATE means create if env does not exist, and PerconaFT requires it because
    // PerconaFT requires DB_PRIVATE.
    if ((flags & DB_PRIVATE) && !(flags & DB_CREATE)) {
        r = toku_ydb_do_error(env, ENOENT, "DB_PRIVATE requires DB_CREATE (seems gratuitous to us, but that's BDB's behavior\n");
        goto cleanup;
    }

    if (!(flags & DB_PRIVATE)) {
        r = toku_ydb_do_error(env, ENOENT, "PerconaFT requires DB_PRIVATE\n");
        goto cleanup;
    }

    if ((flags & DB_INIT_LOG) && !(flags & DB_INIT_TXN)) {
        r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires transactions for logging\n");
        goto cleanup;
    }

    if (!home) home = ".";

    // Verify that the home exists.
    toku_struct_stat buf;
    r = toku_stat(home, &buf, toku_uninstrumented);
    if (r != 0) {
        int e = get_error_errno();
        r = toku_ydb_do_error(
            env, e, "Error from toku_stat(\"%s\",...)\n", home);
        goto cleanup;
    }
    unused_flags &= ~DB_PRIVATE;

    // Replace any previously recorded directory with a private copy of `home`.
    if (env->i->dir) {
        toku_free(env->i->dir);
    }
    env->i->dir = toku_strdup(home);
    if (env->i->dir == 0) {
        r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
        goto cleanup;
    }
    env->i->open_flags = flags;
    env->i->open_mode = mode;

    // Instrumentation probe start
    TOKU_PROBE_START(toku_instr_probe_1);

    env_setup_real_data_dir(env);
    env_setup_real_log_dir(env);
    env_setup_real_tmp_dir(env);

    // Instrumentation probe stop
    toku_instr_probe_1->stop();

    // Take one single-process lock per directory: env, data, logs, temp.
    r = toku_single_process_lock(
        env->i->dir, "environment", &env->i->envdir_lockfd);
    if (r != 0)
        goto cleanup;
    r = toku_single_process_lock(
        env->i->real_data_dir, "data", &env->i->datadir_lockfd);
    if (r!=0) goto cleanup;
    r = toku_single_process_lock(env->i->real_log_dir, "logs", &env->i->logdir_lockfd);
    if (r!=0) goto cleanup;
    r = toku_single_process_lock(env->i->real_tmp_dir, "temp", &env->i->tmpdir_lockfd);
    if (r!=0) goto cleanup;

    // Passed to validate_env() below: whether a rollback cachefile is
    // expected to exist already.
    bool need_rollback_cachefile;
    need_rollback_cachefile = false;
    if (flags & (DB_INIT_TXN | DB_INIT_LOG) && force_recovery != 6) {
        need_rollback_cachefile = true;
    }

    ydb_layer_status_init(); // do this before possibly upgrading, so upgrade work is counted in status counters

    LSN last_lsn_of_clean_shutdown_read_from_log;
    last_lsn_of_clean_shutdown_read_from_log = ZERO_LSN;
    bool upgrade_in_progress;
    upgrade_in_progress = false;
    r = ydb_maybe_upgrade_env(env, &last_lsn_of_clean_shutdown_read_from_log, &upgrade_in_progress);
    if (r!=0) goto cleanup;

    if (upgrade_in_progress || force_recovery == 6) {
        // Delete old rollback file. There was a clean shutdown, so it has nothing useful,
        // and there is no value in upgrading it. It is simpler to just create a new one.
        char* rollback_filename = toku_construct_full_name(2, env->i->dir, toku_product_name_strings.rollback_cachefile);
        assert(rollback_filename);
        r = unlink(rollback_filename);
        if (r != 0) {
            // The only tolerated unlink failure is "file does not exist".
            assert(get_error_errno() == ENOENT);
        }
        toku_free(rollback_filename);
        need_rollback_cachefile = false; // we're not expecting it to exist now
    }

    r = validate_env(env, &newenv, need_rollback_cachefile); // make sure that environment is either new or complete
    if (r != 0) goto cleanup;

    unused_flags &= ~DB_INIT_TXN & ~DB_INIT_LOG;

    // With force_recovery == 6 the local `flags` copy is forced to include
    // logging and transactions from here on (env->i->open_flags was already
    // stored above and is not updated here).
    if(force_recovery == 6) {
        flags |= DB_INIT_LOG | DB_INIT_TXN;
    }

    // do recovery only if there exists a log and recovery is requested
    // otherwise, a log is created when the logger is opened later
    if (!newenv && force_recovery == 0) {
        if (flags & DB_INIT_LOG) {
            // the log does exist
            if (flags & DB_RECOVER) {
                r = ydb_do_recovery(env);
                if (r != 0) goto cleanup;
            } else {
                // the log is required to have clean shutdown if recovery is not requested
                r = needs_recovery(env);
                if (r != 0) goto cleanup;
            }
        }
    }

    toku_loader_cleanup_temp_files(env);

    if (flags & (DB_INIT_TXN | DB_INIT_LOG)) {
        assert(env->i->logger);
        toku_logger_write_log_files(env->i->logger, (bool)((flags & DB_INIT_LOG) != 0));
        if (!toku_logger_is_open(env->i->logger)) {
            r = toku_logger_open(env->i->real_log_dir, env->i->logger);
            if (r!=0) {
                // NOTE(review): the failure is reported but execution
                // continues and r is overwritten later -- confirm this is
                // intended rather than a missing `goto cleanup`.
                toku_ydb_do_error(env, r, "Could not open logger\n");
            }
        }
    } else {
        r = toku_logger_close(&env->i->logger); // if no logging system, then kill the logger
        assert_zero(r);
    }

    unused_flags &= ~DB_INIT_MPOOL; // we always init an mpool.
    unused_flags &= ~DB_CREATE; // we always do DB_CREATE
    unused_flags &= ~DB_INIT_LOCK; // we check this later (e.g. in db->open)
    unused_flags &= ~DB_RECOVER;

    // This is probably correct, but it will be pain...
    // if ((flags & DB_THREAD)==0) {
    // r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires DB_THREAD");
    // goto cleanup;
    // }
    unused_flags &= ~DB_THREAD;

    // Anything still set was not consumed by any of the steps above.
    if (unused_flags!=0) {
        r = toku_ydb_do_error(env, EINVAL, "Extra flags not understood by tokuft: %u\n", unused_flags);
        goto cleanup;
    }

    if (env->i->cachetable==NULL) {
        // If we ran recovery then the cachetable should be set here.
        r = toku_cachetable_create_ex(&env->i->cachetable, env->i->cachetable_size,
                                      env->i->client_pool_threads,
                                      env->i->cachetable_pool_threads,
                                      env->i->checkpoint_pool_threads,
                                      ZERO_LSN, env->i->logger);
        if (r != 0) {
            r = toku_ydb_do_error(env, r, "Cant create a cachetable\n");
            goto cleanup;
        }
    }

    toku_cachetable_set_env_dir(env->i->cachetable, env->i->dir);

    int using_txns;
    using_txns = env->i->open_flags & DB_INIT_TXN;
    if (env->i->logger) {
        // if this is a newborn env or if this is an upgrade, then create a brand new rollback file
        assert (using_txns);
        toku_logger_set_cachetable(env->i->logger, env->i->cachetable);
        if (!toku_logger_rollback_is_open(env->i->logger)) {
            bool create_new_rollback_file = newenv | upgrade_in_progress | (force_recovery == 6);
            r = toku_logger_open_rollback(env->i->logger, env->i->cachetable, create_new_rollback_file);
            if (r != 0) {
                r = toku_ydb_do_error(env, r, "Cant open rollback\n");
                goto cleanup;
            }
        }
    }

    // The two dictionaries below are opened under this transaction when
    // transactions are in use; otherwise txn stays NULL.
    if (using_txns) {
        r = toku_txn_begin(env, 0, &txn, 0);
        assert_zero(r);
    }

    {
        // Open (creating if needed) the persistent-environment dictionary.
        r = toku_db_create(&env->i->persistent_environment, env, 0);
        assert_zero(r);
        r = toku_db_use_builtin_key_cmp(env->i->persistent_environment);
        assert_zero(r);
        // NOTE(review): writing_rollback is incremented once here but
        // decremented twice on the success path (after
        // capture_persistent_env_contents() and again after the startup
        // checkpoint), and not at all on the error exit just below --
        // confirm the intended balance of this counter.
        writing_rollback++;
        r = toku_db_open_iname(env->i->persistent_environment, txn, toku_product_name_strings.environmentdictionary, DB_CREATE, mode);
        if (r != 0) {
            r = toku_ydb_do_error(env, r, "Cant open persistent env\n");
            goto cleanup;
        }
        if (newenv) {
            // create new persistent_environment
            // Three entries are written: original version, current version
            // (both FT_LAYOUT_VERSION) and the creation time.
            DBT key, val;
            uint32_t persistent_original_env_version = FT_LAYOUT_VERSION;
            const uint32_t environment_version = toku_htod32(persistent_original_env_version);

            toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
            toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
            r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
            assert_zero(r);

            toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
            toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
            r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
            assert_zero(r);

            time_t creation_time_d = toku_htod64(time(NULL));
            toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
            toku_fill_dbt(&val, &creation_time_d, sizeof(creation_time_d));
            r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
            assert_zero(r);
        }
        else {
            r = maybe_upgrade_persistent_environment_dictionary(env, txn, last_lsn_of_clean_shutdown_read_from_log);
            assert_zero(r);
        }
        capture_persistent_env_contents(env, txn);
        writing_rollback--;
    }
    {
        // Open (creating if needed) the directory dictionary.
        r = toku_db_create(&env->i->directory, env, 0);
        assert_zero(r);
        r = toku_db_use_builtin_key_cmp(env->i->directory);
        assert_zero(r);
        r = toku_db_open_iname(env->i->directory, txn, toku_product_name_strings.fileopsdirectory, DB_CREATE, mode);
        if (r != 0) {
            r = toku_ydb_do_error(env, r, "Cant open %s\n", toku_product_name_strings.fileopsdirectory);
            goto cleanup;
        }
    }
    if (using_txns) {
        r = locked_txn_commit(txn, 0);
        assert_zero(r);
        txn = NULL; // committed; nothing left for cleanup to abort
    }
    cp = toku_cachetable_get_checkpointer(env->i->cachetable);
    if (!force_recovery) {
        // NOTE(review): the checkpoint result stored in r is never checked;
        // it is overwritten by env_fs_init_minicron() below -- confirm
        // whether a failed startup checkpoint should abort the open.
        r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT);
    }
    writing_rollback--;
    env_fs_poller(env); // get the file system state at startup
    r = env_fs_init_minicron(env);
    if (r != 0) {
        r = toku_ydb_do_error(env, r, "Cant create fs minicron\n");
        goto cleanup;
    }
    r = env_fsync_log_cron_init(env);
    if (r != 0) {
        r = toku_ydb_do_error(env, r, "Cant create fsync log minicron\n");
        goto cleanup;
    }
cleanup:
    // Failure: abort the still-open transaction (if any) and drop the
    // single-process locks.
    if (r!=0) {
        if (txn) {
            locked_txn_abort(txn);
        }
        if (env && env->i) {
            unlock_single_process(env);
        }
    }
    // Success: publish the environment and register the engine-status hooks
    // with the assert machinery.
    if (r == 0) {
        set_errno(0); // tabula rasa. If there's a crash after env was successfully opened, no misleading errno will have been left around by this code.
        most_recent_env = env;
        uint64_t num_rows;
        env_get_engine_status_num_rows(env, &num_rows);
        toku_assert_set_fpointers(toku_maybe_get_engine_status_text, toku_maybe_err_engine_status, toku_maybe_set_env_panic, num_rows);
    }
    return r;
}
1149
// Close the environment and free it.
//
// Parameters:
//   env   - environment to close
//   flags - TOKUFT_DIRTY_SHUTDOWN requests a shutdown without the shutdown
//           checkpoints; any other nonzero bit makes the call return EINVAL
//
// Normal path: closes the two internal dictionaries, stops the background
// jobs, checkpoints (clean shutdown only), closes the cachetable and the
// logger, frees every owned string and container, releases the
// single-process locks and finally frees env->i and env.
// NOTE(review): when extra flag bits are set, EINVAL is returned only after
// the environment has already been torn down and freed.
//
// Panic path (label panic_and_quit_early): taken when the environment is
// already panicked or any step fails. The lock files are released, the
// error is reported (or the environment is panicked with r / err_msg) and
// the function returns without freeing env.
static int
env_close(DB_ENV * env, uint32_t flags) {
    int r = 0;
    const char * err_msg = NULL;
    bool clean_shutdown = true;

    // Strip the dirty-shutdown bit so that the final "flags != 0" test only
    // sees unrecognized bits.
    if (flags & TOKUFT_DIRTY_SHUTDOWN) {
        clean_shutdown = false;
        flags &= ~TOKUFT_DIRTY_SHUTDOWN;
    }

    most_recent_env = NULL; // Set most_recent_env to NULL so that we don't have a dangling pointer (and if there's an error, the toku assert code would try to look at the env.)

    // if panicked, or if any open transactions, or any open dbs, then do nothing.

    if (toku_env_is_panicked(env)) {
        goto panic_and_quit_early;
    }
    if (env->i->logger && toku_logger_txns_exist(env->i->logger)) {
        err_msg = "Cannot close environment due to open transactions\n";
        r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
        goto panic_and_quit_early;
    }
    if (env->i->open_dbs_by_dname) { //Verify that there are no open dbs.
        if (env->i->open_dbs_by_dname->size() > 0) {
            err_msg = "Cannot close environment due to open DBs\n";
            r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    // Close the internal dictionaries opened by env_open().
    if (env->i->persistent_environment) {
        r = toku_db_close(env->i->persistent_environment);
        if (r) {
            err_msg = "Cannot close persistent environment dictionary (DB->close error)\n";
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    if (env->i->directory) {
        r = toku_db_close(env->i->directory);
        if (r) {
            err_msg = "Cannot close Directory dictionary (DB->close error)\n";
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    env_fsync_log_cron_destroy(env);
    if (env->i->cachetable) {
        toku_cachetable_prepare_close(env->i->cachetable);
        toku_cachetable_minicron_shutdown(env->i->cachetable);
        if (env->i->logger) {
            // cp is only assigned (and only used) on the clean-shutdown path.
            CHECKPOINTER cp = nullptr;
            if (clean_shutdown) {
                cp = toku_cachetable_get_checkpointer(env->i->cachetable);
                r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
                if (r) {
                    err_msg = "Cannot close environment (error during checkpoint)\n";
                    toku_ydb_do_error(env, r, "%s", err_msg);
                    goto panic_and_quit_early;
                }
            }
            toku_logger_close_rollback_check_empty(env->i->logger, clean_shutdown);
            if (clean_shutdown) {
                //Do a second checkpoint now that the rollback cachefile is closed.
                r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
                if (r) {
                    err_msg = "Cannot close environment (error during checkpoint)\n";
                    toku_ydb_do_error(env, r, "%s", err_msg);
                    goto panic_and_quit_early;
                }
                toku_logger_shutdown(env->i->logger);
            }
        }
        toku_cachetable_close(&env->i->cachetable);
    }
    if (env->i->logger) {
        r = toku_logger_close(&env->i->logger);
        if (r) {
            err_msg = "Cannot close environment (logger close error)\n";
            env->i->logger = NULL;
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    // Even if nothing else went wrong, but we were panicked, then raise an error.
    // But if something else went wrong then raise that error (above)
    if (toku_env_is_panicked(env)) {
        goto panic_and_quit_early;
    } else {
        assert(env->i->panic_string == 0);
    }

    // From here on the close cannot fail: release everything owned by env->i.
    env_fs_destroy(env);
    env->i->ltm.destroy();
    if (env->i->data_dir)
        toku_free(env->i->data_dir);
    if (env->i->lg_dir)
        toku_free(env->i->lg_dir);
    if (env->i->tmp_dir)
        toku_free(env->i->tmp_dir);
    if (env->i->real_data_dir)
        toku_free(env->i->real_data_dir);
    if (env->i->real_log_dir)
        toku_free(env->i->real_log_dir);
    if (env->i->real_tmp_dir)
        toku_free(env->i->real_tmp_dir);
    if (env->i->open_dbs_by_dname) {
        env->i->open_dbs_by_dname->destroy();
        toku_free(env->i->open_dbs_by_dname);
    }
    if (env->i->open_dbs_by_dict_id) {
        env->i->open_dbs_by_dict_id->destroy();
        toku_free(env->i->open_dbs_by_dict_id);
    }
    if (env->i->dir)
        toku_free(env->i->dir);
    toku_pthread_rwlock_destroy(&env->i->open_dbs_rwlock);

    // Immediately before freeing internal environment unlock the directories
    unlock_single_process(env);
    toku_free(env->i);
    toku_free(env);
    toku_sync_fetch_and_add(&tokuft_num_envs, -1);
    // Unrecognized flag bits are reported only now, after the teardown.
    if (flags != 0) {
        r = EINVAL;
    }
    return r;

panic_and_quit_early:
    //release lock files.
    unlock_single_process(env);
    //r is the panic error
    if (toku_env_is_panicked(env)) {
        // Already panicked: report the stored panic string and return the
        // value produced by toku_ydb_do_error().
        char *panic_string = env->i->panic_string;
        r = toku_ydb_do_error(env, toku_env_is_panicked(env), "Cannot close environment due to previous error: %s\n", panic_string);
    }
    else {
        // Not yet panicked: panic the environment with the error that
        // brought us here.
        env_panic(env, r, err_msg);
    }
    return r;
}
1291
1292 static int
env_log_archive(DB_ENV * env,char ** list[],uint32_t flags)1293 env_log_archive(DB_ENV * env, char **list[], uint32_t flags) {
1294 return toku_logger_log_archive(env->i->logger, list, flags);
1295 }
1296
1297 static int
env_log_flush(DB_ENV * env,const DB_LSN * lsn)1298 env_log_flush(DB_ENV * env, const DB_LSN * lsn __attribute__((__unused__))) {
1299 HANDLE_PANICKED_ENV(env);
1300 // do nothing if no logger
1301 if (env->i->logger) {
1302 // We just flush everything. MySQL uses lsn == 0 which means flush everything.
1303 // For anyone else using the log, it is correct to flush too much, so we are OK.
1304 toku_logger_fsync(env->i->logger);
1305 }
1306 return 0;
1307 }
1308
1309 static int
env_set_cachesize(DB_ENV * env,uint32_t gbytes,uint32_t bytes,int ncache)1310 env_set_cachesize(DB_ENV * env, uint32_t gbytes, uint32_t bytes, int ncache) {
1311 HANDLE_PANICKED_ENV(env);
1312 if (ncache != 1) {
1313 return EINVAL;
1314 }
1315 uint64_t cs64 = ((uint64_t) gbytes << 30) + bytes;
1316 unsigned long cs = cs64;
1317 if (cs64 > cs) {
1318 return EINVAL;
1319 }
1320 env->i->cachetable_size = cs;
1321 return 0;
1322 }
1323
1324 static int
env_set_client_pool_threads(DB_ENV * env,uint32_t threads)1325 env_set_client_pool_threads(DB_ENV * env, uint32_t threads) {
1326 HANDLE_PANICKED_ENV(env);
1327 env->i->client_pool_threads = threads;
1328 return 0;
1329 }
1330
1331 static int
env_set_cachetable_pool_threads(DB_ENV * env,uint32_t threads)1332 env_set_cachetable_pool_threads(DB_ENV * env, uint32_t threads) {
1333 HANDLE_PANICKED_ENV(env);
1334 env->i->cachetable_pool_threads = threads;
1335 return 0;
1336 }
1337
1338 static int
env_set_checkpoint_pool_threads(DB_ENV * env,uint32_t threads)1339 env_set_checkpoint_pool_threads(DB_ENV * env, uint32_t threads) {
1340 HANDLE_PANICKED_ENV(env);
1341 env->i->checkpoint_pool_threads = threads;
1342 return 0;
1343 }
1344
1345 static void
env_set_check_thp(DB_ENV * env,bool new_val)1346 env_set_check_thp(DB_ENV * env, bool new_val) {
1347 assert(env);
1348 env->i->check_thp = new_val;
1349 }
1350
1351 static bool
env_get_check_thp(DB_ENV * env)1352 env_get_check_thp(DB_ENV * env) {
1353 assert(env);
1354 return env->i->check_thp;
1355 }
1356
env_set_dir_per_db(DB_ENV * env,bool new_val)1357 static bool env_set_dir_per_db(DB_ENV *env, bool new_val) {
1358 HANDLE_PANICKED_ENV(env);
1359 bool r = env->i->dir_per_db;
1360 env->i->dir_per_db = new_val;
1361 return r;
1362 }
1363
env_get_dir_per_db(DB_ENV * env)1364 static bool env_get_dir_per_db(DB_ENV *env) {
1365 HANDLE_PANICKED_ENV(env);
1366 return env->i->dir_per_db;
1367 }
1368
env_get_data_dir(DB_ENV * env)1369 static const char *env_get_data_dir(DB_ENV *env) {
1370 return env->i->real_data_dir;
1371 }
1372
env_dirtool_attach(DB_ENV * env,DB_TXN * txn,const char * dname,const char * iname)1373 static int env_dirtool_attach(DB_ENV *env,
1374 DB_TXN *txn,
1375 const char *dname,
1376 const char *iname) {
1377 int r;
1378 DBT dname_dbt;
1379 DBT iname_dbt;
1380
1381 HANDLE_PANICKED_ENV(env);
1382 if (!env_opened(env)) {
1383 return EINVAL;
1384 }
1385 HANDLE_READ_ONLY_TXN(txn);
1386 toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1387 toku_fill_dbt(&iname_dbt, iname, strlen(iname) + 1);
1388
1389 r = toku_db_put(env->i->directory,
1390 txn,
1391 &dname_dbt,
1392 &iname_dbt,
1393 0,
1394 true);
1395 return r;
1396 }
1397
env_dirtool_detach(DB_ENV * env,DB_TXN * txn,const char * dname)1398 static int env_dirtool_detach(DB_ENV *env,
1399 DB_TXN *txn,
1400 const char *dname) {
1401 int r;
1402 DBT dname_dbt;
1403 DBT old_iname_dbt;
1404
1405 HANDLE_PANICKED_ENV(env);
1406 if (!env_opened(env)) {
1407 return EINVAL;
1408 }
1409 HANDLE_READ_ONLY_TXN(txn);
1410
1411 toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1412 toku_init_dbt_flags(&old_iname_dbt, DB_DBT_REALLOC);
1413
1414 r = toku_db_get(env->i->directory,
1415 txn,
1416 &dname_dbt,
1417 &old_iname_dbt,
1418 DB_SERIALIZABLE); // allocates memory for iname
1419 if (r == DB_NOTFOUND)
1420 return EEXIST;
1421 toku_free(old_iname_dbt.data);
1422
1423 r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
1424
1425 return r;
1426 }
1427
env_dirtool_move(DB_ENV * env,DB_TXN * txn,const char * old_dname,const char * new_dname)1428 static int env_dirtool_move(DB_ENV *env,
1429 DB_TXN *txn,
1430 const char *old_dname,
1431 const char *new_dname) {
1432 int r;
1433 DBT old_dname_dbt;
1434 DBT new_dname_dbt;
1435 DBT iname_dbt;
1436
1437 HANDLE_PANICKED_ENV(env);
1438 if (!env_opened(env)) {
1439 return EINVAL;
1440 }
1441 HANDLE_READ_ONLY_TXN(txn);
1442
1443 toku_fill_dbt(&old_dname_dbt, old_dname, strlen(old_dname) + 1);
1444 toku_fill_dbt(&new_dname_dbt, new_dname, strlen(new_dname) + 1);
1445 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
1446
1447 r = toku_db_get(env->i->directory,
1448 txn,
1449 &old_dname_dbt,
1450 &iname_dbt,
1451 DB_SERIALIZABLE); // allocates memory for iname
1452 if (r == DB_NOTFOUND)
1453 return EEXIST;
1454
1455 r = toku_db_del(
1456 env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
1457 if (r != 0)
1458 goto exit;
1459
1460 r = toku_db_put(
1461 env->i->directory, txn, &new_dname_dbt, &iname_dbt, 0, true);
1462
1463 exit:
1464 toku_free(iname_dbt.data);
1465 return r;
1466 }
1467
locked_env_op(DB_ENV * env,DB_TXN * txn,std::function<int (DB_TXN *)> f)1468 static int locked_env_op(DB_ENV *env,
1469 DB_TXN *txn,
1470 std::function<int(DB_TXN *)> f) {
1471 int ret, r;
1472 HANDLE_READ_ONLY_TXN(txn);
1473 HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1474
1475 DB_TXN *child_txn = NULL;
1476 int using_txns = env->i->open_flags & DB_INIT_TXN;
1477 if (using_txns) {
1478 ret = toku_txn_begin(env, txn, &child_txn, 0);
1479 lazy_assert_zero(ret);
1480 }
1481
1482 // cannot begin a checkpoint
1483 toku_multi_operation_client_lock();
1484 r = f(child_txn);
1485 toku_multi_operation_client_unlock();
1486
1487 if (using_txns) {
1488 if (r == 0) {
1489 ret = locked_txn_commit(child_txn, 0);
1490 lazy_assert_zero(ret);
1491 } else {
1492 ret = locked_txn_abort(child_txn);
1493 lazy_assert_zero(ret);
1494 }
1495 }
1496 return r;
1497
1498 }
1499
locked_env_dirtool_attach(DB_ENV * env,DB_TXN * txn,const char * dname,const char * iname)1500 static int locked_env_dirtool_attach(DB_ENV *env,
1501 DB_TXN *txn,
1502 const char *dname,
1503 const char *iname) {
1504 auto f = std::bind(
1505 env_dirtool_attach, env, std::placeholders::_1, dname, iname);
1506 return locked_env_op(env, txn, f);
1507 }
1508
locked_env_dirtool_detach(DB_ENV * env,DB_TXN * txn,const char * dname)1509 static int locked_env_dirtool_detach(DB_ENV *env,
1510 DB_TXN *txn,
1511 const char *dname) {
1512 auto f = std::bind(
1513 env_dirtool_detach, env, std::placeholders::_1, dname);
1514 return locked_env_op(env, txn, f);
1515 }
1516
locked_env_dirtool_move(DB_ENV * env,DB_TXN * txn,const char * old_dname,const char * new_dname)1517 static int locked_env_dirtool_move(DB_ENV *env,
1518 DB_TXN *txn,
1519 const char *old_dname,
1520 const char *new_dname) {
1521 auto f = std::bind(
1522 env_dirtool_move, env, std::placeholders::_1, old_dname, new_dname);
1523 return locked_env_op(env, txn, f);
1524 }
1525
1526 static int env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags);
1527
1528 static int
locked_env_dbremove(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,uint32_t flags)1529 locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
1530 int ret, r;
1531 HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1532 HANDLE_READ_ONLY_TXN(txn);
1533
1534 DB_TXN *child_txn = NULL;
1535 int using_txns = env->i->open_flags & DB_INIT_TXN;
1536 if (using_txns) {
1537 ret = toku_txn_begin(env, txn, &child_txn, 0);
1538 lazy_assert_zero(ret);
1539 }
1540
1541 // cannot begin a checkpoint
1542 toku_multi_operation_client_lock();
1543 r = env_dbremove(env, child_txn, fname, dbname, flags);
1544 toku_multi_operation_client_unlock();
1545
1546 if (using_txns) {
1547 if (r == 0) {
1548 ret = locked_txn_commit(child_txn, 0);
1549 lazy_assert_zero(ret);
1550 } else {
1551 ret = locked_txn_abort(child_txn);
1552 lazy_assert_zero(ret);
1553 }
1554 }
1555 return r;
1556 }
1557
1558 static int env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags);
1559
1560 static int
locked_env_dbrename(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)1561 locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
1562 int ret, r;
1563 HANDLE_READ_ONLY_TXN(txn);
1564 HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1565
1566 DB_TXN *child_txn = NULL;
1567 int using_txns = env->i->open_flags & DB_INIT_TXN;
1568 if (using_txns) {
1569 ret = toku_txn_begin(env, txn, &child_txn, 0);
1570 lazy_assert_zero(ret);
1571 }
1572
1573 // cannot begin a checkpoint
1574 toku_multi_operation_client_lock();
1575 r = env_dbrename(env, child_txn, fname, dbname, newname, flags);
1576 toku_multi_operation_client_unlock();
1577
1578 if (using_txns) {
1579 if (r == 0) {
1580 ret = locked_txn_commit(child_txn, 0);
1581 lazy_assert_zero(ret);
1582 } else {
1583 ret = locked_txn_abort(child_txn);
1584 lazy_assert_zero(ret);
1585 }
1586 }
1587 return r;
1588 }
1589
1590 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
1591
1592 static int
env_get_cachesize(DB_ENV * env,uint32_t * gbytes,uint32_t * bytes,int * ncache)1593 env_get_cachesize(DB_ENV * env, uint32_t *gbytes, uint32_t *bytes, int *ncache) {
1594 HANDLE_PANICKED_ENV(env);
1595 *gbytes = env->i->cachetable_size >> 30;
1596 *bytes = env->i->cachetable_size & ((1<<30)-1);
1597 *ncache = 1;
1598 return 0;
1599 }
1600
1601 #endif
1602
1603 static int
env_set_data_dir(DB_ENV * env,const char * dir)1604 env_set_data_dir(DB_ENV * env, const char *dir) {
1605 HANDLE_PANICKED_ENV(env);
1606 int r;
1607
1608 if (env_opened(env) || !dir) {
1609 r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir after opening the env\n");
1610 }
1611 else if (env->i->data_dir)
1612 r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir more than once.\n");
1613 else {
1614 env->i->data_dir = toku_strdup(dir);
1615 if (env->i->data_dir==NULL) {
1616 assert(get_error_errno() == ENOMEM);
1617 r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1618 }
1619 else r = 0;
1620 }
1621 return r;
1622 }
1623
1624 static void
env_set_errcall(DB_ENV * env,toku_env_errcall_t errcall)1625 env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) {
1626 env->i->errcall = errcall;
1627 }
1628
1629 static void
env_set_errfile(DB_ENV * env,FILE * errfile)1630 env_set_errfile(DB_ENV*env, FILE*errfile) {
1631 env->i->errfile = errfile;
1632 }
1633
1634 static void
env_set_errpfx(DB_ENV * env,const char * errpfx)1635 env_set_errpfx(DB_ENV * env, const char *errpfx) {
1636 env->i->errpfx = errpfx;
1637 }
1638
1639 static int
env_set_flags(DB_ENV * env,uint32_t flags,int onoff)1640 env_set_flags(DB_ENV * env, uint32_t flags, int onoff) {
1641 HANDLE_PANICKED_ENV(env);
1642
1643 uint32_t change = 0;
1644 if (flags & DB_AUTO_COMMIT) {
1645 change |= DB_AUTO_COMMIT;
1646 flags &= ~DB_AUTO_COMMIT;
1647 }
1648 if (flags != 0 && onoff) {
1649 return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support any nonzero ENV flags other than DB_AUTO_COMMIT\n");
1650 }
1651 if (onoff) env->i->open_flags |= change;
1652 else env->i->open_flags &= ~change;
1653 return 0;
1654 }
1655
1656 static int
env_set_lg_bsize(DB_ENV * env,uint32_t bsize)1657 env_set_lg_bsize(DB_ENV * env, uint32_t bsize) {
1658 HANDLE_PANICKED_ENV(env);
1659 return toku_logger_set_lg_bsize(env->i->logger, bsize);
1660 }
1661
1662 static int
env_set_lg_dir(DB_ENV * env,const char * dir)1663 env_set_lg_dir(DB_ENV * env, const char *dir) {
1664 HANDLE_PANICKED_ENV(env);
1665 if (env_opened(env)) {
1666 return toku_ydb_do_error(env, EINVAL, "Cannot set log dir after opening the env\n");
1667 }
1668
1669 if (env->i->lg_dir) toku_free(env->i->lg_dir);
1670 if (dir) {
1671 env->i->lg_dir = toku_strdup(dir);
1672 if (!env->i->lg_dir) {
1673 return toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1674 }
1675 }
1676 else env->i->lg_dir = NULL;
1677 return 0;
1678 }
1679
1680 static int
env_set_lg_max(DB_ENV * env,uint32_t lg_max)1681 env_set_lg_max(DB_ENV * env, uint32_t lg_max) {
1682 HANDLE_PANICKED_ENV(env);
1683 return toku_logger_set_lg_max(env->i->logger, lg_max);
1684 }
1685
1686 static int
env_get_lg_max(DB_ENV * env,uint32_t * lg_maxp)1687 env_get_lg_max(DB_ENV * env, uint32_t *lg_maxp) {
1688 HANDLE_PANICKED_ENV(env);
1689 return toku_logger_get_lg_max(env->i->logger, lg_maxp);
1690 }
1691
1692 static int
env_set_lk_detect(DB_ENV * env,uint32_t UU (detect))1693 env_set_lk_detect(DB_ENV * env, uint32_t UU(detect)) {
1694 HANDLE_PANICKED_ENV(env);
1695 return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support set_lk_detect\n");
1696 }
1697
1698 static int
env_set_lk_max_memory(DB_ENV * env,uint64_t lock_memory_limit)1699 env_set_lk_max_memory(DB_ENV *env, uint64_t lock_memory_limit) {
1700 HANDLE_PANICKED_ENV(env);
1701 int r = 0;
1702 if (env_opened(env)) {
1703 r = EINVAL;
1704 } else {
1705 r = env->i->ltm.set_max_lock_memory(lock_memory_limit);
1706 }
1707 return r;
1708 }
1709
1710 static int
env_get_lk_max_memory(DB_ENV * env,uint64_t * lk_maxp)1711 env_get_lk_max_memory(DB_ENV *env, uint64_t *lk_maxp) {
1712 HANDLE_PANICKED_ENV(env);
1713 uint32_t max_lock_memory = env->i->ltm.get_max_lock_memory();
1714 *lk_maxp = max_lock_memory;
1715 return 0;
1716 }
1717
1718 //void toku__env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) {
1719 // env->i->noticecall = noticecall;
1720 //}
1721
1722 static int
env_set_tmp_dir(DB_ENV * env,const char * tmp_dir)1723 env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
1724 HANDLE_PANICKED_ENV(env);
1725 if (env_opened(env)) {
1726 return toku_ydb_do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n");
1727 }
1728 if (!tmp_dir) {
1729 return toku_ydb_do_error(env, EINVAL, "Tmp dir bust be non-null\n");
1730 }
1731 if (env->i->tmp_dir)
1732 toku_free(env->i->tmp_dir);
1733 env->i->tmp_dir = toku_strdup(tmp_dir);
1734 return env->i->tmp_dir ? 0 : ENOMEM;
1735 }
1736
1737 static int
env_set_verbose(DB_ENV * env,uint32_t UU (which),int UU (onoff))1738 env_set_verbose(DB_ENV * env, uint32_t UU(which), int UU(onoff)) {
1739 HANDLE_PANICKED_ENV(env);
1740 return 1;
1741 }
1742
1743 static int
toku_env_txn_checkpoint(DB_ENV * env,uint32_t kbyte,uint32_t min,uint32_t flags)1744 toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte __attribute__((__unused__)), uint32_t min __attribute__((__unused__)), uint32_t flags __attribute__((__unused__))) {
1745 CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1746 int r = toku_checkpoint(cp, env->i->logger,
1747 checkpoint_callback_f, checkpoint_callback_extra,
1748 checkpoint_callback2_f, checkpoint_callback2_extra,
1749 CLIENT_CHECKPOINT);
1750 if (r) {
1751 // Panicking the whole environment may be overkill, but I'm not sure what else to do.
1752 env_panic(env, r, "checkpoint error\n");
1753 toku_ydb_do_error(env, r, "Checkpoint\n");
1754 }
1755 return r;
1756 }
1757
1758 static int
env_txn_stat(DB_ENV * env,DB_TXN_STAT ** UU (statp),uint32_t UU (flags))1759 env_txn_stat(DB_ENV * env, DB_TXN_STAT ** UU(statp), uint32_t UU(flags)) {
1760 HANDLE_PANICKED_ENV(env);
1761 return 1;
1762 }
1763
1764 //
1765 // We can assume the client calls this function right after recovery
1766 // to return a list of prepared transactions to the user. When called,
1767 // we can assume that no other work is being done in the system,
1768 // as we are in the state of being after recovery,
1769 // but before client operations should commence
1770 //
1771 static int
env_txn_xa_recover(DB_ENV * env,TOKU_XA_XID xids[],long count,long * retp,uint32_t flags)1772 env_txn_xa_recover (DB_ENV *env, TOKU_XA_XID xids[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1773 struct tokulogger_preplist *MALLOC_N(count,preps);
1774 int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1775 if (r==0) {
1776 assert(*retp<=count);
1777 for (int i=0; i<*retp; i++) {
1778 xids[i] = preps[i].xid;
1779 }
1780 }
1781 toku_free(preps);
1782 return r;
1783 }
1784
1785 //
1786 // We can assume the client calls this function right after recovery
1787 // to return a list of prepared transactions to the user. When called,
1788 // we can assume that no other work is being done in the system,
1789 // as we are in the state of being after recovery,
1790 // but before client operations should commence
1791 //
1792 static int
env_txn_recover(DB_ENV * env,DB_PREPLIST preplist[],long count,long * retp,uint32_t flags)1793 env_txn_recover (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1794 struct tokulogger_preplist *MALLOC_N(count,preps);
1795 int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1796 if (r==0) {
1797 assert(*retp<=count);
1798 for (int i=0; i<*retp; i++) {
1799 preplist[i].txn = preps[i].txn;
1800 memcpy(preplist[i].gid, preps[i].xid.data, preps[i].xid.gtrid_length + preps[i].xid.bqual_length);
1801 }
1802 }
1803 toku_free(preps);
1804 return r;
1805 }
1806
1807 static int
env_get_txn_from_xid(DB_ENV * env,TOKU_XA_XID * xid,DB_TXN ** txnp)1808 env_get_txn_from_xid (DB_ENV *env, /*in*/ TOKU_XA_XID *xid, /*out*/ DB_TXN **txnp) {
1809 return toku_txn_manager_get_root_txn_from_xid(toku_logger_get_txn_manager(env->i->logger), xid, txnp);
1810 }
1811
1812 static int
env_checkpointing_set_period(DB_ENV * env,uint32_t seconds)1813 env_checkpointing_set_period(DB_ENV * env, uint32_t seconds) {
1814 HANDLE_PANICKED_ENV(env);
1815 int r = 0;
1816 if (!env_opened(env)) {
1817 r = EINVAL;
1818 } else {
1819 toku_set_checkpoint_period(env->i->cachetable, seconds);
1820 }
1821 return r;
1822 }
1823
1824 static int
env_cleaner_set_period(DB_ENV * env,uint32_t seconds)1825 env_cleaner_set_period(DB_ENV * env, uint32_t seconds) {
1826 HANDLE_PANICKED_ENV(env);
1827 int r = 0;
1828 if (!env_opened(env)) {
1829 r = EINVAL;
1830 } else {
1831 toku_set_cleaner_period(env->i->cachetable, seconds);
1832 }
1833 return r;
1834 }
1835
1836 static int
env_cleaner_set_iterations(DB_ENV * env,uint32_t iterations)1837 env_cleaner_set_iterations(DB_ENV * env, uint32_t iterations) {
1838 HANDLE_PANICKED_ENV(env);
1839 int r = 0;
1840 if (!env_opened(env)) {
1841 r = EINVAL;
1842 } else {
1843 toku_set_cleaner_iterations(env->i->cachetable, iterations);
1844 }
1845 return r;
1846 }
1847
1848 static int
env_create_loader(DB_ENV * env,DB_TXN * txn,DB_LOADER ** blp,DB * src_db,int N,DB * dbs[],uint32_t db_flags[],uint32_t dbt_flags[],uint32_t loader_flags)1849 env_create_loader(DB_ENV *env,
1850 DB_TXN *txn,
1851 DB_LOADER **blp,
1852 DB *src_db,
1853 int N,
1854 DB *dbs[],
1855 uint32_t db_flags[/*N*/],
1856 uint32_t dbt_flags[/*N*/],
1857 uint32_t loader_flags) {
1858 int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags, true);
1859 return r;
1860 }
1861
1862 static int
env_checkpointing_get_period(DB_ENV * env,uint32_t * seconds)1863 env_checkpointing_get_period(DB_ENV * env, uint32_t *seconds) {
1864 HANDLE_PANICKED_ENV(env);
1865 int r = 0;
1866 if (!env_opened(env)) r = EINVAL;
1867 else
1868 *seconds = toku_get_checkpoint_period_unlocked(env->i->cachetable);
1869 return r;
1870 }
1871
1872 static int
env_cleaner_get_period(DB_ENV * env,uint32_t * seconds)1873 env_cleaner_get_period(DB_ENV * env, uint32_t *seconds) {
1874 HANDLE_PANICKED_ENV(env);
1875 int r = 0;
1876 if (!env_opened(env)) r = EINVAL;
1877 else
1878 *seconds = toku_get_cleaner_period_unlocked(env->i->cachetable);
1879 return r;
1880 }
1881
1882 static int
env_cleaner_get_iterations(DB_ENV * env,uint32_t * iterations)1883 env_cleaner_get_iterations(DB_ENV * env, uint32_t *iterations) {
1884 HANDLE_PANICKED_ENV(env);
1885 int r = 0;
1886 if (!env_opened(env)) r = EINVAL;
1887 else
1888 *iterations = toku_get_cleaner_iterations(env->i->cachetable);
1889 return r;
1890 }
1891
1892 static int
env_evictor_set_enable_partial_eviction(DB_ENV * env,bool enabled)1893 env_evictor_set_enable_partial_eviction(DB_ENV* env, bool enabled) {
1894 HANDLE_PANICKED_ENV(env);
1895 int r = 0;
1896 if (!env_opened(env)) r = EINVAL;
1897 else toku_set_enable_partial_eviction(env->i->cachetable, enabled);
1898 return r;
1899 }
1900
1901 static int
env_evictor_get_enable_partial_eviction(DB_ENV * env,bool * enabled)1902 env_evictor_get_enable_partial_eviction(DB_ENV* env, bool *enabled) {
1903 HANDLE_PANICKED_ENV(env);
1904 int r = 0;
1905 if (!env_opened(env)) r = EINVAL;
1906 else *enabled = toku_get_enable_partial_eviction(env->i->cachetable);
1907 return r;
1908 }
1909
1910 static int
env_checkpointing_postpone(DB_ENV * env)1911 env_checkpointing_postpone(DB_ENV * env) {
1912 HANDLE_PANICKED_ENV(env);
1913 int r = 0;
1914 if (!env_opened(env)) r = EINVAL;
1915 else toku_checkpoint_safe_client_lock();
1916 return r;
1917 }
1918
1919 static int
env_checkpointing_resume(DB_ENV * env)1920 env_checkpointing_resume(DB_ENV * env) {
1921 HANDLE_PANICKED_ENV(env);
1922 int r = 0;
1923 if (!env_opened(env)) r = EINVAL;
1924 else toku_checkpoint_safe_client_unlock();
1925 return r;
1926 }
1927
1928 static int
env_checkpointing_begin_atomic_operation(DB_ENV * env)1929 env_checkpointing_begin_atomic_operation(DB_ENV * env) {
1930 HANDLE_PANICKED_ENV(env);
1931 int r = 0;
1932 if (!env_opened(env)) r = EINVAL;
1933 else toku_multi_operation_client_lock();
1934 return r;
1935 }
1936
1937 static int
env_checkpointing_end_atomic_operation(DB_ENV * env)1938 env_checkpointing_end_atomic_operation(DB_ENV * env) {
1939 HANDLE_PANICKED_ENV(env);
1940 int r = 0;
1941 if (!env_opened(env)) r = EINVAL;
1942 else toku_multi_operation_client_unlock();
1943 return r;
1944 }
1945
1946 static int
env_set_default_bt_compare(DB_ENV * env,int (* bt_compare)(DB *,const DBT *,const DBT *))1947 env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
1948 HANDLE_PANICKED_ENV(env);
1949 int r = 0;
1950 if (env_opened(env)) r = EINVAL;
1951 else {
1952 env->i->bt_compare = bt_compare;
1953 }
1954 return r;
1955 }
1956
1957 static void
env_set_update(DB_ENV * env,int (* update_function)(DB *,const DBT * key,const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra))1958 env_set_update (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra)) {
1959 env->i->update_function = update_function;
1960 }
1961
1962 static int
env_set_generate_row_callback_for_put(DB_ENV * env,generate_row_for_put_func generate_row_for_put)1963 env_set_generate_row_callback_for_put(DB_ENV *env, generate_row_for_put_func generate_row_for_put) {
1964 HANDLE_PANICKED_ENV(env);
1965 int r = 0;
1966 if (env_opened(env)) r = EINVAL;
1967 else {
1968 env->i->generate_row_for_put = generate_row_for_put;
1969 }
1970 return r;
1971 }
1972
1973 static int
env_set_generate_row_callback_for_del(DB_ENV * env,generate_row_for_del_func generate_row_for_del)1974 env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_func generate_row_for_del) {
1975 HANDLE_PANICKED_ENV(env);
1976 int r = 0;
1977 if (env_opened(env)) r = EINVAL;
1978 else {
1979 env->i->generate_row_for_del = generate_row_for_del;
1980 }
1981 return r;
1982 }
1983 static int
env_set_redzone(DB_ENV * env,int redzone)1984 env_set_redzone(DB_ENV *env, int redzone) {
1985 HANDLE_PANICKED_ENV(env);
1986 int r;
1987 if (env_opened(env))
1988 r = EINVAL;
1989 else {
1990 env->i->redzone = redzone;
1991 r = 0;
1992 }
1993 return r;
1994 }
1995
env_get_lock_timeout(DB_ENV * env,uint64_t * lock_timeout_msec)1996 static int env_get_lock_timeout(DB_ENV *env, uint64_t *lock_timeout_msec) {
1997 uint64_t t = env->i->default_lock_timeout_msec;
1998 if (env->i->get_lock_timeout_callback)
1999 t = env->i->get_lock_timeout_callback(t);
2000 *lock_timeout_msec = t;
2001 return 0;
2002 }
2003
env_set_lock_timeout(DB_ENV * env,uint64_t default_lock_timeout_msec,uint64_t (* get_lock_timeout_callback)(uint64_t default_lock_timeout_msec))2004 static int env_set_lock_timeout(DB_ENV *env, uint64_t default_lock_timeout_msec, uint64_t (*get_lock_timeout_callback)(uint64_t default_lock_timeout_msec)) {
2005 env->i->default_lock_timeout_msec = default_lock_timeout_msec;
2006 env->i->get_lock_timeout_callback = get_lock_timeout_callback;
2007 return 0;
2008 }
2009
2010 static int
env_set_lock_timeout_callback(DB_ENV * env,lock_timeout_callback callback)2011 env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
2012 env->i->lock_wait_timeout_callback = callback;
2013 return 0;
2014 }
2015
2016 static int
env_set_lock_wait_callback(DB_ENV * env,lock_wait_callback callback)2017 env_set_lock_wait_callback(DB_ENV *env, lock_wait_callback callback) {
2018 env->i->lock_wait_needed_callback = callback;
2019 return 0;
2020 }
2021
// Format *timer as text with ctime_r() and strip the trailing line ending.
//  timer: time to format.
//  buf:   output buffer; must hold at least 26 bytes, as ctime_r() requires.
// When ctime_r() cannot format the time, buf is set to the empty string.
static void
format_time(const time_t *timer, char *buf) {
    if (!ctime_r(timer, buf)) {
        // ctime_r() left buf unspecified; give callers a valid string.
        buf[0] = '\0';
        return;
    }
    size_t len = strlen(buf);
    assert(len < 26);
    // Trim trailing '\n' / '\r'; the len > 0 test keeps the index in range
    // even for a string made up only of line-ending characters.
    while (len > 0 && (buf[len - 1] == '\n' || buf[len - 1] == '\r')) {
        buf[--len] = '\0';
    }
}
2038
2039 ////////////////////////////////////////////////////////////////////////////////////////////////
2040 // Local definition of status information from portability layer, which should not include db.h.
2041 // Local status structs are used to concentrate file system information collected from various places
2042 // and memory information collected from memory.c.
2043 //
// Row indices of the file-system status table (fsstat.status[]).
typedef enum {
    FS_ENOSPC_REDZONE_STATE = 0,  // one of the values enumerated by fs_redzone_state
    FS_ENOSPC_THREADS_BLOCKED,    // number of threads currently blocked on ENOSPC
    FS_ENOSPC_REDZONE_CTR,        // operations rejected by enospc prevention (red zone)
    FS_ENOSPC_MOST_RECENT,        // most recent time the file system was completely full
    FS_ENOSPC_COUNT,              // total times ENOSPC was returned from an attempt to write
    FS_FSYNC_TIME,
    FS_FSYNC_COUNT,
    FS_LONG_FSYNC_TIME,
    FS_LONG_FSYNC_COUNT,
    FS_STATUS_NUM_ROWS,           // number of rows; must stay the last enumerator
} fs_status_entry;
2056
2057 typedef struct {
2058 bool initialized;
2059 TOKU_ENGINE_STATUS_ROW_S status[FS_STATUS_NUM_ROWS];
2060 } FS_STATUS_S, *FS_STATUS;
2061
2062 static FS_STATUS_S fsstat;
2063
2064 #define FS_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(fsstat, k, c, t, "filesystem: " l, inc)
2065
2066 static void
fs_status_init(void)2067 fs_status_init(void) {
2068 FS_STATUS_INIT(FS_ENOSPC_REDZONE_STATE, nullptr, FS_STATE, "ENOSPC redzone state", TOKU_ENGINE_STATUS);
2069 FS_STATUS_INIT(FS_ENOSPC_THREADS_BLOCKED, FILESYSTEM_THREADS_BLOCKED_BY_FULL_DISK, UINT64, "threads currently blocked by full disk", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2070 FS_STATUS_INIT(FS_ENOSPC_REDZONE_CTR, nullptr, UINT64, "number of operations rejected by enospc prevention (red zone)", TOKU_ENGINE_STATUS);
2071 FS_STATUS_INIT(FS_ENOSPC_MOST_RECENT, nullptr, UNIXTIME, "most recent disk full", TOKU_ENGINE_STATUS);
2072 FS_STATUS_INIT(FS_ENOSPC_COUNT, nullptr, UINT64, "number of write operations that returned ENOSPC", TOKU_ENGINE_STATUS);
2073 FS_STATUS_INIT(FS_FSYNC_TIME, FILESYSTEM_FSYNC_TIME, UINT64, "fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2074 FS_STATUS_INIT(FS_FSYNC_COUNT, FILESYSTEM_FSYNC_NUM, UINT64, "fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2075 FS_STATUS_INIT(FS_LONG_FSYNC_TIME, FILESYSTEM_LONG_FSYNC_TIME, UINT64, "long fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2076 FS_STATUS_INIT(FS_LONG_FSYNC_COUNT, FILESYSTEM_LONG_FSYNC_NUM, UINT64, "long fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
2077 fsstat.initialized = true;
2078 }
2079 #undef FS_STATUS_INIT
2080
2081 #define FS_STATUS_VALUE(x) fsstat.status[x].value.num
2082
2083 static void
fs_get_status(DB_ENV * env,fs_redzone_state * redzone_state)2084 fs_get_status(DB_ENV * env, fs_redzone_state * redzone_state) {
2085 if (!fsstat.initialized)
2086 fs_status_init();
2087
2088 time_t enospc_most_recent_timestamp;
2089 uint64_t enospc_threads_blocked, enospc_total;
2090 toku_fs_get_write_info(&enospc_most_recent_timestamp, &enospc_threads_blocked, &enospc_total);
2091 if (enospc_threads_blocked)
2092 FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = FS_BLOCKED;
2093 else
2094 FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = env->i->fs_state;
2095 *redzone_state = (fs_redzone_state) FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE);
2096 FS_STATUS_VALUE(FS_ENOSPC_THREADS_BLOCKED) = enospc_threads_blocked;
2097 FS_STATUS_VALUE(FS_ENOSPC_REDZONE_CTR) = env->i->enospc_redzone_ctr;
2098 FS_STATUS_VALUE(FS_ENOSPC_MOST_RECENT) = enospc_most_recent_timestamp;
2099 FS_STATUS_VALUE(FS_ENOSPC_COUNT) = enospc_total;
2100
2101 uint64_t fsync_count, fsync_time, long_fsync_threshold, long_fsync_count, long_fsync_time;
2102 toku_get_fsync_times(&fsync_count, &fsync_time, &long_fsync_threshold, &long_fsync_count, &long_fsync_time);
2103 FS_STATUS_VALUE(FS_FSYNC_COUNT) = fsync_count;
2104 FS_STATUS_VALUE(FS_FSYNC_TIME) = fsync_time;
2105 FS_STATUS_VALUE(FS_LONG_FSYNC_COUNT) = long_fsync_count;
2106 FS_STATUS_VALUE(FS_LONG_FSYNC_TIME) = long_fsync_time;
2107 }
2108 #undef FS_STATUS_VALUE
2109
2110 // Local status struct used to get information from memory.c
// Row indices of the memory status table (memory_status.status[]).
typedef enum {
    MEMORY_MALLOC_COUNT = 0,
    MEMORY_FREE_COUNT,
    MEMORY_REALLOC_COUNT,
    MEMORY_MALLOC_FAIL,
    MEMORY_REALLOC_FAIL,
    MEMORY_REQUESTED,
    MEMORY_USED,
    MEMORY_FREED,
    MEMORY_MAX_REQUESTED_SIZE,
    MEMORY_LAST_FAILED_SIZE,
    MEMORY_MAX_IN_USE,
    MEMORY_MALLOCATOR_VERSION,
    MEMORY_MMAP_THRESHOLD,
    MEMORY_STATUS_NUM_ROWS  // number of rows; keep as the last enumerator
} memory_status_entry;
2127
2128 typedef struct {
2129 bool initialized;
2130 TOKU_ENGINE_STATUS_ROW_S status[MEMORY_STATUS_NUM_ROWS];
2131 } MEMORY_STATUS_S, *MEMORY_STATUS;
2132
2133 static MEMORY_STATUS_S memory_status;
2134
2135 #define STATUS_INIT(k,c,t,l) TOKUFT_STATUS_INIT(memory_status, k, c, t, "memory: " l, TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS)
2136
2137 static void
memory_status_init(void)2138 memory_status_init(void) {
2139 // Note, this function initializes the keyname, type, and legend fields.
2140 // Value fields are initialized to zero by compiler.
2141 STATUS_INIT(MEMORY_MALLOC_COUNT, MEMORY_MALLOC_COUNT, UINT64, "number of malloc operations");
2142 STATUS_INIT(MEMORY_FREE_COUNT, MEMORY_FREE_COUNT, UINT64, "number of free operations");
2143 STATUS_INIT(MEMORY_REALLOC_COUNT, MEMORY_REALLOC_COUNT, UINT64, "number of realloc operations");
2144 STATUS_INIT(MEMORY_MALLOC_FAIL, MEMORY_MALLOC_FAIL, UINT64, "number of malloc operations that failed");
2145 STATUS_INIT(MEMORY_REALLOC_FAIL, MEMORY_REALLOC_FAIL, UINT64, "number of realloc operations that failed" );
2146 STATUS_INIT(MEMORY_REQUESTED, MEMORY_REQUESTED, UINT64, "number of bytes requested");
2147 STATUS_INIT(MEMORY_USED, MEMORY_USED, UINT64, "number of bytes used (requested + overhead)");
2148 STATUS_INIT(MEMORY_FREED, MEMORY_FREED, UINT64, "number of bytes freed");
2149 STATUS_INIT(MEMORY_MAX_REQUESTED_SIZE, MEMORY_MAX_REQUESTED_SIZE, UINT64, "largest attempted allocation size");
2150 STATUS_INIT(MEMORY_LAST_FAILED_SIZE, MEMORY_LAST_FAILED_SIZE, UINT64, "size of the last failed allocation attempt");
2151 STATUS_INIT(MEMORY_MAX_IN_USE, MEM_ESTIMATED_MAXIMUM_MEMORY_FOOTPRINT, UINT64, "estimated maximum memory footprint");
2152 STATUS_INIT(MEMORY_MALLOCATOR_VERSION, MEMORY_MALLOCATOR_VERSION, CHARSTR, "mallocator version");
2153 STATUS_INIT(MEMORY_MMAP_THRESHOLD, MEMORY_MMAP_THRESHOLD, UINT64, "mmap threshold");
2154 memory_status.initialized = true;
2155 }
2156 #undef STATUS_INIT
2157
2158 #define MEMORY_STATUS_VALUE(x) memory_status.status[x].value.num
2159
2160 static void
memory_get_status(void)2161 memory_get_status(void) {
2162 if (!memory_status.initialized)
2163 memory_status_init();
2164 LOCAL_MEMORY_STATUS_S local_memstat;
2165 toku_memory_get_status(&local_memstat);
2166 MEMORY_STATUS_VALUE(MEMORY_MALLOC_COUNT) = local_memstat.malloc_count;
2167 MEMORY_STATUS_VALUE(MEMORY_FREE_COUNT) = local_memstat.free_count;
2168 MEMORY_STATUS_VALUE(MEMORY_REALLOC_COUNT) = local_memstat.realloc_count;
2169 MEMORY_STATUS_VALUE(MEMORY_MALLOC_FAIL) = local_memstat.malloc_fail;
2170 MEMORY_STATUS_VALUE(MEMORY_REALLOC_FAIL) = local_memstat.realloc_fail;
2171 MEMORY_STATUS_VALUE(MEMORY_REQUESTED) = local_memstat.requested;
2172 MEMORY_STATUS_VALUE(MEMORY_USED) = local_memstat.used;
2173 MEMORY_STATUS_VALUE(MEMORY_FREED) = local_memstat.freed;
2174 MEMORY_STATUS_VALUE(MEMORY_MAX_IN_USE) = local_memstat.max_in_use;
2175 MEMORY_STATUS_VALUE(MEMORY_MMAP_THRESHOLD) = local_memstat.mmap_threshold;
2176 memory_status.status[MEMORY_MALLOCATOR_VERSION].value.str = local_memstat.mallocator_version;
2177 }
2178 #undef MEMORY_STATUS_VALUE
2179
2180 // how many rows are in engine status?
2181 static int
env_get_engine_status_num_rows(DB_ENV * UU (env),uint64_t * num_rowsp)2182 env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp) {
2183 uint64_t num_rows = 0;
2184 num_rows += YDB_LAYER_STATUS_NUM_ROWS;
2185 num_rows += YDB_C_LAYER_STATUS_NUM_ROWS;
2186 num_rows += YDB_WRITE_LAYER_STATUS_NUM_ROWS;
2187 num_rows += LE_STATUS_S::LE_STATUS_NUM_ROWS;
2188 num_rows += CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS;
2189 num_rows += CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS;
2190 num_rows += LTM_STATUS_S::LTM_STATUS_NUM_ROWS;
2191 num_rows += FT_STATUS_S::FT_STATUS_NUM_ROWS;
2192 num_rows += FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS;
2193 num_rows += FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS;
2194 num_rows += TXN_STATUS_S::TXN_STATUS_NUM_ROWS;
2195 num_rows += LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS;
2196 num_rows += MEMORY_STATUS_NUM_ROWS;
2197 num_rows += FS_STATUS_NUM_ROWS;
2198 num_rows += INDEXER_STATUS_NUM_ROWS;
2199 num_rows += LOADER_STATUS_NUM_ROWS;
2200 num_rows += CTX_STATUS_NUM_ROWS;
2201 #if 0
2202 // enable when upgrade is supported
2203 num_rows += FT_UPGRADE_STATUS_NUM_ROWS;
2204 num_rows += PERSISTENT_UPGRADE_STATUS_NUM_ROWS;
2205 #endif
2206 *num_rowsp = num_rows;
2207 return 0;
2208 }
2209
2210 // Do not take ydb lock or any other lock around or in this function.
2211 // If the engine is blocked because some thread is holding a lock, this function
2212 // can help diagnose the problem.
2213 // This function only collects information, and it does not matter if something gets garbled
2214 // because of a race condition.
2215 // Note, engine status is still collected even if the environment or logger is panicked
2216 static int
env_get_engine_status(DB_ENV * env,TOKU_ENGINE_STATUS_ROW engstat,uint64_t maxrows,uint64_t * num_rows,fs_redzone_state * redzone_state,uint64_t * env_panicp,char * env_panic_string_buf,int env_panic_string_length,toku_engine_status_include_type include_flags)2217 env_get_engine_status (DB_ENV * env, TOKU_ENGINE_STATUS_ROW engstat, uint64_t maxrows, uint64_t *num_rows, fs_redzone_state* redzone_state, uint64_t * env_panicp, char * env_panic_string_buf, int env_panic_string_length, toku_engine_status_include_type include_flags) {
2218 int r;
2219
2220 if (env_panic_string_buf) {
2221 if (env && env->i && env->i->is_panicked && env->i->panic_string) {
2222 strncpy(env_panic_string_buf, env->i->panic_string, env_panic_string_length);
2223 env_panic_string_buf[env_panic_string_length - 1] = '\0'; // just in case
2224 }
2225 else
2226 *env_panic_string_buf = '\0';
2227 }
2228
2229 if ( !(env) ||
2230 !(env->i) ||
2231 !(env_opened(env)) ||
2232 !num_rows ||
2233 !include_flags)
2234 r = EINVAL;
2235 else {
2236 r = 0;
2237 uint64_t row = 0; // which row to fill next
2238 *env_panicp = env->i->is_panicked;
2239
2240 {
2241 YDB_LAYER_STATUS_S ydb_stat;
2242 ydb_layer_get_status(env, &ydb_stat);
2243 for (int i = 0; i < YDB_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2244 if (ydb_stat.status[i].include & include_flags) {
2245 engstat[row++] = ydb_stat.status[i];
2246 }
2247 }
2248 }
2249 {
2250 YDB_C_LAYER_STATUS_S ydb_c_stat;
2251 ydb_c_layer_get_status(&ydb_c_stat);
2252 for (int i = 0; i < YDB_C_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2253 if (ydb_c_stat.status[i].include & include_flags) {
2254 engstat[row++] = ydb_c_stat.status[i];
2255 }
2256 }
2257 }
2258 {
2259 YDB_WRITE_LAYER_STATUS_S ydb_write_stat;
2260 ydb_write_layer_get_status(&ydb_write_stat);
2261 for (int i = 0; i < YDB_WRITE_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2262 if (ydb_write_stat.status[i].include & include_flags) {
2263 engstat[row++] = ydb_write_stat.status[i];
2264 }
2265 }
2266 }
2267 {
2268 LE_STATUS_S lestat; // Rice's vampire
2269 toku_le_get_status(&lestat);
2270 for (int i = 0; i < LE_STATUS_S::LE_STATUS_NUM_ROWS && row < maxrows; i++) {
2271 if (lestat.status[i].include & include_flags) {
2272 engstat[row++] = lestat.status[i];
2273 }
2274 }
2275 }
2276 {
2277 CHECKPOINT_STATUS_S cpstat;
2278 toku_checkpoint_get_status(env->i->cachetable, &cpstat);
2279 for (int i = 0; i < CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS && row < maxrows; i++) {
2280 if (cpstat.status[i].include & include_flags) {
2281 engstat[row++] = cpstat.status[i];
2282 }
2283 }
2284 }
2285 {
2286 CACHETABLE_STATUS_S ctstat;
2287 toku_cachetable_get_status(env->i->cachetable, &ctstat);
2288 for (int i = 0; i < CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS && row < maxrows; i++) {
2289 if (ctstat.status[i].include & include_flags) {
2290 engstat[row++] = ctstat.status[i];
2291 }
2292 }
2293 }
2294 {
2295 LTM_STATUS_S ltmstat;
2296 env->i->ltm.get_status(<mstat);
2297 for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS && row < maxrows; i++) {
2298 if (ltmstat.status[i].include & include_flags) {
2299 engstat[row++] = ltmstat.status[i];
2300 }
2301 }
2302 }
2303 {
2304 FT_STATUS_S ftstat;
2305 toku_ft_get_status(&ftstat);
2306 for (int i = 0; i < FT_STATUS_S::FT_STATUS_NUM_ROWS && row < maxrows; i++) {
2307 if (ftstat.status[i].include & include_flags) {
2308 engstat[row++] = ftstat.status[i];
2309 }
2310 }
2311 }
2312 {
2313 FT_FLUSHER_STATUS_S flusherstat;
2314 toku_ft_flusher_get_status(&flusherstat);
2315 for (int i = 0; i < FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS && row < maxrows; i++) {
2316 if (flusherstat.status[i].include & include_flags) {
2317 engstat[row++] = flusherstat.status[i];
2318 }
2319 }
2320 }
2321 {
2322 FT_HOT_STATUS_S hotstat;
2323 toku_ft_hot_get_status(&hotstat);
2324 for (int i = 0; i < FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS && row < maxrows; i++) {
2325 if (hotstat.status[i].include & include_flags) {
2326 engstat[row++] = hotstat.status[i];
2327 }
2328 }
2329 }
2330 {
2331 TXN_STATUS_S txnstat;
2332 toku_txn_get_status(&txnstat);
2333 for (int i = 0; i < TXN_STATUS_S::TXN_STATUS_NUM_ROWS && row < maxrows; i++) {
2334 if (txnstat.status[i].include & include_flags) {
2335 engstat[row++] = txnstat.status[i];
2336 }
2337 }
2338 }
2339 {
2340 LOGGER_STATUS_S loggerstat;
2341 toku_logger_get_status(env->i->logger, &loggerstat);
2342 for (int i = 0; i < LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS && row < maxrows; i++) {
2343 if (loggerstat.status[i].include & include_flags) {
2344 engstat[row++] = loggerstat.status[i];
2345 }
2346 }
2347 }
2348
2349 {
2350 INDEXER_STATUS_S indexerstat;
2351 toku_indexer_get_status(&indexerstat);
2352 for (int i = 0; i < INDEXER_STATUS_NUM_ROWS && row < maxrows; i++) {
2353 if (indexerstat.status[i].include & include_flags) {
2354 engstat[row++] = indexerstat.status[i];
2355 }
2356 }
2357 }
2358 {
2359 LOADER_STATUS_S loaderstat;
2360 toku_loader_get_status(&loaderstat);
2361 for (int i = 0; i < LOADER_STATUS_NUM_ROWS && row < maxrows; i++) {
2362 if (loaderstat.status[i].include & include_flags) {
2363 engstat[row++] = loaderstat.status[i];
2364 }
2365 }
2366 }
2367
2368 {
2369 // memory_status is local to this file
2370 memory_get_status();
2371 for (int i = 0; i < MEMORY_STATUS_NUM_ROWS && row < maxrows; i++) {
2372 if (memory_status.status[i].include & include_flags) {
2373 engstat[row++] = memory_status.status[i];
2374 }
2375 }
2376 }
2377 {
2378 // Note, fs_get_status() and the fsstat structure are local to this file because they
2379 // are used to concentrate file system information collected from various places.
2380 fs_get_status(env, redzone_state);
2381 for (int i = 0; i < FS_STATUS_NUM_ROWS && row < maxrows; i++) {
2382 if (fsstat.status[i].include & include_flags) {
2383 engstat[row++] = fsstat.status[i];
2384 }
2385 }
2386 }
2387 {
2388 struct context_status ctxstatus;
2389 toku_context_get_status(&ctxstatus);
2390 for (int i = 0; i < CTX_STATUS_NUM_ROWS && row < maxrows; i++) {
2391 if (ctxstatus.status[i].include & include_flags) {
2392 engstat[row++] = ctxstatus.status[i];
2393 }
2394 }
2395 }
2396 #if 0
2397 // enable when upgrade is supported
2398 {
2399 for (int i = 0; i < PERSISTENT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2400 if (persistent_upgrade_status.status[i].include & include_flags) {
2401 engstat[row++] = persistent_upgrade_status.status[i];
2402 }
2403 }
2404 FT_UPGRADE_STATUS_S ft_upgradestat;
2405 toku_ft_upgrade_get_status(&ft_upgradestat);
2406 for (int i = 0; i < FT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2407 if (ft_upgradestat.status[i].include & include_flags) {
2408 engstat[row++] = ft_upgradestat.status[i];
2409 }
2410 }
2411
2412 }
2413 #endif
2414 if (r==0) {
2415 *num_rows = row;
2416 }
2417 }
2418 return r;
2419 }
2420
2421 // Fill buff with text description of engine status up to bufsiz bytes.
2422 // Intended for use by test programs that do not have the handlerton available,
2423 // and for use by toku_assert logic to print diagnostic info on crash.
2424 static int
env_get_engine_status_text(DB_ENV * env,char * buff,int bufsiz)2425 env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) {
2426 uint32_t stringsize = 1024;
2427 uint64_t panic;
2428 char panicstring[stringsize];
2429 int n = 0; // number of characters printed so far
2430 uint64_t num_rows;
2431 uint64_t max_rows;
2432 fs_redzone_state redzone_state;
2433
2434 n = snprintf(buff, bufsiz - n, "BUILD_ID = %d\n", BUILD_ID);
2435
2436 (void) env_get_engine_status_num_rows (env, &max_rows);
2437 TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2438 int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2439
2440 if (r) {
2441 n += snprintf(buff + n, bufsiz - n, "Engine status not available: ");
2442 if (!env) {
2443 n += snprintf(buff + n, bufsiz - n, "no environment\n");
2444 }
2445 else if (!(env->i)) {
2446 n += snprintf(buff + n, bufsiz - n, "environment internal struct is null\n");
2447 }
2448 else if (!env_opened(env)) {
2449 n += snprintf(buff + n, bufsiz - n, "environment is not open\n");
2450 }
2451 }
2452 else {
2453 if (panic) {
2454 n += snprintf(buff + n, bufsiz - n, "Env panic code: %" PRIu64 "\n", panic);
2455 if (strlen(panicstring)) {
2456 invariant(strlen(panicstring) <= stringsize);
2457 n += snprintf(buff + n, bufsiz - n, "Env panic string: %s\n", panicstring);
2458 }
2459 }
2460
2461 for (uint64_t row = 0; row < num_rows; row++) {
2462 n += snprintf(buff + n, bufsiz - n, "%s: ", mystat[row].legend);
2463 switch (mystat[row].type) {
2464 case FS_STATE:
2465 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2466 break;
2467 case UINT64:
2468 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2469 break;
2470 case CHARSTR:
2471 n += snprintf(buff + n, bufsiz - n, "%s\n", mystat[row].value.str);
2472 break;
2473 case UNIXTIME:
2474 {
2475 char tbuf[26];
2476 format_time((time_t*)&mystat[row].value.num, tbuf);
2477 n += snprintf(buff + n, bufsiz - n, "%s\n", tbuf);
2478 }
2479 break;
2480 case TOKUTIME:
2481 {
2482 double t = tokutime_to_seconds(mystat[row].value.num);
2483 n += snprintf(buff + n, bufsiz - n, "%.6f\n", t);
2484 }
2485 break;
2486 case PARCOUNT:
2487 {
2488 uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2489 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", v);
2490 }
2491 break;
2492 default:
2493 n += snprintf(buff + n, bufsiz - n, "UNKNOWN STATUS TYPE: %d\n", mystat[row].type);
2494 break;
2495 }
2496 }
2497 }
2498
2499 if (n > bufsiz) {
2500 const char * errmsg = "BUFFER TOO SMALL\n";
2501 int len = strlen(errmsg) + 1;
2502 (void) snprintf(buff + (bufsiz - 1) - len, len, "%s", errmsg);
2503 }
2504
2505 return r;
2506 }
2507
2508 // prints engine status using toku_env_err line-by-line
2509 static int
env_err_engine_status(DB_ENV * env)2510 env_err_engine_status(DB_ENV * env) {
2511 uint32_t stringsize = 1024;
2512 uint64_t panic;
2513 char panicstring[stringsize];
2514 uint64_t num_rows;
2515 uint64_t max_rows;
2516 fs_redzone_state redzone_state;
2517
2518 toku_env_err(env, 0, "BUILD_ID = %d", BUILD_ID);
2519
2520 (void) env_get_engine_status_num_rows (env, &max_rows);
2521 TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2522 int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2523
2524 if (r) {
2525 toku_env_err(env, 0, "Engine status not available: ");
2526 if (!env) {
2527 toku_env_err(env, 0, "no environment");
2528 }
2529 else if (!(env->i)) {
2530 toku_env_err(env, 0, "environment internal struct is null");
2531 }
2532 else if (!env_opened(env)) {
2533 toku_env_err(env, 0, "environment is not open");
2534 }
2535 }
2536 else {
2537 if (panic) {
2538 toku_env_err(env, 0, "Env panic code: %" PRIu64, panic);
2539 if (strlen(panicstring)) {
2540 invariant(strlen(panicstring) <= stringsize);
2541 toku_env_err(env, 0, "Env panic string: %s", panicstring);
2542 }
2543 }
2544
2545 for (uint64_t row = 0; row < num_rows; row++) {
2546 switch (mystat[row].type) {
2547 case FS_STATE:
2548 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2549 break;
2550 case UINT64:
2551 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2552 break;
2553 case CHARSTR:
2554 toku_env_err(env, 0, "%s: %s", mystat[row].legend, mystat[row].value.str);
2555 break;
2556 case UNIXTIME:
2557 {
2558 char tbuf[26];
2559 format_time((time_t*)&mystat[row].value.num, tbuf);
2560 toku_env_err(env, 0, "%s: %s", mystat[row].legend, tbuf);
2561 }
2562 break;
2563 case TOKUTIME:
2564 {
2565 double t = tokutime_to_seconds(mystat[row].value.num);
2566 toku_env_err(env, 0, "%s: %.6f", mystat[row].legend, t);
2567 }
2568 break;
2569 case PARCOUNT:
2570 {
2571 uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2572 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, v);
2573 }
2574 break;
2575 default:
2576 toku_env_err(env, 0, "%s: UNKNOWN STATUS TYPE: %d", mystat[row].legend, mystat[row].type);
2577 break;
2578 }
2579 }
2580 }
2581
2582 return r;
2583 }
2584
2585 // intended for use by toku_assert logic, when env is not known
2586 static int
toku_maybe_get_engine_status_text(char * buff,int buffsize)2587 toku_maybe_get_engine_status_text (char * buff, int buffsize) {
2588 DB_ENV * env = most_recent_env;
2589 int r;
2590 if (engine_status_enable && env != NULL) {
2591 r = env_get_engine_status_text(env, buff, buffsize);
2592 }
2593 else {
2594 r = EOPNOTSUPP;
2595 snprintf(buff, buffsize, "Engine status not available: disabled by user. This should only happen in test programs.\n");
2596 }
2597 return r;
2598 }
2599
2600 static int
toku_maybe_err_engine_status(void)2601 toku_maybe_err_engine_status (void) {
2602 DB_ENV * env = most_recent_env;
2603 int r;
2604 if (engine_status_enable && env != NULL) {
2605 r = env_err_engine_status(env);
2606 }
2607 else {
2608 r = EOPNOTSUPP;
2609 }
2610 return r;
2611 }
2612
2613 // Set panic code and panic string if not already panicked,
2614 // intended for use by toku_assert when about to abort().
2615 static void
toku_maybe_set_env_panic(int code,const char * msg)2616 toku_maybe_set_env_panic(int code, const char * msg) {
2617 if (code == 0)
2618 code = -1;
2619 if (msg == NULL)
2620 msg = "Unknown cause from abort (failed assert)\n";
2621 env_is_panicked = code; // disable library destructor no matter what
2622 DB_ENV * env = most_recent_env;
2623 if (env &&
2624 env->i &&
2625 (env->i->is_panicked == 0)) {
2626 env_panic(env, code, msg);
2627 }
2628 }
2629
2630 // handlerton's call to fractal tree layer on failed assert in handlerton
2631 static int
env_crash(DB_ENV * UU (db_env),const char * msg,const char * fun,const char * file,int line,int caller_errno)2632 env_crash(DB_ENV * UU(db_env), const char* msg, const char * fun, const char* file, int line, int caller_errno) {
2633 toku_do_assert_fail(msg, fun, file, line, caller_errno);
2634 return -1; // placate compiler
2635 }
2636
2637 static int
env_get_cursor_for_persistent_environment(DB_ENV * env,DB_TXN * txn,DBC ** c)2638 env_get_cursor_for_persistent_environment(DB_ENV* env, DB_TXN* txn, DBC** c) {
2639 if (!env_opened(env)) {
2640 return EINVAL;
2641 }
2642 return toku_db_cursor(env->i->persistent_environment, txn, c, 0);
2643 }
2644
2645 static int
env_get_cursor_for_directory(DB_ENV * env,DB_TXN * txn,DBC ** c)2646 env_get_cursor_for_directory(DB_ENV* env, DB_TXN* txn, DBC** c) {
2647 if (!env_opened(env)) {
2648 return EINVAL;
2649 }
2650 return toku_db_cursor(env->i->directory, txn, c, 0);
2651 }
2652
2653 static DB *
env_get_db_for_directory(DB_ENV * env)2654 env_get_db_for_directory(DB_ENV* env) {
2655 if (!env_opened(env)) {
2656 return NULL;
2657 }
2658 return env->i->directory;
2659 }
2660
2661 struct ltm_iterate_requests_callback_extra {
ltm_iterate_requests_callback_extraltm_iterate_requests_callback_extra2662 ltm_iterate_requests_callback_extra(DB_ENV *e,
2663 iterate_requests_callback cb,
2664 void *ex) :
2665 env(e), callback(cb), extra(ex) {
2666 }
2667 DB_ENV *env;
2668 iterate_requests_callback callback;
2669 void *extra;
2670 };
2671
2672 static int
find_db_by_dict_id(DB * const & db,const DICTIONARY_ID & dict_id_find)2673 find_db_by_dict_id(DB *const &db, const DICTIONARY_ID &dict_id_find) {
2674 DICTIONARY_ID dict_id = db->i->dict_id;
2675 if (dict_id.dictid < dict_id_find.dictid) {
2676 return -1;
2677 } else if (dict_id.dictid > dict_id_find.dictid) {
2678 return 1;
2679 } else {
2680 return 0;
2681 }
2682 }
2683
2684 static DB *
locked_get_db_by_dict_id(DB_ENV * env,DICTIONARY_ID dict_id)2685 locked_get_db_by_dict_id(DB_ENV *env, DICTIONARY_ID dict_id) {
2686 DB *db;
2687 int r = env->i->open_dbs_by_dict_id->find_zero<DICTIONARY_ID, find_db_by_dict_id>(dict_id, &db, nullptr);
2688 return r == 0 ? db : nullptr;
2689 }
2690
ltm_iterate_requests_callback(DICTIONARY_ID dict_id,TXNID txnid,const DBT * left_key,const DBT * right_key,TXNID blocking_txnid,uint64_t start_time,void * extra)2691 static int ltm_iterate_requests_callback(DICTIONARY_ID dict_id, TXNID txnid,
2692 const DBT *left_key,
2693 const DBT *right_key,
2694 TXNID blocking_txnid,
2695 uint64_t start_time,
2696 void *extra) {
2697 ltm_iterate_requests_callback_extra *info =
2698 reinterpret_cast<ltm_iterate_requests_callback_extra *>(extra);
2699
2700 toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2701 int r = 0;
2702 DB *db = locked_get_db_by_dict_id(info->env, dict_id);
2703 if (db != nullptr) {
2704 r = info->callback(db, txnid, left_key, right_key,
2705 blocking_txnid, start_time, info->extra);
2706 }
2707 toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2708 return r;
2709 }
2710
2711 static int
env_iterate_pending_lock_requests(DB_ENV * env,iterate_requests_callback callback,void * extra)2712 env_iterate_pending_lock_requests(DB_ENV *env,
2713 iterate_requests_callback callback,
2714 void *extra) {
2715 if (!env_opened(env)) {
2716 return EINVAL;
2717 }
2718
2719 toku::locktree_manager *mgr = &env->i->ltm;
2720 ltm_iterate_requests_callback_extra e(env, callback, extra);
2721 return mgr->iterate_pending_lock_requests(ltm_iterate_requests_callback, &e);
2722 }
2723
2724 // for the lifetime of this object:
2725 // - open_dbs_rwlock must be read locked (or better)
2726 // - txn_mutex must be held
2727 struct iter_txn_row_locks_callback_extra {
iter_txn_row_locks_callback_extraiter_txn_row_locks_callback_extra2728 iter_txn_row_locks_callback_extra(DB_ENV *e, toku::omt<txn_lt_key_ranges> *m) :
2729 env(e), current_db(nullptr), which_lt(0), lt_map(m) {
2730 if (lt_map->size() > 0) {
2731 set_iterator_and_current_db();
2732 }
2733 }
2734
set_iterator_and_current_dbiter_txn_row_locks_callback_extra2735 void set_iterator_and_current_db() {
2736 txn_lt_key_ranges ranges;
2737 const int r = lt_map->fetch(which_lt, &ranges);
2738 invariant_zero(r);
2739 current_db = locked_get_db_by_dict_id(env, ranges.lt->get_dict_id());
2740 iter = toku::range_buffer::iterator(ranges.buffer);
2741 }
2742
2743 DB_ENV *env;
2744 DB *current_db;
2745 size_t which_lt;
2746 toku::omt<txn_lt_key_ranges> *lt_map;
2747 toku::range_buffer::iterator iter;
2748 toku::range_buffer::iterator::record rec;
2749 };
2750
iter_txn_row_locks_callback(DB ** db,DBT * left_key,DBT * right_key,void * extra)2751 static int iter_txn_row_locks_callback(DB **db, DBT *left_key, DBT *right_key, void *extra) {
2752 iter_txn_row_locks_callback_extra *info =
2753 reinterpret_cast<iter_txn_row_locks_callback_extra *>(extra);
2754
2755 while (info->which_lt < info->lt_map->size()) {
2756 const bool more = info->iter.current(&info->rec);
2757 if (more) {
2758 *db = info->current_db;
2759 // The caller should interpret data/size == 0 to mean infinity.
2760 // Therefore, when we copyref pos/neg infinity into left/right_key,
2761 // the caller knows what we're talking about.
2762 toku_copyref_dbt(left_key, *info->rec.get_left_key());
2763 toku_copyref_dbt(right_key, *info->rec.get_right_key());
2764 info->iter.next();
2765 return 0;
2766 } else {
2767 info->which_lt++;
2768 if (info->which_lt < info->lt_map->size()) {
2769 info->set_iterator_and_current_db();
2770 }
2771 }
2772 }
2773 return DB_NOTFOUND;
2774 }
2775
2776 struct iter_txns_callback_extra {
iter_txns_callback_extraiter_txns_callback_extra2777 iter_txns_callback_extra(DB_ENV *e, iterate_transactions_callback cb, void *ex) :
2778 env(e), callback(cb), extra(ex) {
2779 }
2780 DB_ENV *env;
2781 iterate_transactions_callback callback;
2782 void *extra;
2783 };
2784
iter_txns_callback(TOKUTXN txn,void * extra)2785 static int iter_txns_callback(TOKUTXN txn, void *extra) {
2786 int r = 0;
2787 iter_txns_callback_extra *info =
2788 reinterpret_cast<iter_txns_callback_extra *>(extra);
2789 DB_TXN *dbtxn = toku_txn_get_container_db_txn(txn);
2790 invariant_notnull(dbtxn);
2791 struct __toku_db_txn_internal *db_txn_internal __attribute__((__unused__)) = db_txn_struct_i(dbtxn);
2792 TOKU_VALGRIND_HG_DISABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2793 if (db_txn_struct_i(dbtxn)->tokutxn == txn) { // make sure that the dbtxn is fully initialized
2794 toku_mutex_lock(&db_txn_struct_i(dbtxn)->txn_mutex);
2795 toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2796
2797 iter_txn_row_locks_callback_extra e(info->env, &db_txn_struct_i(dbtxn)->lt_map);
2798 r = info->callback(dbtxn, iter_txn_row_locks_callback, &e, info->extra);
2799
2800 toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2801 toku_mutex_unlock(&db_txn_struct_i(dbtxn)->txn_mutex);
2802 }
2803 TOKU_VALGRIND_HG_ENABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2804
2805 return r;
2806 }
2807
2808 static int
env_iterate_live_transactions(DB_ENV * env,iterate_transactions_callback callback,void * extra)2809 env_iterate_live_transactions(DB_ENV *env,
2810 iterate_transactions_callback callback,
2811 void *extra) {
2812 if (!env_opened(env)) {
2813 return EINVAL;
2814 }
2815
2816 TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
2817 iter_txns_callback_extra e(env, callback, extra);
2818 return toku_txn_manager_iter_over_live_root_txns(txn_manager, iter_txns_callback, &e);
2819 }
2820
env_set_loader_memory_size(DB_ENV * env,uint64_t (* get_loader_memory_size_callback)(void))2821 static void env_set_loader_memory_size(DB_ENV *env, uint64_t (*get_loader_memory_size_callback)(void)) {
2822 env->i->get_loader_memory_size_callback = get_loader_memory_size_callback;
2823 }
2824
env_get_loader_memory_size(DB_ENV * env)2825 static uint64_t env_get_loader_memory_size(DB_ENV *env) {
2826 uint64_t memory_size = 0;
2827 if (env->i->get_loader_memory_size_callback)
2828 memory_size = env->i->get_loader_memory_size_callback();
2829 return memory_size;
2830 }
2831
env_set_killed_callback(DB_ENV * env,uint64_t default_killed_time_msec,uint64_t (* get_killed_time_callback)(uint64_t default_killed_time_msec),int (* killed_callback)(void))2832 static void env_set_killed_callback(DB_ENV *env, uint64_t default_killed_time_msec, uint64_t (*get_killed_time_callback)(uint64_t default_killed_time_msec), int (*killed_callback)(void)) {
2833 env->i->default_killed_time_msec = default_killed_time_msec;
2834 env->i->get_killed_time_callback = get_killed_time_callback;
2835 env->i->killed_callback = killed_callback;
2836 }
2837
env_kill_waiter(DB_ENV * env,void * extra)2838 static void env_kill_waiter(DB_ENV *env, void *extra) {
2839 env->i->ltm.kill_waiter(extra);
2840 }
2841
env_do_backtrace(DB_ENV * env)2842 static void env_do_backtrace(DB_ENV *env) {
2843 if (env->i->errcall) {
2844 db_env_do_backtrace_errfunc((toku_env_err_func) toku_env_err, (const void *) env);
2845 }
2846 if (env->i->errfile) {
2847 db_env_do_backtrace((FILE *) env->i->errfile);
2848 } else {
2849 db_env_do_backtrace(stderr);
2850 }
2851 }
2852
// Allocates a DB_ENV, fills in its method table and internal state, and
// stores it in *envp.
//   envp  - receives the new environment; written only on success.
//   flags - must be 0, otherwise EINVAL is returned.
// Returns 0 on success, EINVAL for non-zero flags, or ENOMEM when either
// allocation (the DB_ENV or its internal struct) yields a null pointer.
// On failure the partially built environment is freed before returning.
static int
toku_env_create(DB_ENV ** envp, uint32_t flags) {
    int r = ENOSYS;
    DB_ENV* result = NULL;

    if (flags!=0) { r = EINVAL; goto cleanup; }
    MALLOC(result);
    if (result == 0) { r = ENOMEM; goto cleanup; }
    memset(result, 0, sizeof *result);

    // locked methods
    result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_env_err;
    // SENV(name) binds result->name to the locked_env_<name> implementation.
#define SENV(name) result->name = locked_env_ ## name
    SENV(dbremove);
    SENV(dbrename);
    SENV(dirtool_attach);
    SENV(dirtool_detach);
    SENV(dirtool_move);
    //SENV(set_noticecall);
#undef SENV
    // USENV(name) binds result->name to the env_<name> implementation.
#define USENV(name) result->name = env_ ## name
    // methods with locking done internally
    USENV(put_multiple);
    USENV(del_multiple);
    USENV(update_multiple);
    // unlocked methods
    USENV(open);
    USENV(close);
    USENV(set_default_bt_compare);
    USENV(set_update);
    USENV(set_generate_row_callback_for_put);
    USENV(set_generate_row_callback_for_del);
    USENV(set_lg_bsize);
    USENV(set_lg_dir);
    USENV(set_lg_max);
    USENV(get_lg_max);
    USENV(set_lk_max_memory);
    USENV(get_lk_max_memory);
    USENV(get_iname);
    USENV(set_errcall);
    USENV(set_errfile);
    USENV(set_errpfx);
    USENV(set_data_dir);
    USENV(checkpointing_set_period);
    USENV(checkpointing_get_period);
    USENV(cleaner_set_period);
    USENV(cleaner_get_period);
    USENV(cleaner_set_iterations);
    USENV(cleaner_get_iterations);
    USENV(evictor_set_enable_partial_eviction);
    USENV(evictor_get_enable_partial_eviction);
    USENV(set_cachesize);
    USENV(set_client_pool_threads);
    USENV(set_cachetable_pool_threads);
    USENV(set_checkpoint_pool_threads);
    // The next two methods are only bound for particular DB_VERSION values.
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
    USENV(get_cachesize);
#endif
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
    USENV(set_lk_max);
#endif
    USENV(set_lk_detect);
    USENV(set_flags);
    USENV(set_tmp_dir);
    USENV(set_verbose);
    USENV(txn_recover);
    USENV(txn_xa_recover);
    USENV(get_txn_from_xid);
    USENV(txn_stat);
    USENV(get_lock_timeout);
    USENV(set_lock_timeout);
    USENV(set_lock_timeout_callback);
    USENV(set_lock_wait_callback);
    USENV(set_redzone);
    USENV(log_flush);
    USENV(log_archive);
    USENV(create_loader);
    USENV(get_cursor_for_persistent_environment);
    USENV(get_cursor_for_directory);
    USENV(get_db_for_directory);
    USENV(iterate_pending_lock_requests);
    USENV(iterate_live_transactions);
    USENV(change_fsync_log_period);
    USENV(set_loader_memory_size);
    USENV(get_loader_memory_size);
    USENV(set_killed_callback);
    USENV(do_backtrace);
    USENV(set_check_thp);
    USENV(get_check_thp);
    USENV(set_dir_per_db);
    USENV(get_dir_per_db);
    USENV(get_data_dir);
    USENV(kill_waiter);
#undef USENV

    // unlocked methods whose implementation names do not follow the
    // env_<name> pattern, so they are bound by hand
    result->create_indexer = toku_indexer_create_indexer;
    result->txn_checkpoint = toku_env_txn_checkpoint;
    result->checkpointing_postpone = env_checkpointing_postpone;
    result->checkpointing_resume = env_checkpointing_resume;
    result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation;
    result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation;
    result->get_engine_status_num_rows = env_get_engine_status_num_rows;
    result->get_engine_status = env_get_engine_status;
    result->get_engine_status_text = env_get_engine_status_text;
    result->crash = env_crash;  // handlerton's call to fractal tree layer on failed assert
    result->txn_begin = toku_txn_begin;

    // Internal state: zeroed, then the fields that need a non-zero default.
    MALLOC(result->i);
    if (result->i == 0) { r = ENOMEM; goto cleanup; }
    memset(result->i, 0, sizeof *result->i);
    // -1 marks each lock file descriptor as "not set".
    result->i->envdir_lockfd = -1;
    result->i->datadir_lockfd = -1;
    result->i->logdir_lockfd = -1;
    result->i->tmpdir_lockfd = -1;
    env_fs_init(result);
    env_fsync_log_init(result);

    result->i->check_thp = true;

    result->i->bt_compare = toku_builtin_compare_fun;

    // Logger creation is required to succeed (enforced by the invariants).
    r = toku_logger_create(&result->i->logger);
    invariant_zero(r);
    invariant_notnull(result->i->logger);

    // Create the locktree manager, passing in the create/destroy/escalate callbacks.
    // The extra parameter for escalation is simply a pointer to this environment.
    // The escalate callback will need it to translate txnids to DB_TXNs
    result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result);

    // The two containers of open DB handles and the rwlock used around them.
    XMALLOC(result->i->open_dbs_by_dname);
    result->i->open_dbs_by_dname->create();
    XMALLOC(result->i->open_dbs_by_dict_id);
    result->i->open_dbs_by_dict_id->create();
    toku_pthread_rwlock_init(
        *result_i_open_dbs_rwlock_key, &result->i->open_dbs_rwlock, nullptr);

    *envp = result;
    r = 0;
    // Adds 1 to the tokuft_num_envs counter.
    toku_sync_fetch_and_add(&tokuft_num_envs, 1);
cleanup:
    // Failure path: release whatever was allocated; *envp is left untouched.
    if (r!=0) {
        if (result) {
            toku_free(result->i);
            toku_free(result);
        }
    }
    return r;
}
3003
3004 int
DB_ENV_CREATE_FUN(DB_ENV ** envp,uint32_t flags)3005 DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) {
3006 int r = toku_env_create(envp, flags);
3007 return r;
3008 }
3009
3010 // return 0 if v and dbv refer to same db (including same dname)
3011 // return <0 if v is earlier in omt than dbv
3012 // return >0 if v is later in omt than dbv
3013 static int
find_db_by_db_dname(DB * const & db,DB * const & dbfind)3014 find_db_by_db_dname(DB *const &db, DB *const &dbfind) {
3015 int cmp;
3016 const char *dname = db->i->dname;
3017 const char *dnamefind = dbfind->i->dname;
3018 cmp = strcmp(dname, dnamefind);
3019 if (cmp != 0) return cmp;
3020 if (db < dbfind) return -1;
3021 if (db > dbfind) return 1;
3022 return 0;
3023 }
3024
3025 static int
find_db_by_db_dict_id(DB * const & db,DB * const & dbfind)3026 find_db_by_db_dict_id(DB *const &db, DB *const &dbfind) {
3027 DICTIONARY_ID dict_id = db->i->dict_id;
3028 DICTIONARY_ID dict_id_find = dbfind->i->dict_id;
3029 if (dict_id.dictid < dict_id_find.dictid) {
3030 return -1;
3031 } else if (dict_id.dictid > dict_id_find.dictid) {
3032 return 1;
3033 } else if (db < dbfind) {
3034 return -1;
3035 } else if (db > dbfind) {
3036 return 1;
3037 } else {
3038 return 0;
3039 }
3040 }
3041
3042 // Tell env that there is a new db handle (with non-unique dname in db->i-dname)
3043 void
env_note_db_opened(DB_ENV * env,DB * db)3044 env_note_db_opened(DB_ENV *env, DB *db) {
3045 toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3046 assert(db->i->dname); // internal (non-user) dictionary has no dname
3047
3048 int r;
3049 uint32_t idx;
3050
3051 r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3052 assert(r == DB_NOTFOUND);
3053 r = env->i->open_dbs_by_dname->insert_at(db, idx);
3054 assert_zero(r);
3055 r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3056 assert(r == DB_NOTFOUND);
3057 r = env->i->open_dbs_by_dict_id->insert_at(db, idx);
3058 assert_zero(r);
3059
3060 STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3061 STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++;
3062 if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) {
3063 STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS);
3064 }
3065 toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3066 }
3067
3068 // Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API. The DB may still be in use by the fractal tree internals.
3069 void
env_note_db_closed(DB_ENV * env,DB * db)3070 env_note_db_closed(DB_ENV *env, DB *db) {
3071 toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3072 assert(db->i->dname); // internal (non-user) dictionary has no dname
3073 assert(env->i->open_dbs_by_dname->size() > 0);
3074 assert(env->i->open_dbs_by_dict_id->size() > 0);
3075
3076 int r;
3077 uint32_t idx;
3078
3079 r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3080 assert_zero(r);
3081 r = env->i->open_dbs_by_dname->delete_at(idx);
3082 assert_zero(r);
3083 r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3084 assert_zero(r);
3085 r = env->i->open_dbs_by_dict_id->delete_at(idx);
3086 assert_zero(r);
3087
3088 STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++;
3089 STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3090 toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3091 }
3092
3093 static int
find_open_db_by_dname(DB * const & db,const char * const & dnamefind)3094 find_open_db_by_dname(DB *const &db, const char *const &dnamefind) {
3095 return strcmp(db->i->dname, dnamefind);
3096 }
3097
3098 // return true if there is any db open with the given dname
3099 static bool
env_is_db_with_dname_open(DB_ENV * env,const char * dname)3100 env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
3101 DB *db;
3102 toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
3103 int r = env->i->open_dbs_by_dname->find_zero<const char *, find_open_db_by_dname>(dname, &db, nullptr);
3104 if (r == 0) {
3105 invariant(strcmp(dname, db->i->dname) == 0);
3106 } else {
3107 invariant(r == DB_NOTFOUND);
3108 }
3109 toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
3110 return r == 0 ? true : false;
3111 }
3112
3113 //We do not (yet?) support deleting subdbs by deleting the enclosing 'fname'
3114 static int
env_dbremove_subdb(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,int32_t flags)3115 env_dbremove_subdb(DB_ENV * env, DB_TXN * txn, const char *fname, const char *dbname, int32_t flags) {
3116 int r;
3117 if (!fname || !dbname) r = EINVAL;
3118 else {
3119 char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3120 int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3121 assert(bytes==(int)sizeof(subdb_full_name)-1);
3122 const char *null_subdbname = NULL;
3123 r = env_dbremove(env, txn, subdb_full_name, null_subdbname, flags);
3124 }
3125 return r;
3126 }
3127
3128 // see if we can acquire a table lock for the given dname.
3129 // requires: write lock on dname in the directory. dictionary
3130 // open, close, and begin checkpoint cannot occur.
3131 // returns: zero if we could open, lock, and close a dictionary
3132 // with the given dname, errno otherwise.
3133 static int
can_acquire_table_lock(DB_ENV * env,DB_TXN * txn,const char * iname_in_env)3134 can_acquire_table_lock(DB_ENV *env, DB_TXN *txn, const char *iname_in_env) {
3135 int r;
3136 DB *db;
3137
3138 r = toku_db_create(&db, env, 0);
3139 assert_zero(r);
3140 r = toku_db_open_iname(db, txn, iname_in_env, 0, 0);
3141 if(r) {
3142 if (r == ENAMETOOLONG)
3143 toku_ydb_do_error(env, r, "File name too long!\n");
3144 goto exit;
3145 }
3146 r = toku_db_pre_acquire_table_lock(db, txn);
3147 if (r) {
3148 r = DB_LOCK_NOTGRANTED;
3149 }
3150 exit:
3151 if(db) {
3152 int r2 = toku_db_close(db);
3153 assert_zero(r2);
3154 }
3155 return r;
3156 }
3157
// Removes the dictionary named fname (or the sub-database dbname of fname)
// from the environment.
//   env    - must satisfy env_opened(env), otherwise EINVAL.
//   txn    - optional; with a txn the ft is unlinked when the txn commits,
//            without one it is unlinked immediately.
//   fname  - dictionary name (dname) when dbname is NULL.
//   dbname - if non-NULL the call is delegated to env_dbremove_subdb().
//   flags  - must be 0, otherwise EINVAL.
// Returns 0 on success; ENOENT if the dname is not in the directory;
// EINVAL (reported through toku_ydb_do_error) if a handle with that dname is
// open; DB_LOCK_NOTGRANTED if the table lock cannot be taken; otherwise the
// error of the failing step.
// NOTE(review): HANDLE_PANICKED_ENV / HANDLE_READ_ONLY_TXN are macros defined
// elsewhere; presumably they return early - confirm.
static int
env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
    int r;
    HANDLE_PANICKED_ENV(env);
    if (!env_opened(env) || flags != 0) {
        return EINVAL;
    }
    HANDLE_READ_ONLY_TXN(txn);
    if (dbname != NULL) {
        // env_dbremove_subdb() converts (fname, dbname) to dname
        return env_dbremove_subdb(env, txn, fname, dbname, flags);
    }

    const char * dname = fname;
    assert(dbname == NULL);

    // We check for an open db here as a "fast path" to error.
    // We'll need to check again below to be sure.
    if (env_is_db_with_dname_open(env, dname)) {
        return toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
    }

    DBT dname_dbt;
    DBT iname_dbt;
    // The key includes the terminating NUL (strlen + 1).
    toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
    toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);

    // get iname
    r = toku_db_get(env->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
    // iname is released at the exit label on every path from here on.
    char *iname = (char *) iname_dbt.data;
    DB *db = NULL;
    if (r != 0) {
        if (r == DB_NOTFOUND) {
            r = ENOENT;
        }
        goto exit;
    }
    // remove (dname,iname) from directory
    r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
    if (r != 0) {
        goto exit;
    }
    // Open the dictionary internally, by iname, through a private handle.
    r = toku_db_create(&db, env, 0);
    lazy_assert_zero(r);
    r = toku_db_open_iname(db, txn, iname, 0, 0);
    // With a txn, an open failure ends the call: ENOENT is treated as
    // success, anything else is reported through toku_ydb_do_error.
    // NOTE(review): without a txn a failed open is not handled here and
    // execution continues to toku_ft_unlink() below - confirm this is intended.
    if (txn && r) {
        if (r == EMFILE || r == ENFILE)
            r = toku_ydb_do_error(env, r, "toku dbremove failed because open file limit reached\n");
        else if (r != ENOENT)
            r = toku_ydb_do_error(env, r, "toku dbremove failed\n");
        else
            r = 0;
        goto exit;
    }
    if (txn) {
        // Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions)
        if (env_is_db_with_dname_open(env, dname)) {
            r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
            goto exit;
        }
        // we know a live db handle does not exist.
        //
        // use the internally opened db to try and get a table lock
        //
        // if we can't get it, then some txn needs the ft and we
        // should return lock not granted.
        //
        // otherwise, we're okay in marking this ft as remove on
        // commit. no new handles can open for this dictionary
        // because the txn has directory write locks on the dname
        r = toku_db_pre_acquire_table_lock(db, txn);
        if (r != 0) {
            r = DB_LOCK_NOTGRANTED;
            goto exit;
        }
        // The ft will be unlinked when the txn commits
        toku_ft_unlink_on_commit(db->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
    }
    else {
        // unlink the ft without a txn
        toku_ft_unlink(db->i->ft_handle);
    }

exit:
    // Common cleanup: close the private handle (if created) and free iname.
    if (db) {
        int ret = toku_db_close(db);
        assert(ret == 0);
    }
    if (iname) {
        toku_free(iname);
    }
    return r;
}
3251
3252 static int
env_dbrename_subdb(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)3253 env_dbrename_subdb(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3254 int r;
3255 if (!fname || !dbname || !newname) r = EINVAL;
3256 else {
3257 char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3258 {
3259 int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3260 assert(bytes==(int)sizeof(subdb_full_name)-1);
3261 }
3262 char new_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3263 {
3264 int bytes = snprintf(new_full_name, sizeof(new_full_name), "%s/%s", fname, dbname);
3265 assert(bytes==(int)sizeof(new_full_name)-1);
3266 }
3267 const char *null_subdbname = NULL;
3268 r = env_dbrename(env, txn, subdb_full_name, null_subdbname, new_full_name, flags);
3269 }
3270 return r;
3271 }
3272
3273 static int
env_dbrename(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)3274 env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3275 int r;
3276 HANDLE_PANICKED_ENV(env);
3277 if (!env_opened(env) || flags != 0) {
3278 return EINVAL;
3279 }
3280 HANDLE_READ_ONLY_TXN(txn);
3281 if (dbname != NULL) {
3282 // env_dbrename_subdb() converts (fname, dbname) to dname and (fname, newname) to newdname
3283 return env_dbrename_subdb(env, txn, fname, dbname, newname, flags);
3284 }
3285
3286 const char * dname = fname;
3287 assert(dbname == NULL);
3288
3289 // We check for open dnames for the old and new name as a "fast path" to error.
3290 // We will need to check these again later.
3291 if (env_is_db_with_dname_open(env, dname)) {
3292 return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3293 }
3294 if (env_is_db_with_dname_open(env, newname)) {
3295 return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3296 }
3297
3298 DBT old_dname_dbt;
3299 DBT new_dname_dbt;
3300 DBT iname_dbt;
3301 toku_fill_dbt(&old_dname_dbt, dname, strlen(dname)+1);
3302 toku_fill_dbt(&new_dname_dbt, newname, strlen(newname)+1);
3303 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3304
3305 // get iname
3306 r = toku_db_get(env->i->directory, txn, &old_dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
3307 char *iname = (char *) iname_dbt.data;
3308 if (r == DB_NOTFOUND) {
3309 r = ENOENT;
3310 } else if (r == 0) {
3311 // verify that newname does not already exist
3312 r = db_getf_set(env->i->directory, txn, DB_SERIALIZABLE, &new_dname_dbt, ydb_getf_do_nothing, NULL);
3313 if (r == 0) {
3314 r = EEXIST;
3315 }
3316 else if (r == DB_NOTFOUND) {
3317 DBT new_iname_dbt;
3318 // Do not rename ft file if 'dir_per_db' option is not set
3319 auto new_iname =
3320 env->get_dir_per_db(env)
3321 ? generate_iname_for_rename_or_open(
3322 env, txn, newname, false)
3323 : std::unique_ptr<char[], decltype(&toku_free)>(
3324 toku_strdup(iname), &toku_free);
3325 toku_fill_dbt(
3326 &new_iname_dbt, new_iname.get(), strlen(new_iname.get()) + 1);
3327
3328 // remove old (dname,iname) and insert (newname,iname) in directory
3329 r = toku_db_del(env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
3330 if (r != 0) { goto exit; }
3331
3332 // Do not rename ft file if 'dir_per_db' option is not set
3333 if (env->get_dir_per_db(env))
3334 r = toku_ft_rename_iname(txn,
3335 env->get_data_dir(env),
3336 iname,
3337 new_iname.get(),
3338 env->i->cachetable);
3339
3340 r = toku_db_put(env->i->directory,
3341 txn,
3342 &new_dname_dbt,
3343 &new_iname_dbt,
3344 0,
3345 true);
3346 if (r != 0) { goto exit; }
3347
3348 //Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions)
3349 if (env_is_db_with_dname_open(env, dname)) {
3350 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3351 goto exit;
3352 }
3353 if (env_is_db_with_dname_open(env, newname)) {
3354 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3355 goto exit;
3356 }
3357
3358 // we know a live db handle does not exist.
3359 //
3360 // use the internally opened db to try and get a table lock
3361 //
3362 // if we can't get it, then some txn needs the ft and we
3363 // should return lock not granted.
3364 //
3365 // otherwise, we're okay in marking this ft as remove on
3366 // commit. no new handles can open for this dictionary
3367 // because the txn has directory write locks on the dname
3368 if (txn) {
3369 r = can_acquire_table_lock(env, txn, new_iname.get());
3370 }
3371 // We don't do anything at the ft or cachetable layer for rename.
3372 // We just update entries in the environment's directory.
3373 }
3374 }
3375
3376 exit:
3377 if (iname) {
3378 toku_free(iname);
3379 }
3380 return r;
3381 }
3382
3383 int
DB_CREATE_FUN(DB ** db,DB_ENV * env,uint32_t flags)3384 DB_CREATE_FUN (DB ** db, DB_ENV * env, uint32_t flags) {
3385 int r = toku_db_create(db, env, flags);
3386 return r;
3387 }
3388
3389 /* need db_strerror_r for multiple threads */
3390
3391 const char *
db_strerror(int error)3392 db_strerror(int error) {
3393 char *errorstr;
3394 if (error >= 0) {
3395 errorstr = strerror(error);
3396 if (errorstr)
3397 return errorstr;
3398 }
3399
3400 switch (error) {
3401 case DB_BADFORMAT:
3402 return "Database Bad Format (probably a corrupted database)";
3403 case DB_NOTFOUND:
3404 return "Not found";
3405 case TOKUDB_OUT_OF_LOCKS:
3406 return "Out of locks";
3407 case TOKUDB_DICTIONARY_TOO_OLD:
3408 return "Dictionary too old for this version of PerconaFT";
3409 case TOKUDB_DICTIONARY_TOO_NEW:
3410 return "Dictionary too new for this version of PerconaFT";
3411 case TOKUDB_CANCELED:
3412 return "User cancelled operation";
3413 case TOKUDB_NO_DATA:
3414 return "Ran out of data (not EOF)";
3415 case TOKUDB_HUGE_PAGES_ENABLED:
3416 return "Transparent huge pages are enabled but PerconaFT's memory allocator will oversubscribe main memory with transparent huge pages. This check can be disabled by setting the environment variable TOKU_HUGE_PAGES_OK.";
3417 }
3418
3419 static char unknown_result[100]; // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of null-terminated string.
3420 errorstr = unknown_result;
3421 snprintf(errorstr, sizeof unknown_result, "Unknown error code: %d", error);
3422 return errorstr;
3423 }
3424
3425 const char *
db_version(int * major,int * minor,int * patch)3426 db_version(int *major, int *minor, int *patch) {
3427 if (major)
3428 *major = DB_VERSION_MAJOR;
3429 if (minor)
3430 *minor = DB_VERSION_MINOR;
3431 if (patch)
3432 *patch = DB_VERSION_PATCH;
3433 return toku_product_name_strings.db_version;
3434 }
3435
3436 // HACK: To ensure toku_pthread_yield gets included in the .so
3437 // non-static would require a prototype in a header
3438 // static (since unused) would give a warning
3439 // static + unused would not actually help toku_pthread_yield get in the .so
3440 // static + used avoids all the warnings and makes sure toku_pthread_yield is in the .so
3441 static void __attribute__((__used__))
include_toku_pthread_yield(void)3442 include_toku_pthread_yield (void) {
3443 toku_pthread_yield();
3444 }
3445
3446 // For test purposes only, translate dname to iname
3447 // YDB lock is NOT held when this function is called,
3448 // as it is called by user
3449 static int
env_get_iname(DB_ENV * env,DBT * dname_dbt,DBT * iname_dbt)3450 env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) {
3451 DB *directory = env->i->directory;
3452 int r = autotxn_db_get(directory, NULL, dname_dbt, iname_dbt, DB_SERIALIZABLE|DB_PRELOCKED); // allocates memory for iname
3453 return r;
3454 }
3455
3456 // TODO 2216: Patch out this (dangerous) function when loader is working and
3457 // we don't need to test the low-level redirect anymore.
3458 // for use by test programs only, just a wrapper around ft call:
3459 int
toku_test_db_redirect_dictionary(DB * db,const char * dname_of_new_file,DB_TXN * dbtxn)3460 toku_test_db_redirect_dictionary(DB * db, const char * dname_of_new_file, DB_TXN *dbtxn) {
3461 int r;
3462 DBT dname_dbt;
3463 DBT iname_dbt;
3464 char * new_iname_in_env;
3465
3466 FT_HANDLE ft_handle = db->i->ft_handle;
3467 TOKUTXN tokutxn = db_txn_struct_i(dbtxn)->tokutxn;
3468
3469 toku_fill_dbt(&dname_dbt, dname_of_new_file, strlen(dname_of_new_file)+1);
3470 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3471 r = toku_db_get(db->dbenv->i->directory, dbtxn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
3472 assert_zero(r);
3473 new_iname_in_env = (char *) iname_dbt.data;
3474
3475 toku_multi_operation_client_lock(); //Must hold MO lock for dictionary_redirect.
3476 r = toku_dictionary_redirect(new_iname_in_env, ft_handle, tokutxn);
3477 toku_multi_operation_client_unlock();
3478
3479 toku_free(new_iname_in_env);
3480 return r;
3481 }
3482
3483 //Tets only function
3484 uint64_t
toku_test_get_latest_lsn(DB_ENV * env)3485 toku_test_get_latest_lsn(DB_ENV *env) {
3486 LSN rval = ZERO_LSN;
3487 if (env && env->i->logger) {
3488 rval = toku_logger_last_lsn(env->i->logger);
3489 }
3490 return rval.lsn;
3491 }
3492
toku_set_test_txn_sync_callback(void (* cb)(pthread_t,void *),void * extra)3493 void toku_set_test_txn_sync_callback(void (* cb) (pthread_t, void *), void * extra) {
3494 set_test_txn_sync_callback(cb, extra);
3495 }
3496
3497 int
toku_test_get_checkpointing_user_data_status(void)3498 toku_test_get_checkpointing_user_data_status (void) {
3499 return toku_cachetable_get_checkpointing_user_data_status();
3500 }
3501
3502 #undef STATUS_VALUE
3503 #undef PERSISTENT_UPGRADE_STATUS_VALUE
3504
3505 #include <toku_race_tools.h>
3506 void __attribute__((constructor)) toku_ydb_helgrind_ignore(void);
3507 void
toku_ydb_helgrind_ignore(void)3508 toku_ydb_helgrind_ignore(void) {
3509 TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_layer_status, sizeof ydb_layer_status);
3510 }
3511