1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6
7
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9
10 PerconaFT is free software: you can redistribute it and/or modify
11 it under the terms of the GNU General Public License, version 2,
12 as published by the Free Software Foundation.
13
14 PerconaFT is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
21
22 ----------------------------------------
23
24 PerconaFT is free software: you can redistribute it and/or modify
25 it under the terms of the GNU Affero General Public License, version 3,
26 as published by the Free Software Foundation.
27
28 PerconaFT is distributed in the hope that it will be useful,
29 but WITHOUT ANY WARRANTY; without even the implied warranty of
30 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 GNU Affero General Public License for more details.
32
33 You should have received a copy of the GNU Affero General Public License
34 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38
39 extern const char *toku_patent_string;
40 const char *toku_copyright_string = "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.";
41
42
43 extern int writing_rollback;
44
45 #include <db.h>
46 #include <errno.h>
47 #include <string.h>
48
49 #include "portability/memory.h"
50 #include "portability/toku_assert.h"
51 #include "portability/toku_portability.h"
52 #include "portability/toku_pthread.h"
53 #include "portability/toku_stdlib.h"
54
55 #include "ft/ft-flusher.h"
56 #include "ft/cachetable/cachetable.h"
57 #include "ft/cachetable/checkpoint.h"
58 #include "ft/logger/log.h"
59 #include "ft/loader/loader.h"
60 #include "ft/log_header.h"
61 #include "ft/ft.h"
62 #include "ft/txn/txn_manager.h"
63 #include "src/ydb.h"
64 #include "src/ydb-internal.h"
65 #include "src/ydb_cursor.h"
66 #include "src/ydb_row_lock.h"
67 #include "src/ydb_env_func.h"
68 #include "src/ydb_db.h"
69 #include "src/ydb_write.h"
70 #include "src/ydb_txn.h"
71 #include "src/loader.h"
72 #include "src/indexer.h"
73 #include "util/status.h"
74 #include "util/context.h"
75
76 #include <functional>
77
78 // Include ydb_lib.cc here so that its constructor/destructor gets put into
79 // ydb.o, to make sure they don't get erased at link time (when linking to
80 // a static libtokufractaltree.a that was compiled with gcc). See #5094.
81 #include "ydb_lib.cc"
82
83 #ifdef TOKUTRACE
84 #define DB_ENV_CREATE_FUN db_env_create_toku10
85 #define DB_CREATE_FUN db_create_toku10
86 #else
87 #define DB_ENV_CREATE_FUN db_env_create
88 #define DB_CREATE_FUN db_create
// Stub used when TOKUTRACE is not defined: the file name is ignored and 0 is returned.
int toku_set_trace_file(const char *fname __attribute__((__unused__))) {
    return 0;
}
// Stub used when TOKUTRACE is not defined: there is nothing to close, so 0 is returned.
int toku_close_trace_file(void) {
    return 0;
}
91 #endif
92
93 extern uint force_recovery;
94
95 // Set when env is panicked, never cleared.
96 static int env_is_panicked = 0;
97
98 void
env_panic(DB_ENV * env,int cause,const char * msg)99 env_panic(DB_ENV * env, int cause, const char * msg) {
100 if (cause == 0)
101 cause = -1; // if unknown cause, at least guarantee panic
102 if (msg == NULL)
103 msg = "Unknown cause in env_panic\n";
104 env_is_panicked = cause;
105 env->i->is_panicked = cause;
106 env->i->panic_string = toku_strdup(msg);
107 }
108
109 static int env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp);
110
111 /********************************************************************************
112 * Status is intended for display to humans to help understand system behavior.
113 * It does not need to be perfectly thread-safe.
114 */
115
// Row indexes into ydb_layer_status.status[].
// The enumerators count up from 0, so the final one doubles as the number of rows.
typedef enum {
    // timestamp of environment creation, read from persistent environment
    YDB_LAYER_TIME_CREATION = 0,
    // timestamp of system startup
    YDB_LAYER_TIME_STARTUP,
    // timestamp of engine status query
    YDB_LAYER_TIME_NOW,
    YDB_LAYER_NUM_DB_OPEN,
    YDB_LAYER_NUM_DB_CLOSE,
    YDB_LAYER_NUM_OPEN_DBS,
    YDB_LAYER_MAX_OPEN_DBS,
    YDB_LAYER_FSYNC_LOG_PERIOD,
#if 0
    // version of original environment, read from persistent environment
    YDB_LAYER_ORIGINAL_ENV_VERSION,
    // version of environment at this startup, read from persistent environment (curr_env_ver_key)
    YDB_LAYER_STARTUP_ENV_VERSION,
    // read from persistent environment
    YDB_LAYER_LAST_LSN_OF_V13,
    // timestamp of upgrade to version 14, read from persistent environment
    YDB_LAYER_UPGRADE_V14_TIME,
    // footprint of upgrade to version 14, read from persistent environment
    YDB_LAYER_UPGRADE_V14_FOOTPRINT,
#endif
    // number of rows in this status array
    YDB_LAYER_STATUS_NUM_ROWS
} ydb_layer_status_entry;
134
135 typedef struct {
136 bool initialized;
137 TOKU_ENGINE_STATUS_ROW_S status[YDB_LAYER_STATUS_NUM_ROWS];
138 } YDB_LAYER_STATUS_S, *YDB_LAYER_STATUS;
139
140 static YDB_LAYER_STATUS_S ydb_layer_status;
141 #define STATUS_VALUE(x) ydb_layer_status.status[x].value.num
142
143 #define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_layer_status, k, c, t, l, inc)
144
145 static void
ydb_layer_status_init(void)146 ydb_layer_status_init (void) {
147 // Note, this function initializes the keyname, type, and legend fields.
148 // Value fields are initialized to zero by compiler.
149
150 STATUS_INIT(YDB_LAYER_TIME_CREATION, nullptr, UNIXTIME, "time of environment creation", TOKU_ENGINE_STATUS);
151 STATUS_INIT(YDB_LAYER_TIME_STARTUP, nullptr, UNIXTIME, "time of engine startup", TOKU_ENGINE_STATUS);
152 STATUS_INIT(YDB_LAYER_TIME_NOW, nullptr, UNIXTIME, "time now", TOKU_ENGINE_STATUS);
153 STATUS_INIT(YDB_LAYER_NUM_DB_OPEN, DB_OPENS, UINT64, "db opens", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
154 STATUS_INIT(YDB_LAYER_NUM_DB_CLOSE, DB_CLOSES, UINT64, "db closes", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
155 STATUS_INIT(YDB_LAYER_NUM_OPEN_DBS, DB_OPEN_CURRENT, UINT64, "num open dbs now", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
156 STATUS_INIT(YDB_LAYER_MAX_OPEN_DBS, DB_OPEN_MAX, UINT64, "max open dbs", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
157 STATUS_INIT(YDB_LAYER_FSYNC_LOG_PERIOD, nullptr, UINT64, "period, in ms, that recovery log is automatically fsynced", TOKU_ENGINE_STATUS);
158
159 STATUS_VALUE(YDB_LAYER_TIME_STARTUP) = time(NULL);
160 ydb_layer_status.initialized = true;
161 }
162 #undef STATUS_INIT
163
164 static void
ydb_layer_get_status(DB_ENV * env,YDB_LAYER_STATUS statp)165 ydb_layer_get_status(DB_ENV* env, YDB_LAYER_STATUS statp) {
166 STATUS_VALUE(YDB_LAYER_TIME_NOW) = time(NULL);
167 STATUS_VALUE(YDB_LAYER_FSYNC_LOG_PERIOD) = toku_minicron_get_period_in_ms_unlocked(&env->i->fsync_log_cron);
168 *statp = ydb_layer_status;
169 }
170
171 /********************************************************************************
172 * End of ydb_layer local status section.
173 */
174
175 static DB_ENV * volatile most_recent_env; // most recently opened env, used for engine status on crash. Note there are likely to be races on this if you have multiple threads creating and closing environments in parallel. We'll declare it volatile since at least that helps make sure the compiler doesn't optimize away certain code (e.g., if while debugging, you write a code that spins on most_recent_env, you'd like to compiler not to optimize your code away.)
176
177 static int env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt);
178 static int toku_maybe_get_engine_status_text (char* buff, int buffsize); // for use by toku_assert
179 static int toku_maybe_err_engine_status (void);
180 static void toku_maybe_set_env_panic(int code, const char * msg); // for use by toku_assert
181
182 int
toku_ydb_init(void)183 toku_ydb_init(void) {
184 int r = 0;
185 //Lower level must be initialized first.
186 r = toku_ft_layer_init();
187 return r;
188 }
189
190 // Do not clean up resources if env is panicked, just exit ugly
191 void
toku_ydb_destroy(void)192 toku_ydb_destroy(void) {
193 if (!ydb_layer_status.initialized)
194 return;
195 if (env_is_panicked == 0) {
196 toku_ft_layer_destroy();
197 }
198 ydb_layer_status.initialized = false;
199 }
200
201 static int
ydb_getf_do_nothing(DBT const * UU (key),DBT const * UU (val),void * UU (extra))202 ydb_getf_do_nothing(DBT const* UU(key), DBT const* UU(val), void* UU(extra)) {
203 return 0;
204 }
205
206 /* env methods */
207
208 static void
env_fs_report_in_yellow(DB_ENV * UU (env))209 env_fs_report_in_yellow(DB_ENV *UU(env)) {
210 char tbuf[26];
211 time_t tnow = time(NULL);
212 fprintf(stderr, "%.24s PerconaFT file system space is low\n", ctime_r(&tnow, tbuf)); fflush(stderr);
213 }
214
215 static void
env_fs_report_in_red(DB_ENV * UU (env))216 env_fs_report_in_red(DB_ENV *UU(env)) {
217 char tbuf[26];
218 time_t tnow = time(NULL);
219 fprintf(stderr, "%.24s PerconaFT file system space is really low and access is restricted\n", ctime_r(&tnow, tbuf)); fflush(stderr);
220 }
221
222 static inline uint64_t
env_fs_redzone(DB_ENV * env,uint64_t total)223 env_fs_redzone(DB_ENV *env, uint64_t total) {
224 return total * env->i->redzone / 100;
225 }
226
227 #define ZONEREPORTLIMIT 12
228 // Check the available space in the file systems used by tokuft and erect barriers when available space gets low.
229 static int
env_fs_poller(void * arg)230 env_fs_poller(void *arg) {
231 if(force_recovery == 6) {
232 return 0;
233 }
234 DB_ENV *env = (DB_ENV *) arg;
235 int r;
236
237 int in_yellow; // set true to issue warning to user
238 int in_red; // set true to prevent certain operations (returning ENOSPC)
239
240 // get the fs sizes for the home dir
241 uint64_t avail_size = 0, total_size = 0;
242 r = toku_get_filesystem_sizes(env->i->dir, &avail_size, NULL, &total_size);
243 assert(r == 0);
244 in_yellow = (avail_size < 2 * env_fs_redzone(env, total_size));
245 in_red = (avail_size < env_fs_redzone(env, total_size));
246
247 // get the fs sizes for the data dir if different than the home dir
248 if (strcmp(env->i->dir, env->i->real_data_dir) != 0) {
249 r = toku_get_filesystem_sizes(env->i->real_data_dir, &avail_size, NULL, &total_size);
250 assert(r == 0);
251 in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
252 in_red += (avail_size < env_fs_redzone(env, total_size));
253 }
254
255 // get the fs sizes for the log dir if different than the home dir and data dir
256 if (strcmp(env->i->dir, env->i->real_log_dir) != 0 && strcmp(env->i->real_data_dir, env->i->real_log_dir) != 0) {
257 r = toku_get_filesystem_sizes(env->i->real_log_dir, &avail_size, NULL, &total_size);
258 assert(r == 0);
259 in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
260 in_red += (avail_size < env_fs_redzone(env, total_size));
261 }
262
263 env->i->fs_seq++; // how many times through this polling loop?
264 uint64_t now = env->i->fs_seq;
265
266 // Don't issue report if we have not been out of this fs_state for a while, unless we're at system startup
267 switch (env->i->fs_state) {
268 case FS_RED:
269 if (!in_red) {
270 if (in_yellow) {
271 env->i->fs_state = FS_YELLOW;
272 } else {
273 env->i->fs_state = FS_GREEN;
274 }
275 }
276 break;
277 case FS_YELLOW:
278 if (in_red) {
279 if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
280 env_fs_report_in_red(env);
281 env->i->fs_state = FS_RED;
282 env->i->last_seq_entered_red = now;
283 } else if (!in_yellow) {
284 env->i->fs_state = FS_GREEN;
285 }
286 break;
287 case FS_GREEN:
288 if (in_red) {
289 if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
290 env_fs_report_in_red(env);
291 env->i->fs_state = FS_RED;
292 env->i->last_seq_entered_red = now;
293 } else if (in_yellow) {
294 if ((now - env->i->last_seq_entered_yellow > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
295 env_fs_report_in_yellow(env);
296 env->i->fs_state = FS_YELLOW;
297 env->i->last_seq_entered_yellow = now;
298 }
299 break;
300 default:
301 assert(0);
302 }
303 return 0;
304 }
305 #undef ZONEREPORTLIMIT
306
307 static void
env_fs_init(DB_ENV * env)308 env_fs_init(DB_ENV *env) {
309 env->i->fs_state = FS_GREEN;
310 env->i->fs_poll_time = 5; // seconds
311 env->i->redzone = 5; // percent of total space
312 env->i->fs_poller_is_init = false;
313 }
314
315 // Initialize the minicron that polls file system space
316 static int
env_fs_init_minicron(DB_ENV * env)317 env_fs_init_minicron(DB_ENV *env) {
318 if(force_recovery == 6) {
319 return 0;
320 }
321 int r = toku_minicron_setup(&env->i->fs_poller, env->i->fs_poll_time*1000, env_fs_poller, env);
322 if (r == 0)
323 env->i->fs_poller_is_init = true;
324 return r;
325 }
326
327 // Destroy the file system space minicron
328 static void
env_fs_destroy(DB_ENV * env)329 env_fs_destroy(DB_ENV *env) {
330 if (env->i->fs_poller_is_init) {
331 int r = toku_minicron_shutdown(&env->i->fs_poller);
332 assert(r == 0);
333 env->i->fs_poller_is_init = false;
334 }
335 }
336
337 static int
env_fsync_log_on_minicron(void * arg)338 env_fsync_log_on_minicron(void *arg) {
339 DB_ENV *env = (DB_ENV *) arg;
340 int r = env->log_flush(env, 0);
341 assert(r == 0);
342 return 0;
343 }
344
345 static void
env_fsync_log_init(DB_ENV * env)346 env_fsync_log_init(DB_ENV *env) {
347 env->i->fsync_log_period_ms = 0;
348 env->i->fsync_log_cron_is_init = false;
349 }
350
UU()351 static void UU()
352 env_change_fsync_log_period(DB_ENV* env, uint32_t period_ms) {
353 env->i->fsync_log_period_ms = period_ms;
354 if (env->i->fsync_log_cron_is_init) {
355 toku_minicron_change_period(&env->i->fsync_log_cron, period_ms);
356 }
357 }
358
359 static int
env_fsync_log_cron_init(DB_ENV * env)360 env_fsync_log_cron_init(DB_ENV *env) {
361 int r = toku_minicron_setup(&env->i->fsync_log_cron, env->i->fsync_log_period_ms, env_fsync_log_on_minicron, env);
362 if (r == 0)
363 env->i->fsync_log_cron_is_init = true;
364 return r;
365 }
366
367 static void
env_fsync_log_cron_destroy(DB_ENV * env)368 env_fsync_log_cron_destroy(DB_ENV *env) {
369 if (env->i->fsync_log_cron_is_init) {
370 int r = toku_minicron_shutdown(&env->i->fsync_log_cron);
371 assert(r == 0);
372 env->i->fsync_log_cron_is_init = false;
373 }
374 }
375
376 static void
env_setup_real_dir(DB_ENV * env,char ** real_dir,const char * nominal_dir)377 env_setup_real_dir(DB_ENV *env, char **real_dir, const char *nominal_dir) {
378 toku_free(*real_dir);
379 *real_dir = NULL;
380
381 assert(env->i->dir);
382 if (nominal_dir)
383 *real_dir = toku_construct_full_name(2, env->i->dir, nominal_dir);
384 else
385 *real_dir = toku_strdup(env->i->dir);
386 }
387
388 static void
env_setup_real_data_dir(DB_ENV * env)389 env_setup_real_data_dir(DB_ENV *env) {
390 env_setup_real_dir(env, &env->i->real_data_dir, env->i->data_dir);
391 }
392
393 static void
env_setup_real_log_dir(DB_ENV * env)394 env_setup_real_log_dir(DB_ENV *env) {
395 env_setup_real_dir(env, &env->i->real_log_dir, env->i->lg_dir);
396 }
397
398 static void
env_setup_real_tmp_dir(DB_ENV * env)399 env_setup_real_tmp_dir(DB_ENV *env) {
400 env_setup_real_dir(env, &env->i->real_tmp_dir, env->i->tmp_dir);
401 }
402
keep_cachetable_callback(DB_ENV * env,CACHETABLE cachetable)403 static void keep_cachetable_callback (DB_ENV *env, CACHETABLE cachetable)
404 {
405 env->i->cachetable = cachetable;
406 }
407
408 static int
ydb_do_recovery(DB_ENV * env)409 ydb_do_recovery (DB_ENV *env) {
410 assert(env->i->real_log_dir);
411 int r = tokuft_recover(env,
412 toku_keep_prepared_txn_callback,
413 keep_cachetable_callback,
414 env->i->logger,
415 env->i->dir, env->i->real_log_dir, env->i->bt_compare,
416 env->i->update_function,
417 env->i->generate_row_for_put, env->i->generate_row_for_del,
418 env->i->cachetable_size);
419 return r;
420 }
421
422 static int
needs_recovery(DB_ENV * env)423 needs_recovery (DB_ENV *env) {
424 assert(env->i->real_log_dir);
425 int recovery_needed = tokuft_needs_recovery(env->i->real_log_dir, true);
426 return recovery_needed ? DB_RUNRECOVERY : 0;
427 }
428
429 static int toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte, uint32_t min, uint32_t flags);
430
431 // Keys used in persistent environment dictionary:
432 // Following keys added in version 12
433 static const char * orig_env_ver_key = "original_version";
434 static const char * curr_env_ver_key = "current_version";
435 // Following keys added in version 14, add more keys for future versions
436 static const char * creation_time_key = "creation_time";
437
// Build the key "upgrade_v<version>_time".
// Returns a pointer to a function-local static buffer, so each call overwrites
// the string produced by the previous call.
static char * get_upgrade_time_key(int version) {
    // Length of the fixed text (including its terminator) plus 12 bytes for the formatted version.
    static char upgrade_time_key[sizeof("upgrade_v_time") + 12];

    const int n = snprintf(upgrade_time_key, sizeof(upgrade_time_key), "upgrade_v%d_time", version);
    // snprintf must neither fail nor truncate.
    assert(n >= 0 && n < (int)sizeof(upgrade_time_key));

    return upgrade_time_key;
}
447
// Build the key "upgrade_v<version>_footprint".
// Returns a pointer to a function-local static buffer, so each call overwrites
// the string produced by the previous call.
static char * get_upgrade_footprint_key(int version) {
    // Length of the fixed text (including its terminator) plus 12 bytes for the formatted version.
    static char upgrade_footprint_key[sizeof("upgrade_v_footprint") + 12];

    const int n = snprintf(upgrade_footprint_key, sizeof(upgrade_footprint_key), "upgrade_v%d_footprint", version);
    // snprintf must neither fail nor truncate.
    assert(n >= 0 && n < (int)sizeof(upgrade_footprint_key));

    return upgrade_footprint_key;
}
457
// Build the key "upgrade_v<version>_last_lsn".
// Returns a pointer to a function-local static buffer, so each call overwrites
// the string produced by the previous call.
static char * get_upgrade_last_lsn_key(int version) {
    // Length of the fixed text (including its terminator) plus 12 bytes for the formatted version.
    static char upgrade_last_lsn_key[sizeof("upgrade_v_last_lsn") + 12];

    const int n = snprintf(upgrade_last_lsn_key, sizeof(upgrade_last_lsn_key), "upgrade_v%d_last_lsn", version);
    // snprintf must neither fail nor truncate.
    assert(n >= 0 && n < (int)sizeof(upgrade_last_lsn_key));

    return upgrade_last_lsn_key;
}
467
468 // Values read from (or written into) persistent environment,
469 // kept here for read-only access from engine status.
470 // Note, persistent_upgrade_status info is separate in part to simplify its exclusion from engine status until relevant.
// Row indexes into persistent_upgrade_status.status[].
// The enumerators count up from 0, so the final one doubles as the number of rows.
typedef enum {
    PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION = 0,
    // read from curr_env_ver_key, prev version as of this startup
    PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP,
    PERSISTENT_UPGRADE_LAST_LSN_OF_V13,
    PERSISTENT_UPGRADE_V14_TIME,
    PERSISTENT_UPGRADE_V14_FOOTPRINT,
    // number of rows in the status array
    PERSISTENT_UPGRADE_STATUS_NUM_ROWS
} persistent_upgrade_status_entry;
479
480 typedef struct {
481 bool initialized;
482 TOKU_ENGINE_STATUS_ROW_S status[PERSISTENT_UPGRADE_STATUS_NUM_ROWS];
483 } PERSISTENT_UPGRADE_STATUS_S, *PERSISTENT_UPGRADE_STATUS;
484
485 static PERSISTENT_UPGRADE_STATUS_S persistent_upgrade_status;
486
487 #define PERSISTENT_UPGRADE_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(persistent_upgrade_status, k, c, t, "upgrade: " l, inc)
488
489 static void
persistent_upgrade_status_init(void)490 persistent_upgrade_status_init (void) {
491 // Note, this function initializes the keyname, type, and legend fields.
492 // Value fields are initialized to zero by compiler.
493
494 PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION, nullptr, UINT64, "original version (at time of environment creation)", TOKU_ENGINE_STATUS);
495 PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP, nullptr, UINT64, "version at time of startup", TOKU_ENGINE_STATUS);
496 PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_LAST_LSN_OF_V13, nullptr, UINT64, "last LSN of version 13", TOKU_ENGINE_STATUS);
497 PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_TIME, nullptr, UNIXTIME, "time of upgrade to version 14", TOKU_ENGINE_STATUS);
498 PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_FOOTPRINT, nullptr, UINT64, "footprint from version 13 to 14", TOKU_ENGINE_STATUS);
499 persistent_upgrade_status.initialized = true;
500 }
501
502 #define PERSISTENT_UPGRADE_STATUS_VALUE(x) persistent_upgrade_status.status[x].value.num
503
504 // Requires: persistent environment dictionary is already open.
505 // Input arg is lsn of clean shutdown of previous version,
506 // or ZERO_LSN if no upgrade or if crash between log upgrade and here.
507 // NOTE: To maintain compatibility with previous versions, do not change the
508 // format of any information stored in the persistent environment dictionary.
509 // For example, some values are stored as 32 bits, even though they are immediately
510 // converted to 64 bits when read. Do not change them to be stored as 64 bits.
511 //
512 static int
maybe_upgrade_persistent_environment_dictionary(DB_ENV * env,DB_TXN * txn,LSN last_lsn_of_clean_shutdown_read_from_log)513 maybe_upgrade_persistent_environment_dictionary(DB_ENV * env, DB_TXN * txn, LSN last_lsn_of_clean_shutdown_read_from_log) {
514 int r;
515 DBT key, val;
516 DB *persistent_environment = env->i->persistent_environment;
517
518 if (!persistent_upgrade_status.initialized)
519 persistent_upgrade_status_init();
520
521 toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
522 toku_init_dbt(&val);
523 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
524 assert(r == 0);
525 uint32_t stored_env_version = toku_dtoh32(*(uint32_t*)val.data);
526 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP) = stored_env_version;
527 if (stored_env_version > FT_LAYOUT_VERSION)
528 r = TOKUDB_DICTIONARY_TOO_NEW;
529 else if (stored_env_version < FT_LAYOUT_MIN_SUPPORTED_VERSION)
530 r = TOKUDB_DICTIONARY_TOO_OLD;
531 else if (stored_env_version < FT_LAYOUT_VERSION) {
532 const uint32_t curr_env_ver_d = toku_htod32(FT_LAYOUT_VERSION);
533 toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
534 toku_fill_dbt(&val, &curr_env_ver_d, sizeof(curr_env_ver_d));
535 r = toku_db_put(persistent_environment, txn, &key, &val, 0, false);
536 assert_zero(r);
537
538 time_t upgrade_time_d = toku_htod64(time(NULL));
539 uint64_t upgrade_footprint_d = toku_htod64(toku_log_upgrade_get_footprint());
540 uint64_t upgrade_last_lsn_d = toku_htod64(last_lsn_of_clean_shutdown_read_from_log.lsn);
541 for (int version = stored_env_version+1; version <= FT_LAYOUT_VERSION; version++) {
542 uint32_t put_flag = DB_NOOVERWRITE;
543 if (version <= FT_LAYOUT_VERSION_19) {
544 // See #5902.
545 // To prevent a crash (and any higher complexity code) we'll simply
546 // silently not overwrite anything if it exists.
547 // The keys existing for version <= 19 is not necessarily an error.
548 // If this happens for versions > 19 it IS an error and we'll use DB_NOOVERWRITE.
549 put_flag = DB_NOOVERWRITE_NO_ERROR;
550 }
551
552
553 char* upgrade_time_key = get_upgrade_time_key(version);
554 toku_fill_dbt(&key, upgrade_time_key, strlen(upgrade_time_key));
555 toku_fill_dbt(&val, &upgrade_time_d, sizeof(upgrade_time_d));
556 r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
557 assert_zero(r);
558
559 char* upgrade_footprint_key = get_upgrade_footprint_key(version);
560 toku_fill_dbt(&key, upgrade_footprint_key, strlen(upgrade_footprint_key));
561 toku_fill_dbt(&val, &upgrade_footprint_d, sizeof(upgrade_footprint_d));
562 r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
563 assert_zero(r);
564
565 char* upgrade_last_lsn_key = get_upgrade_last_lsn_key(version);
566 toku_fill_dbt(&key, upgrade_last_lsn_key, strlen(upgrade_last_lsn_key));
567 toku_fill_dbt(&val, &upgrade_last_lsn_d, sizeof(upgrade_last_lsn_d));
568 r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
569 assert_zero(r);
570 }
571
572 }
573 return r;
574 }
575
576 // Capture contents of persistent_environment dictionary so that it can be read by engine status
577 static void
capture_persistent_env_contents(DB_ENV * env,DB_TXN * txn)578 capture_persistent_env_contents (DB_ENV * env, DB_TXN * txn) {
579 int r;
580 DBT key, val;
581 DB *persistent_environment = env->i->persistent_environment;
582
583 toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
584 toku_init_dbt(&val);
585 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
586 assert_zero(r);
587 uint32_t curr_env_version = toku_dtoh32(*(uint32_t*)val.data);
588 assert(curr_env_version == FT_LAYOUT_VERSION);
589
590 toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
591 toku_init_dbt(&val);
592 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
593 assert_zero(r);
594 uint64_t persistent_original_env_version = toku_dtoh32(*(uint32_t*)val.data);
595 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION) = persistent_original_env_version;
596 assert(persistent_original_env_version <= curr_env_version);
597
598 // make no assertions about timestamps, clock may have been reset
599 if (persistent_original_env_version >= FT_LAYOUT_VERSION_14) {
600 toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
601 toku_init_dbt(&val);
602 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
603 assert_zero(r);
604 STATUS_VALUE(YDB_LAYER_TIME_CREATION) = toku_dtoh64((*(time_t*)val.data));
605 }
606
607 if (persistent_original_env_version != curr_env_version) {
608 // an upgrade was performed at some time, capture info about the upgrade
609
610 char * last_lsn_key = get_upgrade_last_lsn_key(curr_env_version);
611 toku_fill_dbt(&key, last_lsn_key, strlen(last_lsn_key));
612 toku_init_dbt(&val);
613 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
614 assert_zero(r);
615 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_LAST_LSN_OF_V13) = toku_dtoh64(*(uint64_t*)val.data);
616
617 char * time_key = get_upgrade_time_key(curr_env_version);
618 toku_fill_dbt(&key, time_key, strlen(time_key));
619 toku_init_dbt(&val);
620 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
621 assert_zero(r);
622 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_TIME) = toku_dtoh64(*(time_t*)val.data);
623
624 char * footprint_key = get_upgrade_footprint_key(curr_env_version);
625 toku_fill_dbt(&key, footprint_key, strlen(footprint_key));
626 toku_init_dbt(&val);
627 r = toku_db_get(persistent_environment, txn, &key, &val, 0);
628 assert_zero(r);
629 PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_FOOTPRINT) = toku_dtoh64(*(uint64_t*)val.data);
630 }
631
632 }
633
634 // return 0 if log exists or ENOENT if log does not exist
635 static int
ydb_recover_log_exists(DB_ENV * env)636 ydb_recover_log_exists(DB_ENV *env) {
637 int r = tokuft_recover_log_exists(env->i->real_log_dir);
638 return r;
639 }
640
641 // Validate that all required files are present, no side effects.
642 // Return 0 if all is well, ENOENT if some files are present but at least one is
643 // missing,
644 // other non-zero value if some other error occurs.
645 // Set *valid_newenv if creating a new environment (all files missing).
646 // (Note, if special dictionaries exist, then they were created transactionally
647 // and log should exist.)
validate_env(DB_ENV * env,bool * valid_newenv,bool need_rollback_cachefile)648 static int validate_env(DB_ENV *env,
649 bool *valid_newenv,
650 bool need_rollback_cachefile) {
651 int r;
652 bool expect_newenv = false; // set true if we expect to create a new env
653 toku_struct_stat buf;
654 char *path = NULL;
655
656 // Test for persistent environment
657 path = toku_construct_full_name(
658 2, env->i->dir, toku_product_name_strings.environmentdictionary);
659 assert(path);
660 r = toku_stat(path, &buf, toku_uninstrumented);
661 if (r == 0) {
662 expect_newenv = false; // persistent info exists
663 } else {
664 int stat_errno = get_error_errno();
665 if (stat_errno == ENOENT) {
666 expect_newenv = true;
667 r = 0;
668 } else {
669 r = toku_ydb_do_error(
670 env,
671 stat_errno,
672 "Unable to access persistent environment [%s] in [%s]\n",
673 toku_product_name_strings.environmentdictionary,
674 env->i->dir);
675 assert(r);
676 }
677 }
678 toku_free(path);
679
680 // Test for existence of rollback cachefile if it is expected to exist
681 if (r == 0 && need_rollback_cachefile) {
682 path = toku_construct_full_name(
683 2, env->i->dir, toku_product_name_strings.rollback_cachefile);
684 assert(path);
685 r = toku_stat(path, &buf, toku_uninstrumented);
686 if (r == 0) {
687 if (expect_newenv) // rollback cachefile exists, but persistent env
688 // is missing
689 r = toku_ydb_do_error(
690 env,
691 ENOENT,
692 "Persistent environment is missing while looking for "
693 "rollback cachefile [%s] in [%s]\n",
694 toku_product_name_strings.rollback_cachefile, env->i->dir);
695 } else {
696 int stat_errno = get_error_errno();
697 if (stat_errno == ENOENT) {
698 if (!expect_newenv) // rollback cachefile is missing but
699 // persistent env exists
700 r = toku_ydb_do_error(
701 env,
702 ENOENT,
703 "rollback cachefile [%s] is missing from [%s]\n",
704 toku_product_name_strings.rollback_cachefile,
705 env->i->dir);
706 else
707 r = 0; // both rollback cachefile and persistent env are
708 // missing
709 } else {
710 r = toku_ydb_do_error(
711 env,
712 stat_errno,
713 "Unable to access rollback cachefile [%s] in [%s]\n",
714 toku_product_name_strings.rollback_cachefile,
715 env->i->dir);
716 assert(r);
717 }
718 }
719 toku_free(path);
720 }
721
722 // Test for fileops directory
723 if (r == 0 && force_recovery != 6) {
724 path = toku_construct_full_name(
725 2, env->i->dir, toku_product_name_strings.fileopsdirectory);
726 assert(path);
727 r = toku_stat(path, &buf, toku_uninstrumented);
728 if (r == 0) {
729 if (expect_newenv) // fileops directory exists, but persistent env
730 // is missing
731 r = toku_ydb_do_error(
732 env,
733 ENOENT,
734 "Persistent environment is missing while looking for "
735 "fileops directory [%s] in [%s]\n",
736 toku_product_name_strings.fileopsdirectory,
737 env->i->dir);
738 } else {
739 int stat_errno = get_error_errno();
740 if (stat_errno == ENOENT) {
741 if (!expect_newenv) // fileops directory is missing but
742 // persistent env exists
743 r = toku_ydb_do_error(
744 env,
745 ENOENT,
746 "Fileops directory [%s] is missing from [%s]\n",
747 toku_product_name_strings.fileopsdirectory,
748 env->i->dir);
749 else
750 r = 0; // both fileops directory and persistent env are
751 // missing
752 } else {
753 r = toku_ydb_do_error(
754 env,
755 stat_errno,
756 "Unable to access fileops directory [%s] in [%s]\n",
757 toku_product_name_strings.fileopsdirectory,
758 env->i->dir);
759 assert(r);
760 }
761 }
762 toku_free(path);
763 }
764
765 // Test for recovery log
766 if ((r == 0) && (env->i->open_flags & DB_INIT_LOG) && force_recovery != 6) {
767 // if using transactions, test for existence of log
768 r = ydb_recover_log_exists(env); // return 0 or ENOENT
769 if (expect_newenv && (r != ENOENT))
770 r = toku_ydb_do_error(env,
771 ENOENT,
772 "Persistent environment information is "
773 "missing (but log exists) while looking for "
774 "recovery log files in [%s]\n",
775 env->i->real_log_dir);
776 else if (!expect_newenv && r == ENOENT)
777 r = toku_ydb_do_error(env,
778 ENOENT,
779 "Recovery log is missing (persistent "
780 "environment information is present) while "
781 "looking for recovery log files in [%s]\n",
782 env->i->real_log_dir);
783 else
784 r = 0;
785 }
786
787 if (r == 0)
788 *valid_newenv = expect_newenv;
789 else
790 *valid_newenv = false;
791 return r;
792 }
793
794 // The version of the environment (on disk) is the version of the recovery log.
795 // If the recovery log is of the current version, then there is no upgrade to be done.
796 // If the recovery log is of an old version, then replacing it with a new recovery log
797 // of the current version is how the upgrade is done.
798 // Note, the upgrade procedure takes a checkpoint, so we must release the ydb lock.
799 static int
ydb_maybe_upgrade_env(DB_ENV * env,LSN * last_lsn_of_clean_shutdown_read_from_log,bool * upgrade_in_progress)800 ydb_maybe_upgrade_env (DB_ENV *env, LSN * last_lsn_of_clean_shutdown_read_from_log, bool * upgrade_in_progress) {
801 int r = 0;
802 if (env->i->open_flags & DB_INIT_TXN && env->i->open_flags & DB_INIT_LOG) {
803 r = toku_maybe_upgrade_log(env->i->dir, env->i->real_log_dir, last_lsn_of_clean_shutdown_read_from_log, upgrade_in_progress);
804 }
805 return r;
806 }
807
808 static void
unlock_single_process(DB_ENV * env)809 unlock_single_process(DB_ENV *env) {
810 int r;
811 r = toku_single_process_unlock(&env->i->envdir_lockfd);
812 lazy_assert_zero(r);
813 r = toku_single_process_unlock(&env->i->datadir_lockfd);
814 lazy_assert_zero(r);
815 r = toku_single_process_unlock(&env->i->logdir_lockfd);
816 lazy_assert_zero(r);
817 r = toku_single_process_unlock(&env->i->tmpdir_lockfd);
818 lazy_assert_zero(r);
819 }
820
821 // Open the environment.
822 // If this is a new environment, then create the necessary files.
823 // Return 0 on success, ENOENT if any of the expected necessary files are missing.
824 // (The set of necessary files is defined in the function validate_env() above.)
825 static int
env_open(DB_ENV * env,const char * home,uint32_t flags,int mode)826 env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {
827
828 if(force_recovery == 6) {
829 {
830 const int len = strlen(toku_product_name_strings.rollback_cachefile);
831 toku_product_name_strings.rollback_cachefile[len] = '2';
832 toku_product_name_strings.rollback_cachefile[len+1] = 0;
833 }
834
835 {
836 const int len = strlen(toku_product_name_strings.single_process_lock);
837 toku_product_name_strings.single_process_lock[len] = '2';
838 toku_product_name_strings.single_process_lock[len+1] = 0;
839 }
840
841 {
842 const int len = strlen(toku_product_name_strings.environmentdictionary);
843 toku_product_name_strings.environmentdictionary[len] = '2';
844 toku_product_name_strings.environmentdictionary[len+1] = 0;
845 }
846 }
847
848 HANDLE_PANICKED_ENV(env);
849 int r;
850 bool newenv; // true iff creating a new environment
851 uint32_t unused_flags=flags;
852 CHECKPOINTER cp;
853 DB_TXN *txn = NULL;
854
855 if (env_opened(env)) {
856 r = toku_ydb_do_error(env, EINVAL, "The environment is already open\n");
857 goto cleanup;
858 }
859
860 if (env->get_check_thp(env) && toku_os_huge_pages_enabled()) {
861 r = toku_ydb_do_error(env, TOKUDB_HUGE_PAGES_ENABLED,
862 "Huge pages are enabled, disable them before continuing\n");
863 goto cleanup;
864 }
865
866 most_recent_env = NULL;
867
868 assert(sizeof(time_t) == sizeof(uint64_t));
869
870 HANDLE_EXTRA_FLAGS(env, flags,
871 DB_CREATE|DB_PRIVATE|DB_INIT_LOG|DB_INIT_TXN|DB_RECOVER|DB_INIT_MPOOL|DB_INIT_LOCK|DB_THREAD);
872
873 // DB_CREATE means create if env does not exist, and PerconaFT requires it because
874 // PerconaFT requries DB_PRIVATE.
875 if ((flags & DB_PRIVATE) && !(flags & DB_CREATE)) {
876 r = toku_ydb_do_error(env, ENOENT, "DB_PRIVATE requires DB_CREATE (seems gratuitous to us, but that's BDB's behavior\n");
877 goto cleanup;
878 }
879
880 if (!(flags & DB_PRIVATE)) {
881 r = toku_ydb_do_error(env, ENOENT, "PerconaFT requires DB_PRIVATE\n");
882 goto cleanup;
883 }
884
885 if ((flags & DB_INIT_LOG) && !(flags & DB_INIT_TXN)) {
886 r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires transactions for logging\n");
887 goto cleanup;
888 }
889
890 if (!home) home = ".";
891
892 // Verify that the home exists.
893 toku_struct_stat buf;
894 r = toku_stat(home, &buf, toku_uninstrumented);
895 if (r != 0) {
896 int e = get_error_errno();
897 r = toku_ydb_do_error(
898 env, e, "Error from toku_stat(\"%s\",...)\n", home);
899 goto cleanup;
900 }
901 unused_flags &= ~DB_PRIVATE;
902
903 if (env->i->dir) {
904 toku_free(env->i->dir);
905 }
906 env->i->dir = toku_strdup(home);
907 if (env->i->dir == 0) {
908 r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
909 goto cleanup;
910 }
911 env->i->open_flags = flags;
912 env->i->open_mode = mode;
913
914 // Instrumentation probe start
915 TOKU_PROBE_START(toku_instr_probe_1);
916
917 env_setup_real_data_dir(env);
918 env_setup_real_log_dir(env);
919 env_setup_real_tmp_dir(env);
920
921 // Instrumentation probe stop
922 toku_instr_probe_1->stop();
923
924 r = toku_single_process_lock(
925 env->i->dir, "environment", &env->i->envdir_lockfd);
926 if (r != 0)
927 goto cleanup;
928 r = toku_single_process_lock(
929 env->i->real_data_dir, "data", &env->i->datadir_lockfd);
930 if (r!=0) goto cleanup;
931 r = toku_single_process_lock(env->i->real_log_dir, "logs", &env->i->logdir_lockfd);
932 if (r!=0) goto cleanup;
933 r = toku_single_process_lock(env->i->real_tmp_dir, "temp", &env->i->tmpdir_lockfd);
934 if (r!=0) goto cleanup;
935
936 bool need_rollback_cachefile;
937 need_rollback_cachefile = false;
938 if (flags & (DB_INIT_TXN | DB_INIT_LOG) && force_recovery != 6) {
939 need_rollback_cachefile = true;
940 }
941
942 ydb_layer_status_init(); // do this before possibly upgrading, so upgrade work is counted in status counters
943
944 LSN last_lsn_of_clean_shutdown_read_from_log;
945 last_lsn_of_clean_shutdown_read_from_log = ZERO_LSN;
946 bool upgrade_in_progress;
947 upgrade_in_progress = false;
948 r = ydb_maybe_upgrade_env(env, &last_lsn_of_clean_shutdown_read_from_log, &upgrade_in_progress);
949 if (r!=0) goto cleanup;
950
951 if (upgrade_in_progress || force_recovery == 6) {
952 // Delete old rollback file. There was a clean shutdown, so it has nothing useful,
953 // and there is no value in upgrading it. It is simpler to just create a new one.
954 char* rollback_filename = toku_construct_full_name(2, env->i->dir, toku_product_name_strings.rollback_cachefile);
955 assert(rollback_filename);
956 r = unlink(rollback_filename);
957 if (r != 0) {
958 assert(get_error_errno() == ENOENT);
959 }
960 toku_free(rollback_filename);
961 need_rollback_cachefile = false; // we're not expecting it to exist now
962 }
963
964 r = validate_env(env, &newenv, need_rollback_cachefile); // make sure that environment is either new or complete
965 if (r != 0) goto cleanup;
966
967 unused_flags &= ~DB_INIT_TXN & ~DB_INIT_LOG;
968
969 if(force_recovery == 6) {
970 flags |= DB_INIT_LOG | DB_INIT_TXN;
971 }
972
973 // do recovery only if there exists a log and recovery is requested
974 // otherwise, a log is created when the logger is opened later
975 if (!newenv && force_recovery == 0) {
976 if (flags & DB_INIT_LOG) {
977 // the log does exist
978 if (flags & DB_RECOVER) {
979 r = ydb_do_recovery(env);
980 if (r != 0) goto cleanup;
981 } else {
982 // the log is required to have clean shutdown if recovery is not requested
983 r = needs_recovery(env);
984 if (r != 0) goto cleanup;
985 }
986 }
987 }
988
989 toku_loader_cleanup_temp_files(env);
990
991 if (flags & (DB_INIT_TXN | DB_INIT_LOG)) {
992 assert(env->i->logger);
993 toku_logger_write_log_files(env->i->logger, (bool)((flags & DB_INIT_LOG) != 0));
994 if (!toku_logger_is_open(env->i->logger)) {
995 r = toku_logger_open(env->i->real_log_dir, env->i->logger);
996 if (r!=0) {
997 toku_ydb_do_error(env, r, "Could not open logger\n");
998 }
999 }
1000 } else {
1001 r = toku_logger_close(&env->i->logger); // if no logging system, then kill the logger
1002 assert_zero(r);
1003 }
1004
1005 unused_flags &= ~DB_INIT_MPOOL; // we always init an mpool.
1006 unused_flags &= ~DB_CREATE; // we always do DB_CREATE
1007 unused_flags &= ~DB_INIT_LOCK; // we check this later (e.g. in db->open)
1008 unused_flags &= ~DB_RECOVER;
1009
1010 // This is probably correct, but it will be pain...
1011 // if ((flags & DB_THREAD)==0) {
1012 // r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires DB_THREAD");
1013 // goto cleanup;
1014 // }
1015 unused_flags &= ~DB_THREAD;
1016
1017 if (unused_flags!=0) {
1018 r = toku_ydb_do_error(env, EINVAL, "Extra flags not understood by tokuft: %u\n", unused_flags);
1019 goto cleanup;
1020 }
1021
1022 if (env->i->cachetable==NULL) {
1023 // If we ran recovery then the cachetable should be set here.
1024 r = toku_cachetable_create_ex(&env->i->cachetable, env->i->cachetable_size,
1025 env->i->client_pool_threads,
1026 env->i->cachetable_pool_threads,
1027 env->i->checkpoint_pool_threads,
1028 ZERO_LSN, env->i->logger);
1029 if (r != 0) {
1030 r = toku_ydb_do_error(env, r, "Cant create a cachetable\n");
1031 goto cleanup;
1032 }
1033 }
1034
1035 toku_cachetable_set_env_dir(env->i->cachetable, env->i->dir);
1036
1037 int using_txns;
1038 using_txns = env->i->open_flags & DB_INIT_TXN;
1039 if (env->i->logger) {
1040 // if this is a newborn env or if this is an upgrade, then create a brand new rollback file
1041 assert (using_txns);
1042 toku_logger_set_cachetable(env->i->logger, env->i->cachetable);
1043 if (!toku_logger_rollback_is_open(env->i->logger)) {
1044 bool create_new_rollback_file = newenv | upgrade_in_progress | (force_recovery == 6);
1045 r = toku_logger_open_rollback(env->i->logger, env->i->cachetable, create_new_rollback_file);
1046 if (r != 0) {
1047 r = toku_ydb_do_error(env, r, "Cant open rollback\n");
1048 goto cleanup;
1049 }
1050 }
1051 }
1052
1053 if (using_txns) {
1054 r = toku_txn_begin(env, 0, &txn, 0);
1055 assert_zero(r);
1056 }
1057
1058 {
1059 r = toku_db_create(&env->i->persistent_environment, env, 0);
1060 assert_zero(r);
1061 r = toku_db_use_builtin_key_cmp(env->i->persistent_environment);
1062 assert_zero(r);
1063 writing_rollback++;
1064 r = toku_db_open_iname(env->i->persistent_environment, txn, toku_product_name_strings.environmentdictionary, DB_CREATE, mode);
1065 if (r != 0) {
1066 r = toku_ydb_do_error(env, r, "Cant open persistent env\n");
1067 goto cleanup;
1068 }
1069 if (newenv) {
1070 // create new persistent_environment
1071 DBT key, val;
1072 uint32_t persistent_original_env_version = FT_LAYOUT_VERSION;
1073 const uint32_t environment_version = toku_htod32(persistent_original_env_version);
1074
1075 toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
1076 toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
1077 r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1078 assert_zero(r);
1079
1080 toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
1081 toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
1082 r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1083 assert_zero(r);
1084
1085 time_t creation_time_d = toku_htod64(time(NULL));
1086 toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
1087 toku_fill_dbt(&val, &creation_time_d, sizeof(creation_time_d));
1088 r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1089 assert_zero(r);
1090 }
1091 else {
1092 r = maybe_upgrade_persistent_environment_dictionary(env, txn, last_lsn_of_clean_shutdown_read_from_log);
1093 assert_zero(r);
1094 }
1095 capture_persistent_env_contents(env, txn);
1096 writing_rollback--;
1097 }
1098 {
1099 r = toku_db_create(&env->i->directory, env, 0);
1100 assert_zero(r);
1101 r = toku_db_use_builtin_key_cmp(env->i->directory);
1102 assert_zero(r);
1103 r = toku_db_open_iname(env->i->directory, txn, toku_product_name_strings.fileopsdirectory, DB_CREATE, mode);
1104 if (r != 0) {
1105 r = toku_ydb_do_error(env, r, "Cant open %s\n", toku_product_name_strings.fileopsdirectory);
1106 goto cleanup;
1107 }
1108 }
1109 if (using_txns) {
1110 r = locked_txn_commit(txn, 0);
1111 assert_zero(r);
1112 txn = NULL;
1113 }
1114 cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1115 if (!force_recovery) {
1116 r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT);
1117 }
1118 writing_rollback--;
1119 env_fs_poller(env); // get the file system state at startup
1120 r = env_fs_init_minicron(env);
1121 if (r != 0) {
1122 r = toku_ydb_do_error(env, r, "Cant create fs minicron\n");
1123 goto cleanup;
1124 }
1125 r = env_fsync_log_cron_init(env);
1126 if (r != 0) {
1127 r = toku_ydb_do_error(env, r, "Cant create fsync log minicron\n");
1128 goto cleanup;
1129 }
1130 cleanup:
1131 if (r!=0) {
1132 if (txn) {
1133 locked_txn_abort(txn);
1134 }
1135 if (env && env->i) {
1136 unlock_single_process(env);
1137 }
1138 }
1139 if (r == 0) {
1140 set_errno(0); // tabula rasa. If there's a crash after env was successfully opened, no misleading errno will have been left around by this code.
1141 most_recent_env = env;
1142 uint64_t num_rows;
1143 env_get_engine_status_num_rows(env, &num_rows);
1144 toku_assert_set_fpointers(toku_maybe_get_engine_status_text, toku_maybe_err_engine_status, toku_maybe_set_env_panic, num_rows);
1145 }
1146 return r;
1147 }
1148
1149 static int
env_close(DB_ENV * env,uint32_t flags)1150 env_close(DB_ENV * env, uint32_t flags) {
1151 int r = 0;
1152 const char * err_msg = NULL;
1153 bool clean_shutdown = true;
1154
1155 if (flags & TOKUFT_DIRTY_SHUTDOWN) {
1156 clean_shutdown = false;
1157 flags &= ~TOKUFT_DIRTY_SHUTDOWN;
1158 }
1159
1160 most_recent_env = NULL; // Set most_recent_env to NULL so that we don't have a dangling pointer (and if there's an error, the toku assert code would try to look at the env.)
1161
1162 // if panicked, or if any open transactions, or any open dbs, then do nothing.
1163
1164 if (toku_env_is_panicked(env)) {
1165 goto panic_and_quit_early;
1166 }
1167 if (env->i->logger && toku_logger_txns_exist(env->i->logger)) {
1168 err_msg = "Cannot close environment due to open transactions\n";
1169 r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
1170 goto panic_and_quit_early;
1171 }
1172 if (env->i->open_dbs_by_dname) { //Verify that there are no open dbs.
1173 if (env->i->open_dbs_by_dname->size() > 0) {
1174 err_msg = "Cannot close environment due to open DBs\n";
1175 r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
1176 goto panic_and_quit_early;
1177 }
1178 }
1179 if (env->i->persistent_environment) {
1180 r = toku_db_close(env->i->persistent_environment);
1181 if (r) {
1182 err_msg = "Cannot close persistent environment dictionary (DB->close error)\n";
1183 toku_ydb_do_error(env, r, "%s", err_msg);
1184 goto panic_and_quit_early;
1185 }
1186 }
1187 if (env->i->directory) {
1188 r = toku_db_close(env->i->directory);
1189 if (r) {
1190 err_msg = "Cannot close Directory dictionary (DB->close error)\n";
1191 toku_ydb_do_error(env, r, "%s", err_msg);
1192 goto panic_and_quit_early;
1193 }
1194 }
1195 env_fsync_log_cron_destroy(env);
1196 if (env->i->cachetable) {
1197 toku_cachetable_prepare_close(env->i->cachetable);
1198 toku_cachetable_minicron_shutdown(env->i->cachetable);
1199 if (env->i->logger) {
1200 CHECKPOINTER cp = nullptr;
1201 if (clean_shutdown) {
1202 cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1203 r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
1204 if (r) {
1205 err_msg = "Cannot close environment (error during checkpoint)\n";
1206 toku_ydb_do_error(env, r, "%s", err_msg);
1207 goto panic_and_quit_early;
1208 }
1209 }
1210 toku_logger_close_rollback_check_empty(env->i->logger, clean_shutdown);
1211 if (clean_shutdown) {
1212 //Do a second checkpoint now that the rollback cachefile is closed.
1213 r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
1214 if (r) {
1215 err_msg = "Cannot close environment (error during checkpoint)\n";
1216 toku_ydb_do_error(env, r, "%s", err_msg);
1217 goto panic_and_quit_early;
1218 }
1219 toku_logger_shutdown(env->i->logger);
1220 }
1221 }
1222 toku_cachetable_close(&env->i->cachetable);
1223 }
1224 if (env->i->logger) {
1225 r = toku_logger_close(&env->i->logger);
1226 if (r) {
1227 err_msg = "Cannot close environment (logger close error)\n";
1228 env->i->logger = NULL;
1229 toku_ydb_do_error(env, r, "%s", err_msg);
1230 goto panic_and_quit_early;
1231 }
1232 }
1233 // Even if nothing else went wrong, but we were panicked, then raise an error.
1234 // But if something else went wrong then raise that error (above)
1235 if (toku_env_is_panicked(env)) {
1236 goto panic_and_quit_early;
1237 } else {
1238 assert(env->i->panic_string == 0);
1239 }
1240
1241 env_fs_destroy(env);
1242 env->i->ltm.destroy();
1243 if (env->i->data_dir)
1244 toku_free(env->i->data_dir);
1245 if (env->i->lg_dir)
1246 toku_free(env->i->lg_dir);
1247 if (env->i->tmp_dir)
1248 toku_free(env->i->tmp_dir);
1249 if (env->i->real_data_dir)
1250 toku_free(env->i->real_data_dir);
1251 if (env->i->real_log_dir)
1252 toku_free(env->i->real_log_dir);
1253 if (env->i->real_tmp_dir)
1254 toku_free(env->i->real_tmp_dir);
1255 if (env->i->open_dbs_by_dname) {
1256 env->i->open_dbs_by_dname->destroy();
1257 toku_free(env->i->open_dbs_by_dname);
1258 }
1259 if (env->i->open_dbs_by_dict_id) {
1260 env->i->open_dbs_by_dict_id->destroy();
1261 toku_free(env->i->open_dbs_by_dict_id);
1262 }
1263 if (env->i->dir)
1264 toku_free(env->i->dir);
1265 toku_pthread_rwlock_destroy(&env->i->open_dbs_rwlock);
1266
1267 // Immediately before freeing internal environment unlock the directories
1268 unlock_single_process(env);
1269 toku_free(env->i);
1270 toku_free(env);
1271 toku_sync_fetch_and_add(&tokuft_num_envs, -1);
1272 if (flags != 0) {
1273 r = EINVAL;
1274 }
1275 return r;
1276
1277 panic_and_quit_early:
1278 //release lock files.
1279 unlock_single_process(env);
1280 //r is the panic error
1281 if (toku_env_is_panicked(env)) {
1282 char *panic_string = env->i->panic_string;
1283 r = toku_ydb_do_error(env, toku_env_is_panicked(env), "Cannot close environment due to previous error: %s\n", panic_string);
1284 }
1285 else {
1286 env_panic(env, r, err_msg);
1287 }
1288 return r;
1289 }
1290
1291 static int
env_log_archive(DB_ENV * env,char ** list[],uint32_t flags)1292 env_log_archive(DB_ENV * env, char **list[], uint32_t flags) {
1293 return toku_logger_log_archive(env->i->logger, list, flags);
1294 }
1295
1296 static int
env_log_flush(DB_ENV * env,const DB_LSN * lsn)1297 env_log_flush(DB_ENV * env, const DB_LSN * lsn __attribute__((__unused__))) {
1298 HANDLE_PANICKED_ENV(env);
1299 // do nothing if no logger
1300 if (env->i->logger) {
1301 // We just flush everything. MySQL uses lsn == 0 which means flush everything.
1302 // For anyone else using the log, it is correct to flush too much, so we are OK.
1303 toku_logger_fsync(env->i->logger);
1304 }
1305 return 0;
1306 }
1307
1308 static int
env_set_cachesize(DB_ENV * env,uint32_t gbytes,uint32_t bytes,int ncache)1309 env_set_cachesize(DB_ENV * env, uint32_t gbytes, uint32_t bytes, int ncache) {
1310 HANDLE_PANICKED_ENV(env);
1311 if (ncache != 1) {
1312 return EINVAL;
1313 }
1314 uint64_t cs64 = ((uint64_t) gbytes << 30) + bytes;
1315 unsigned long cs = cs64;
1316 if (cs64 > cs) {
1317 return EINVAL;
1318 }
1319 env->i->cachetable_size = cs;
1320 return 0;
1321 }
1322
1323 static int
env_set_client_pool_threads(DB_ENV * env,uint32_t threads)1324 env_set_client_pool_threads(DB_ENV * env, uint32_t threads) {
1325 HANDLE_PANICKED_ENV(env);
1326 env->i->client_pool_threads = threads;
1327 return 0;
1328 }
1329
1330 static int
env_set_cachetable_pool_threads(DB_ENV * env,uint32_t threads)1331 env_set_cachetable_pool_threads(DB_ENV * env, uint32_t threads) {
1332 HANDLE_PANICKED_ENV(env);
1333 env->i->cachetable_pool_threads = threads;
1334 return 0;
1335 }
1336
1337 static int
env_set_checkpoint_pool_threads(DB_ENV * env,uint32_t threads)1338 env_set_checkpoint_pool_threads(DB_ENV * env, uint32_t threads) {
1339 HANDLE_PANICKED_ENV(env);
1340 env->i->checkpoint_pool_threads = threads;
1341 return 0;
1342 }
1343
1344 static void
env_set_check_thp(DB_ENV * env,bool new_val)1345 env_set_check_thp(DB_ENV * env, bool new_val) {
1346 assert(env);
1347 env->i->check_thp = new_val;
1348 }
1349
1350 static bool
env_get_check_thp(DB_ENV * env)1351 env_get_check_thp(DB_ENV * env) {
1352 assert(env);
1353 return env->i->check_thp;
1354 }
1355
env_set_dir_per_db(DB_ENV * env,bool new_val)1356 static bool env_set_dir_per_db(DB_ENV *env, bool new_val) {
1357 HANDLE_PANICKED_ENV(env);
1358 bool r = env->i->dir_per_db;
1359 env->i->dir_per_db = new_val;
1360 return r;
1361 }
1362
env_get_dir_per_db(DB_ENV * env)1363 static bool env_get_dir_per_db(DB_ENV *env) {
1364 HANDLE_PANICKED_ENV(env);
1365 return env->i->dir_per_db;
1366 }
1367
env_get_data_dir(DB_ENV * env)1368 static const char *env_get_data_dir(DB_ENV *env) {
1369 return env->i->real_data_dir;
1370 }
1371
env_dirtool_attach(DB_ENV * env,DB_TXN * txn,const char * dname,const char * iname)1372 static int env_dirtool_attach(DB_ENV *env,
1373 DB_TXN *txn,
1374 const char *dname,
1375 const char *iname) {
1376 int r;
1377 DBT dname_dbt;
1378 DBT iname_dbt;
1379
1380 HANDLE_PANICKED_ENV(env);
1381 if (!env_opened(env)) {
1382 return EINVAL;
1383 }
1384 HANDLE_READ_ONLY_TXN(txn);
1385 toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1386 toku_fill_dbt(&iname_dbt, iname, strlen(iname) + 1);
1387
1388 r = toku_db_put(env->i->directory,
1389 txn,
1390 &dname_dbt,
1391 &iname_dbt,
1392 0,
1393 true);
1394 return r;
1395 }
1396
env_dirtool_detach(DB_ENV * env,DB_TXN * txn,const char * dname)1397 static int env_dirtool_detach(DB_ENV *env,
1398 DB_TXN *txn,
1399 const char *dname) {
1400 int r;
1401 DBT dname_dbt;
1402 DBT old_iname_dbt;
1403
1404 HANDLE_PANICKED_ENV(env);
1405 if (!env_opened(env)) {
1406 return EINVAL;
1407 }
1408 HANDLE_READ_ONLY_TXN(txn);
1409
1410 toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1411 toku_init_dbt_flags(&old_iname_dbt, DB_DBT_REALLOC);
1412
1413 r = toku_db_get(env->i->directory,
1414 txn,
1415 &dname_dbt,
1416 &old_iname_dbt,
1417 DB_SERIALIZABLE); // allocates memory for iname
1418 if (r == DB_NOTFOUND)
1419 return EEXIST;
1420 toku_free(old_iname_dbt.data);
1421
1422 r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
1423
1424 return r;
1425 }
1426
env_dirtool_move(DB_ENV * env,DB_TXN * txn,const char * old_dname,const char * new_dname)1427 static int env_dirtool_move(DB_ENV *env,
1428 DB_TXN *txn,
1429 const char *old_dname,
1430 const char *new_dname) {
1431 int r;
1432 DBT old_dname_dbt;
1433 DBT new_dname_dbt;
1434 DBT iname_dbt;
1435
1436 HANDLE_PANICKED_ENV(env);
1437 if (!env_opened(env)) {
1438 return EINVAL;
1439 }
1440 HANDLE_READ_ONLY_TXN(txn);
1441
1442 toku_fill_dbt(&old_dname_dbt, old_dname, strlen(old_dname) + 1);
1443 toku_fill_dbt(&new_dname_dbt, new_dname, strlen(new_dname) + 1);
1444 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
1445
1446 r = toku_db_get(env->i->directory,
1447 txn,
1448 &old_dname_dbt,
1449 &iname_dbt,
1450 DB_SERIALIZABLE); // allocates memory for iname
1451 if (r == DB_NOTFOUND)
1452 return EEXIST;
1453
1454 r = toku_db_del(
1455 env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
1456 if (r != 0)
1457 goto exit;
1458
1459 r = toku_db_put(
1460 env->i->directory, txn, &new_dname_dbt, &iname_dbt, 0, true);
1461
1462 exit:
1463 toku_free(iname_dbt.data);
1464 return r;
1465 }
1466
locked_env_op(DB_ENV * env,DB_TXN * txn,std::function<int (DB_TXN *)> f)1467 static int locked_env_op(DB_ENV *env,
1468 DB_TXN *txn,
1469 std::function<int(DB_TXN *)> f) {
1470 int ret, r;
1471 HANDLE_READ_ONLY_TXN(txn);
1472 HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1473
1474 DB_TXN *child_txn = NULL;
1475 int using_txns = env->i->open_flags & DB_INIT_TXN;
1476 if (using_txns) {
1477 ret = toku_txn_begin(env, txn, &child_txn, 0);
1478 lazy_assert_zero(ret);
1479 }
1480
1481 // cannot begin a checkpoint
1482 toku_multi_operation_client_lock();
1483 r = f(child_txn);
1484 toku_multi_operation_client_unlock();
1485
1486 if (using_txns) {
1487 if (r == 0) {
1488 ret = locked_txn_commit(child_txn, 0);
1489 lazy_assert_zero(ret);
1490 } else {
1491 ret = locked_txn_abort(child_txn);
1492 lazy_assert_zero(ret);
1493 }
1494 }
1495 return r;
1496
1497 }
1498
locked_env_dirtool_attach(DB_ENV * env,DB_TXN * txn,const char * dname,const char * iname)1499 static int locked_env_dirtool_attach(DB_ENV *env,
1500 DB_TXN *txn,
1501 const char *dname,
1502 const char *iname) {
1503 auto f = std::bind(
1504 env_dirtool_attach, env, std::placeholders::_1, dname, iname);
1505 return locked_env_op(env, txn, f);
1506 }
1507
locked_env_dirtool_detach(DB_ENV * env,DB_TXN * txn,const char * dname)1508 static int locked_env_dirtool_detach(DB_ENV *env,
1509 DB_TXN *txn,
1510 const char *dname) {
1511 auto f = std::bind(
1512 env_dirtool_detach, env, std::placeholders::_1, dname);
1513 return locked_env_op(env, txn, f);
1514 }
1515
locked_env_dirtool_move(DB_ENV * env,DB_TXN * txn,const char * old_dname,const char * new_dname)1516 static int locked_env_dirtool_move(DB_ENV *env,
1517 DB_TXN *txn,
1518 const char *old_dname,
1519 const char *new_dname) {
1520 auto f = std::bind(
1521 env_dirtool_move, env, std::placeholders::_1, old_dname, new_dname);
1522 return locked_env_op(env, txn, f);
1523 }
1524
1525 static int env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags);
1526
1527 static int
locked_env_dbremove(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,uint32_t flags)1528 locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
1529 int ret, r;
1530 HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1531 HANDLE_READ_ONLY_TXN(txn);
1532
1533 DB_TXN *child_txn = NULL;
1534 int using_txns = env->i->open_flags & DB_INIT_TXN;
1535 if (using_txns) {
1536 ret = toku_txn_begin(env, txn, &child_txn, 0);
1537 lazy_assert_zero(ret);
1538 }
1539
1540 // cannot begin a checkpoint
1541 toku_multi_operation_client_lock();
1542 r = env_dbremove(env, child_txn, fname, dbname, flags);
1543 toku_multi_operation_client_unlock();
1544
1545 if (using_txns) {
1546 if (r == 0) {
1547 ret = locked_txn_commit(child_txn, 0);
1548 lazy_assert_zero(ret);
1549 } else {
1550 ret = locked_txn_abort(child_txn);
1551 lazy_assert_zero(ret);
1552 }
1553 }
1554 return r;
1555 }
1556
1557 static int env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags);
1558
1559 static int
locked_env_dbrename(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)1560 locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
1561 int ret, r;
1562 HANDLE_READ_ONLY_TXN(txn);
1563 HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1564
1565 DB_TXN *child_txn = NULL;
1566 int using_txns = env->i->open_flags & DB_INIT_TXN;
1567 if (using_txns) {
1568 ret = toku_txn_begin(env, txn, &child_txn, 0);
1569 lazy_assert_zero(ret);
1570 }
1571
1572 // cannot begin a checkpoint
1573 toku_multi_operation_client_lock();
1574 r = env_dbrename(env, child_txn, fname, dbname, newname, flags);
1575 toku_multi_operation_client_unlock();
1576
1577 if (using_txns) {
1578 if (r == 0) {
1579 ret = locked_txn_commit(child_txn, 0);
1580 lazy_assert_zero(ret);
1581 } else {
1582 ret = locked_txn_abort(child_txn);
1583 lazy_assert_zero(ret);
1584 }
1585 }
1586 return r;
1587 }
1588
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3

// Report the configured cachetable size split into whole GiB (*gbytes) and
// the remainder below 1 GiB (*bytes). *ncache is always set to 1.
// Returns 0.
static int env_get_cachesize(DB_ENV *env, uint32_t *gbytes, uint32_t *bytes, int *ncache) {
    HANDLE_PANICKED_ENV(env);
    const auto size = env->i->cachetable_size;
    *gbytes = size >> 30;
    *bytes = size & ((1<<30)-1);
    *ncache = 1;
    return 0;
}

#endif
1601
1602 static int
env_set_data_dir(DB_ENV * env,const char * dir)1603 env_set_data_dir(DB_ENV * env, const char *dir) {
1604 HANDLE_PANICKED_ENV(env);
1605 int r;
1606
1607 if (env_opened(env) || !dir) {
1608 r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir after opening the env\n");
1609 }
1610 else if (env->i->data_dir)
1611 r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir more than once.\n");
1612 else {
1613 env->i->data_dir = toku_strdup(dir);
1614 if (env->i->data_dir==NULL) {
1615 assert(get_error_errno() == ENOMEM);
1616 r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1617 }
1618 else r = 0;
1619 }
1620 return r;
1621 }
1622
1623 static void
env_set_errcall(DB_ENV * env,toku_env_errcall_t errcall)1624 env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) {
1625 env->i->errcall = errcall;
1626 }
1627
1628 static void
env_set_errfile(DB_ENV * env,FILE * errfile)1629 env_set_errfile(DB_ENV*env, FILE*errfile) {
1630 env->i->errfile = errfile;
1631 }
1632
1633 static void
env_set_errpfx(DB_ENV * env,const char * errpfx)1634 env_set_errpfx(DB_ENV * env, const char *errpfx) {
1635 env->i->errpfx = errpfx;
1636 }
1637
1638 static int
env_set_flags(DB_ENV * env,uint32_t flags,int onoff)1639 env_set_flags(DB_ENV * env, uint32_t flags, int onoff) {
1640 HANDLE_PANICKED_ENV(env);
1641
1642 uint32_t change = 0;
1643 if (flags & DB_AUTO_COMMIT) {
1644 change |= DB_AUTO_COMMIT;
1645 flags &= ~DB_AUTO_COMMIT;
1646 }
1647 if (flags != 0 && onoff) {
1648 return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support any nonzero ENV flags other than DB_AUTO_COMMIT\n");
1649 }
1650 if (onoff) env->i->open_flags |= change;
1651 else env->i->open_flags &= ~change;
1652 return 0;
1653 }
1654
1655 static int
env_set_lg_bsize(DB_ENV * env,uint32_t bsize)1656 env_set_lg_bsize(DB_ENV * env, uint32_t bsize) {
1657 HANDLE_PANICKED_ENV(env);
1658 return toku_logger_set_lg_bsize(env->i->logger, bsize);
1659 }
1660
1661 static int
env_set_lg_dir(DB_ENV * env,const char * dir)1662 env_set_lg_dir(DB_ENV * env, const char *dir) {
1663 HANDLE_PANICKED_ENV(env);
1664 if (env_opened(env)) {
1665 return toku_ydb_do_error(env, EINVAL, "Cannot set log dir after opening the env\n");
1666 }
1667
1668 if (env->i->lg_dir) toku_free(env->i->lg_dir);
1669 if (dir) {
1670 env->i->lg_dir = toku_strdup(dir);
1671 if (!env->i->lg_dir) {
1672 return toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1673 }
1674 }
1675 else env->i->lg_dir = NULL;
1676 return 0;
1677 }
1678
1679 static int
env_set_lg_max(DB_ENV * env,uint32_t lg_max)1680 env_set_lg_max(DB_ENV * env, uint32_t lg_max) {
1681 HANDLE_PANICKED_ENV(env);
1682 return toku_logger_set_lg_max(env->i->logger, lg_max);
1683 }
1684
1685 static int
env_get_lg_max(DB_ENV * env,uint32_t * lg_maxp)1686 env_get_lg_max(DB_ENV * env, uint32_t *lg_maxp) {
1687 HANDLE_PANICKED_ENV(env);
1688 return toku_logger_get_lg_max(env->i->logger, lg_maxp);
1689 }
1690
1691 static int
env_set_lk_detect(DB_ENV * env,uint32_t UU (detect))1692 env_set_lk_detect(DB_ENV * env, uint32_t UU(detect)) {
1693 HANDLE_PANICKED_ENV(env);
1694 return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support set_lk_detect\n");
1695 }
1696
1697 static int
env_set_lk_max_memory(DB_ENV * env,uint64_t lock_memory_limit)1698 env_set_lk_max_memory(DB_ENV *env, uint64_t lock_memory_limit) {
1699 HANDLE_PANICKED_ENV(env);
1700 int r = 0;
1701 if (env_opened(env)) {
1702 r = EINVAL;
1703 } else {
1704 r = env->i->ltm.set_max_lock_memory(lock_memory_limit);
1705 }
1706 return r;
1707 }
1708
1709 static int
env_get_lk_max_memory(DB_ENV * env,uint64_t * lk_maxp)1710 env_get_lk_max_memory(DB_ENV *env, uint64_t *lk_maxp) {
1711 HANDLE_PANICKED_ENV(env);
1712 uint32_t max_lock_memory = env->i->ltm.get_max_lock_memory();
1713 *lk_maxp = max_lock_memory;
1714 return 0;
1715 }
1716
1717 //void toku__env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) {
1718 // env->i->noticecall = noticecall;
1719 //}
1720
1721 static int
env_set_tmp_dir(DB_ENV * env,const char * tmp_dir)1722 env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
1723 HANDLE_PANICKED_ENV(env);
1724 if (env_opened(env)) {
1725 return toku_ydb_do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n");
1726 }
1727 if (!tmp_dir) {
1728 return toku_ydb_do_error(env, EINVAL, "Tmp dir bust be non-null\n");
1729 }
1730 if (env->i->tmp_dir)
1731 toku_free(env->i->tmp_dir);
1732 env->i->tmp_dir = toku_strdup(tmp_dir);
1733 return env->i->tmp_dir ? 0 : ENOMEM;
1734 }
1735
1736 static int
env_set_verbose(DB_ENV * env,uint32_t UU (which),int UU (onoff))1737 env_set_verbose(DB_ENV * env, uint32_t UU(which), int UU(onoff)) {
1738 HANDLE_PANICKED_ENV(env);
1739 return 1;
1740 }
1741
1742 static int
toku_env_txn_checkpoint(DB_ENV * env,uint32_t kbyte,uint32_t min,uint32_t flags)1743 toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte __attribute__((__unused__)), uint32_t min __attribute__((__unused__)), uint32_t flags __attribute__((__unused__))) {
1744 CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1745 int r = toku_checkpoint(cp, env->i->logger,
1746 checkpoint_callback_f, checkpoint_callback_extra,
1747 checkpoint_callback2_f, checkpoint_callback2_extra,
1748 CLIENT_CHECKPOINT);
1749 if (r) {
1750 // Panicking the whole environment may be overkill, but I'm not sure what else to do.
1751 env_panic(env, r, "checkpoint error\n");
1752 toku_ydb_do_error(env, r, "Checkpoint\n");
1753 }
1754 return r;
1755 }
1756
1757 static int
env_txn_stat(DB_ENV * env,DB_TXN_STAT ** UU (statp),uint32_t UU (flags))1758 env_txn_stat(DB_ENV * env, DB_TXN_STAT ** UU(statp), uint32_t UU(flags)) {
1759 HANDLE_PANICKED_ENV(env);
1760 return 1;
1761 }
1762
1763 //
1764 // We can assume the client calls this function right after recovery
1765 // to return a list of prepared transactions to the user. When called,
1766 // we can assume that no other work is being done in the system,
1767 // as we are in the state of being after recovery,
1768 // but before client operations should commence
1769 //
1770 static int
env_txn_xa_recover(DB_ENV * env,TOKU_XA_XID xids[],long count,long * retp,uint32_t flags)1771 env_txn_xa_recover (DB_ENV *env, TOKU_XA_XID xids[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1772 struct tokulogger_preplist *MALLOC_N(count,preps);
1773 int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1774 if (r==0) {
1775 assert(*retp<=count);
1776 for (int i=0; i<*retp; i++) {
1777 xids[i] = preps[i].xid;
1778 }
1779 }
1780 toku_free(preps);
1781 return r;
1782 }
1783
1784 //
1785 // We can assume the client calls this function right after recovery
1786 // to return a list of prepared transactions to the user. When called,
1787 // we can assume that no other work is being done in the system,
1788 // as we are in the state of being after recovery,
1789 // but before client operations should commence
1790 //
1791 static int
env_txn_recover(DB_ENV * env,DB_PREPLIST preplist[],long count,long * retp,uint32_t flags)1792 env_txn_recover (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1793 struct tokulogger_preplist *MALLOC_N(count,preps);
1794 int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1795 if (r==0) {
1796 assert(*retp<=count);
1797 for (int i=0; i<*retp; i++) {
1798 preplist[i].txn = preps[i].txn;
1799 memcpy(preplist[i].gid, preps[i].xid.data, preps[i].xid.gtrid_length + preps[i].xid.bqual_length);
1800 }
1801 }
1802 toku_free(preps);
1803 return r;
1804 }
1805
1806 static int
env_get_txn_from_xid(DB_ENV * env,TOKU_XA_XID * xid,DB_TXN ** txnp)1807 env_get_txn_from_xid (DB_ENV *env, /*in*/ TOKU_XA_XID *xid, /*out*/ DB_TXN **txnp) {
1808 return toku_txn_manager_get_root_txn_from_xid(toku_logger_get_txn_manager(env->i->logger), xid, txnp);
1809 }
1810
1811 static int
env_checkpointing_set_period(DB_ENV * env,uint32_t seconds)1812 env_checkpointing_set_period(DB_ENV * env, uint32_t seconds) {
1813 HANDLE_PANICKED_ENV(env);
1814 int r = 0;
1815 if (!env_opened(env)) {
1816 r = EINVAL;
1817 } else {
1818 toku_set_checkpoint_period(env->i->cachetable, seconds);
1819 }
1820 return r;
1821 }
1822
1823 static int
env_cleaner_set_period(DB_ENV * env,uint32_t seconds)1824 env_cleaner_set_period(DB_ENV * env, uint32_t seconds) {
1825 HANDLE_PANICKED_ENV(env);
1826 int r = 0;
1827 if (!env_opened(env)) {
1828 r = EINVAL;
1829 } else {
1830 toku_set_cleaner_period(env->i->cachetable, seconds);
1831 }
1832 return r;
1833 }
1834
1835 static int
env_cleaner_set_iterations(DB_ENV * env,uint32_t iterations)1836 env_cleaner_set_iterations(DB_ENV * env, uint32_t iterations) {
1837 HANDLE_PANICKED_ENV(env);
1838 int r = 0;
1839 if (!env_opened(env)) {
1840 r = EINVAL;
1841 } else {
1842 toku_set_cleaner_iterations(env->i->cachetable, iterations);
1843 }
1844 return r;
1845 }
1846
1847 static int
env_create_loader(DB_ENV * env,DB_TXN * txn,DB_LOADER ** blp,DB * src_db,int N,DB * dbs[],uint32_t db_flags[],uint32_t dbt_flags[],uint32_t loader_flags)1848 env_create_loader(DB_ENV *env,
1849 DB_TXN *txn,
1850 DB_LOADER **blp,
1851 DB *src_db,
1852 int N,
1853 DB *dbs[],
1854 uint32_t db_flags[/*N*/],
1855 uint32_t dbt_flags[/*N*/],
1856 uint32_t loader_flags) {
1857 int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags, true);
1858 return r;
1859 }
1860
1861 static int
env_checkpointing_get_period(DB_ENV * env,uint32_t * seconds)1862 env_checkpointing_get_period(DB_ENV * env, uint32_t *seconds) {
1863 HANDLE_PANICKED_ENV(env);
1864 int r = 0;
1865 if (!env_opened(env)) r = EINVAL;
1866 else
1867 *seconds = toku_get_checkpoint_period_unlocked(env->i->cachetable);
1868 return r;
1869 }
1870
1871 static int
env_cleaner_get_period(DB_ENV * env,uint32_t * seconds)1872 env_cleaner_get_period(DB_ENV * env, uint32_t *seconds) {
1873 HANDLE_PANICKED_ENV(env);
1874 int r = 0;
1875 if (!env_opened(env)) r = EINVAL;
1876 else
1877 *seconds = toku_get_cleaner_period_unlocked(env->i->cachetable);
1878 return r;
1879 }
1880
1881 static int
env_cleaner_get_iterations(DB_ENV * env,uint32_t * iterations)1882 env_cleaner_get_iterations(DB_ENV * env, uint32_t *iterations) {
1883 HANDLE_PANICKED_ENV(env);
1884 int r = 0;
1885 if (!env_opened(env)) r = EINVAL;
1886 else
1887 *iterations = toku_get_cleaner_iterations(env->i->cachetable);
1888 return r;
1889 }
1890
1891 static int
env_evictor_set_enable_partial_eviction(DB_ENV * env,bool enabled)1892 env_evictor_set_enable_partial_eviction(DB_ENV* env, bool enabled) {
1893 HANDLE_PANICKED_ENV(env);
1894 int r = 0;
1895 if (!env_opened(env)) r = EINVAL;
1896 else toku_set_enable_partial_eviction(env->i->cachetable, enabled);
1897 return r;
1898 }
1899
1900 static int
env_evictor_get_enable_partial_eviction(DB_ENV * env,bool * enabled)1901 env_evictor_get_enable_partial_eviction(DB_ENV* env, bool *enabled) {
1902 HANDLE_PANICKED_ENV(env);
1903 int r = 0;
1904 if (!env_opened(env)) r = EINVAL;
1905 else *enabled = toku_get_enable_partial_eviction(env->i->cachetable);
1906 return r;
1907 }
1908
1909 static int
env_checkpointing_postpone(DB_ENV * env)1910 env_checkpointing_postpone(DB_ENV * env) {
1911 HANDLE_PANICKED_ENV(env);
1912 int r = 0;
1913 if (!env_opened(env)) r = EINVAL;
1914 else toku_checkpoint_safe_client_lock();
1915 return r;
1916 }
1917
1918 static int
env_checkpointing_resume(DB_ENV * env)1919 env_checkpointing_resume(DB_ENV * env) {
1920 HANDLE_PANICKED_ENV(env);
1921 int r = 0;
1922 if (!env_opened(env)) r = EINVAL;
1923 else toku_checkpoint_safe_client_unlock();
1924 return r;
1925 }
1926
1927 static int
env_checkpointing_begin_atomic_operation(DB_ENV * env)1928 env_checkpointing_begin_atomic_operation(DB_ENV * env) {
1929 HANDLE_PANICKED_ENV(env);
1930 int r = 0;
1931 if (!env_opened(env)) r = EINVAL;
1932 else toku_multi_operation_client_lock();
1933 return r;
1934 }
1935
1936 static int
env_checkpointing_end_atomic_operation(DB_ENV * env)1937 env_checkpointing_end_atomic_operation(DB_ENV * env) {
1938 HANDLE_PANICKED_ENV(env);
1939 int r = 0;
1940 if (!env_opened(env)) r = EINVAL;
1941 else toku_multi_operation_client_unlock();
1942 return r;
1943 }
1944
1945 static int
env_set_default_bt_compare(DB_ENV * env,int (* bt_compare)(DB *,const DBT *,const DBT *))1946 env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
1947 HANDLE_PANICKED_ENV(env);
1948 int r = 0;
1949 if (env_opened(env)) r = EINVAL;
1950 else {
1951 env->i->bt_compare = bt_compare;
1952 }
1953 return r;
1954 }
1955
1956 static void
env_set_update(DB_ENV * env,int (* update_function)(DB *,const DBT * key,const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra))1957 env_set_update (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra)) {
1958 env->i->update_function = update_function;
1959 }
1960
1961 static int
env_set_generate_row_callback_for_put(DB_ENV * env,generate_row_for_put_func generate_row_for_put)1962 env_set_generate_row_callback_for_put(DB_ENV *env, generate_row_for_put_func generate_row_for_put) {
1963 HANDLE_PANICKED_ENV(env);
1964 int r = 0;
1965 if (env_opened(env)) r = EINVAL;
1966 else {
1967 env->i->generate_row_for_put = generate_row_for_put;
1968 }
1969 return r;
1970 }
1971
1972 static int
env_set_generate_row_callback_for_del(DB_ENV * env,generate_row_for_del_func generate_row_for_del)1973 env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_func generate_row_for_del) {
1974 HANDLE_PANICKED_ENV(env);
1975 int r = 0;
1976 if (env_opened(env)) r = EINVAL;
1977 else {
1978 env->i->generate_row_for_del = generate_row_for_del;
1979 }
1980 return r;
1981 }
1982 static int
env_set_redzone(DB_ENV * env,int redzone)1983 env_set_redzone(DB_ENV *env, int redzone) {
1984 HANDLE_PANICKED_ENV(env);
1985 int r;
1986 if (env_opened(env))
1987 r = EINVAL;
1988 else {
1989 env->i->redzone = redzone;
1990 r = 0;
1991 }
1992 return r;
1993 }
1994
env_get_lock_timeout(DB_ENV * env,uint64_t * lock_timeout_msec)1995 static int env_get_lock_timeout(DB_ENV *env, uint64_t *lock_timeout_msec) {
1996 uint64_t t = env->i->default_lock_timeout_msec;
1997 if (env->i->get_lock_timeout_callback)
1998 t = env->i->get_lock_timeout_callback(t);
1999 *lock_timeout_msec = t;
2000 return 0;
2001 }
2002
env_set_lock_timeout(DB_ENV * env,uint64_t default_lock_timeout_msec,uint64_t (* get_lock_timeout_callback)(uint64_t default_lock_timeout_msec))2003 static int env_set_lock_timeout(DB_ENV *env, uint64_t default_lock_timeout_msec, uint64_t (*get_lock_timeout_callback)(uint64_t default_lock_timeout_msec)) {
2004 env->i->default_lock_timeout_msec = default_lock_timeout_msec;
2005 env->i->get_lock_timeout_callback = get_lock_timeout_callback;
2006 return 0;
2007 }
2008
2009 static int
env_set_lock_timeout_callback(DB_ENV * env,lock_timeout_callback callback)2010 env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
2011 env->i->lock_wait_timeout_callback = callback;
2012 return 0;
2013 }
2014
// Format *timer with ctime_r() into buf and strip any trailing '\n' / '\r'.
// buf must provide at least 26 bytes, the size ctime_r() requires.
// If ctime_r() fails, buf is set to the empty string instead of being read
// uninitialized.
static void
format_time(const time_t *timer, char *buf) {
    if (ctime_r(timer, buf) == NULL) {
        buf[0] = '\0';
        return;
    }
    size_t len = strlen(buf);
    assert(len < 26);
    // Trim line terminators from the end; stop cleanly if nothing is left.
    while (len > 0 && (buf[len - 1] == '\n' || buf[len - 1] == '\r')) {
        buf[--len] = '\0';
    }
}
2031
////////////////////////////////////////////////////////////////////////////////////////////////
// Local definition of status information from the portability layer, which should not include db.h.
// Local status structs are used to concentrate file system information collected from various places
// and memory information collected from memory.c.
//
// Row indices into the file-local file system status table.
typedef enum {
    // possible values are enumerated by fs_redzone_state
    FS_ENOSPC_REDZONE_STATE,
    // how many threads are currently blocked on ENOSPC
    FS_ENOSPC_THREADS_BLOCKED,
    // number of operations rejected by enospc prevention (red zone)
    FS_ENOSPC_REDZONE_CTR,
    // most recent time that the file system was completely full
    FS_ENOSPC_MOST_RECENT,
    // total number of times ENOSPC was returned from an attempt to write
    FS_ENOSPC_COUNT,
    FS_FSYNC_TIME,
    FS_FSYNC_COUNT,
    FS_LONG_FSYNC_TIME,
    FS_LONG_FSYNC_COUNT,
    // row count; must stay last
    FS_STATUS_NUM_ROWS
} fs_status_entry;
2049
// File system status table: one engine-status row per fs_status_entry,
// plus a flag recording whether fs_status_init() has filled in the rows.
typedef struct {
    bool initialized;   // set to true at the end of fs_status_init()
    TOKU_ENGINE_STATUS_ROW_S status[FS_STATUS_NUM_ROWS];
} FS_STATUS_S, *FS_STATUS;

// The single file-local instance, refreshed by fs_get_status().
static FS_STATUS_S fsstat;
2056
// Describe one fsstat row by forwarding to TOKUFT_STATUS_INIT; the legend
// string l is prefixed with "filesystem: ".
// NOTE(review): presumably k is the row index, c the column name, t the
// value type and inc the include mask -- confirm against TOKUFT_STATUS_INIT.
#define FS_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(fsstat, k, c, t, "filesystem: " l, inc)

// Describe every row of fsstat and mark the table as initialized.
// Called by fs_get_status() when fsstat.initialized is still false.
static void
fs_status_init(void) {
    FS_STATUS_INIT(FS_ENOSPC_REDZONE_STATE, nullptr, FS_STATE, "ENOSPC redzone state", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_THREADS_BLOCKED, FILESYSTEM_THREADS_BLOCKED_BY_FULL_DISK, UINT64, "threads currently blocked by full disk", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_REDZONE_CTR, nullptr, UINT64, "number of operations rejected by enospc prevention (red zone)", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_MOST_RECENT, nullptr, UNIXTIME, "most recent disk full", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_COUNT, nullptr, UINT64, "number of write operations that returned ENOSPC", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_FSYNC_TIME, FILESYSTEM_FSYNC_TIME, UINT64, "fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_FSYNC_COUNT, FILESYSTEM_FSYNC_NUM, UINT64, "fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_LONG_FSYNC_TIME, FILESYSTEM_LONG_FSYNC_TIME, UINT64, "long fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_LONG_FSYNC_COUNT, FILESYSTEM_LONG_FSYNC_NUM, UINT64, "long fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    fsstat.initialized = true;
}
#undef FS_STATUS_INIT
2073
2074 #define FS_STATUS_VALUE(x) fsstat.status[x].value.num
2075
2076 static void
fs_get_status(DB_ENV * env,fs_redzone_state * redzone_state)2077 fs_get_status(DB_ENV * env, fs_redzone_state * redzone_state) {
2078 if (!fsstat.initialized)
2079 fs_status_init();
2080
2081 time_t enospc_most_recent_timestamp;
2082 uint64_t enospc_threads_blocked, enospc_total;
2083 toku_fs_get_write_info(&enospc_most_recent_timestamp, &enospc_threads_blocked, &enospc_total);
2084 if (enospc_threads_blocked)
2085 FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = FS_BLOCKED;
2086 else
2087 FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = env->i->fs_state;
2088 *redzone_state = (fs_redzone_state) FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE);
2089 FS_STATUS_VALUE(FS_ENOSPC_THREADS_BLOCKED) = enospc_threads_blocked;
2090 FS_STATUS_VALUE(FS_ENOSPC_REDZONE_CTR) = env->i->enospc_redzone_ctr;
2091 FS_STATUS_VALUE(FS_ENOSPC_MOST_RECENT) = enospc_most_recent_timestamp;
2092 FS_STATUS_VALUE(FS_ENOSPC_COUNT) = enospc_total;
2093
2094 uint64_t fsync_count, fsync_time, long_fsync_threshold, long_fsync_count, long_fsync_time;
2095 toku_get_fsync_times(&fsync_count, &fsync_time, &long_fsync_threshold, &long_fsync_count, &long_fsync_time);
2096 FS_STATUS_VALUE(FS_FSYNC_COUNT) = fsync_count;
2097 FS_STATUS_VALUE(FS_FSYNC_TIME) = fsync_time;
2098 FS_STATUS_VALUE(FS_LONG_FSYNC_COUNT) = long_fsync_count;
2099 FS_STATUS_VALUE(FS_LONG_FSYNC_TIME) = long_fsync_time;
2100 }
2101 #undef FS_STATUS_VALUE
2102
// Row indices into the file-local status table that carries information
// obtained from memory.c.
typedef enum {
    MEMORY_MALLOC_COUNT,
    MEMORY_FREE_COUNT,
    MEMORY_REALLOC_COUNT,
    MEMORY_MALLOC_FAIL,
    MEMORY_REALLOC_FAIL,
    MEMORY_REQUESTED,
    MEMORY_USED,
    MEMORY_FREED,
    MEMORY_MAX_REQUESTED_SIZE,
    MEMORY_LAST_FAILED_SIZE,
    MEMORY_MAX_IN_USE,
    MEMORY_MALLOCATOR_VERSION,
    MEMORY_MMAP_THRESHOLD,
    // row count; must stay last
    MEMORY_STATUS_NUM_ROWS
} memory_status_entry;
2120
// Memory status table: one engine-status row per memory_status_entry,
// plus a flag recording whether memory_status_init() has filled in the rows.
typedef struct {
    bool initialized;   // set to true at the end of memory_status_init()
    TOKU_ENGINE_STATUS_ROW_S status[MEMORY_STATUS_NUM_ROWS];
} MEMORY_STATUS_S, *MEMORY_STATUS;

// The single file-local instance, refreshed by memory_get_status().
static MEMORY_STATUS_S memory_status;
2127
// Describe one memory_status row by forwarding to TOKUFT_STATUS_INIT; the
// legend string l is prefixed with "memory: " and every row is included in
// both TOKU_ENGINE_STATUS and TOKU_GLOBAL_STATUS.
#define STATUS_INIT(k,c,t,l) TOKUFT_STATUS_INIT(memory_status, k, c, t, "memory: " l, TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS)

// Describe every row of memory_status and mark the table as initialized.
// Called by memory_get_status() when memory_status.initialized is still false.
static void
memory_status_init(void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(MEMORY_MALLOC_COUNT, MEMORY_MALLOC_COUNT, UINT64, "number of malloc operations");
    STATUS_INIT(MEMORY_FREE_COUNT, MEMORY_FREE_COUNT, UINT64, "number of free operations");
    STATUS_INIT(MEMORY_REALLOC_COUNT, MEMORY_REALLOC_COUNT, UINT64, "number of realloc operations");
    STATUS_INIT(MEMORY_MALLOC_FAIL, MEMORY_MALLOC_FAIL, UINT64, "number of malloc operations that failed");
    STATUS_INIT(MEMORY_REALLOC_FAIL, MEMORY_REALLOC_FAIL, UINT64, "number of realloc operations that failed" );
    STATUS_INIT(MEMORY_REQUESTED, MEMORY_REQUESTED, UINT64, "number of bytes requested");
    STATUS_INIT(MEMORY_USED, MEMORY_USED, UINT64, "number of bytes used (requested + overhead)");
    STATUS_INIT(MEMORY_FREED, MEMORY_FREED, UINT64, "number of bytes freed");
    STATUS_INIT(MEMORY_MAX_REQUESTED_SIZE, MEMORY_MAX_REQUESTED_SIZE, UINT64, "largest attempted allocation size");
    STATUS_INIT(MEMORY_LAST_FAILED_SIZE, MEMORY_LAST_FAILED_SIZE, UINT64, "size of the last failed allocation attempt");
    // This row's column name differs from its key.
    STATUS_INIT(MEMORY_MAX_IN_USE, MEM_ESTIMATED_MAXIMUM_MEMORY_FOOTPRINT, UINT64, "estimated maximum memory footprint");
    // The only CHARSTR row; memory_get_status() fills value.str for it.
    STATUS_INIT(MEMORY_MALLOCATOR_VERSION, MEMORY_MALLOCATOR_VERSION, CHARSTR, "mallocator version");
    STATUS_INIT(MEMORY_MMAP_THRESHOLD, MEMORY_MMAP_THRESHOLD, UINT64, "mmap threshold");
    memory_status.initialized = true;
}
#undef STATUS_INIT
2150
2151 #define MEMORY_STATUS_VALUE(x) memory_status.status[x].value.num
2152
2153 static void
memory_get_status(void)2154 memory_get_status(void) {
2155 if (!memory_status.initialized)
2156 memory_status_init();
2157 LOCAL_MEMORY_STATUS_S local_memstat;
2158 toku_memory_get_status(&local_memstat);
2159 MEMORY_STATUS_VALUE(MEMORY_MALLOC_COUNT) = local_memstat.malloc_count;
2160 MEMORY_STATUS_VALUE(MEMORY_FREE_COUNT) = local_memstat.free_count;
2161 MEMORY_STATUS_VALUE(MEMORY_REALLOC_COUNT) = local_memstat.realloc_count;
2162 MEMORY_STATUS_VALUE(MEMORY_MALLOC_FAIL) = local_memstat.malloc_fail;
2163 MEMORY_STATUS_VALUE(MEMORY_REALLOC_FAIL) = local_memstat.realloc_fail;
2164 MEMORY_STATUS_VALUE(MEMORY_REQUESTED) = local_memstat.requested;
2165 MEMORY_STATUS_VALUE(MEMORY_USED) = local_memstat.used;
2166 MEMORY_STATUS_VALUE(MEMORY_FREED) = local_memstat.freed;
2167 MEMORY_STATUS_VALUE(MEMORY_MAX_IN_USE) = local_memstat.max_in_use;
2168 MEMORY_STATUS_VALUE(MEMORY_MMAP_THRESHOLD) = local_memstat.mmap_threshold;
2169 memory_status.status[MEMORY_MALLOCATOR_VERSION].value.str = local_memstat.mallocator_version;
2170 }
2171 #undef MEMORY_STATUS_VALUE
2172
2173 // how many rows are in engine status?
2174 static int
env_get_engine_status_num_rows(DB_ENV * UU (env),uint64_t * num_rowsp)2175 env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp) {
2176 uint64_t num_rows = 0;
2177 num_rows += YDB_LAYER_STATUS_NUM_ROWS;
2178 num_rows += YDB_C_LAYER_STATUS_NUM_ROWS;
2179 num_rows += YDB_WRITE_LAYER_STATUS_NUM_ROWS;
2180 num_rows += LE_STATUS_S::LE_STATUS_NUM_ROWS;
2181 num_rows += CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS;
2182 num_rows += CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS;
2183 num_rows += LTM_STATUS_S::LTM_STATUS_NUM_ROWS;
2184 num_rows += FT_STATUS_S::FT_STATUS_NUM_ROWS;
2185 num_rows += FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS;
2186 num_rows += FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS;
2187 num_rows += TXN_STATUS_S::TXN_STATUS_NUM_ROWS;
2188 num_rows += LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS;
2189 num_rows += MEMORY_STATUS_NUM_ROWS;
2190 num_rows += FS_STATUS_NUM_ROWS;
2191 num_rows += INDEXER_STATUS_NUM_ROWS;
2192 num_rows += LOADER_STATUS_NUM_ROWS;
2193 num_rows += CTX_STATUS_NUM_ROWS;
2194 #if 0
2195 // enable when upgrade is supported
2196 num_rows += FT_UPGRADE_STATUS_NUM_ROWS;
2197 num_rows += PERSISTENT_UPGRADE_STATUS_NUM_ROWS;
2198 #endif
2199 *num_rowsp = num_rows;
2200 return 0;
2201 }
2202
2203 // Do not take ydb lock or any other lock around or in this function.
2204 // If the engine is blocked because some thread is holding a lock, this function
2205 // can help diagnose the problem.
2206 // This function only collects information, and it does not matter if something gets garbled
2207 // because of a race condition.
2208 // Note, engine status is still collected even if the environment or logger is panicked
2209 static int
env_get_engine_status(DB_ENV * env,TOKU_ENGINE_STATUS_ROW engstat,uint64_t maxrows,uint64_t * num_rows,fs_redzone_state * redzone_state,uint64_t * env_panicp,char * env_panic_string_buf,int env_panic_string_length,toku_engine_status_include_type include_flags)2210 env_get_engine_status (DB_ENV * env, TOKU_ENGINE_STATUS_ROW engstat, uint64_t maxrows, uint64_t *num_rows, fs_redzone_state* redzone_state, uint64_t * env_panicp, char * env_panic_string_buf, int env_panic_string_length, toku_engine_status_include_type include_flags) {
2211 int r;
2212
2213 if (env_panic_string_buf) {
2214 if (env && env->i && env->i->is_panicked && env->i->panic_string) {
2215 strncpy(env_panic_string_buf, env->i->panic_string, env_panic_string_length);
2216 env_panic_string_buf[env_panic_string_length - 1] = '\0'; // just in case
2217 }
2218 else
2219 *env_panic_string_buf = '\0';
2220 }
2221
2222 if ( !(env) ||
2223 !(env->i) ||
2224 !(env_opened(env)) ||
2225 !num_rows ||
2226 !include_flags)
2227 r = EINVAL;
2228 else {
2229 r = 0;
2230 uint64_t row = 0; // which row to fill next
2231 *env_panicp = env->i->is_panicked;
2232
2233 {
2234 YDB_LAYER_STATUS_S ydb_stat;
2235 ydb_layer_get_status(env, &ydb_stat);
2236 for (int i = 0; i < YDB_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2237 if (ydb_stat.status[i].include & include_flags) {
2238 engstat[row++] = ydb_stat.status[i];
2239 }
2240 }
2241 }
2242 {
2243 YDB_C_LAYER_STATUS_S ydb_c_stat;
2244 ydb_c_layer_get_status(&ydb_c_stat);
2245 for (int i = 0; i < YDB_C_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2246 if (ydb_c_stat.status[i].include & include_flags) {
2247 engstat[row++] = ydb_c_stat.status[i];
2248 }
2249 }
2250 }
2251 {
2252 YDB_WRITE_LAYER_STATUS_S ydb_write_stat;
2253 ydb_write_layer_get_status(&ydb_write_stat);
2254 for (int i = 0; i < YDB_WRITE_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2255 if (ydb_write_stat.status[i].include & include_flags) {
2256 engstat[row++] = ydb_write_stat.status[i];
2257 }
2258 }
2259 }
2260 {
2261 LE_STATUS_S lestat; // Rice's vampire
2262 toku_le_get_status(&lestat);
2263 for (int i = 0; i < LE_STATUS_S::LE_STATUS_NUM_ROWS && row < maxrows; i++) {
2264 if (lestat.status[i].include & include_flags) {
2265 engstat[row++] = lestat.status[i];
2266 }
2267 }
2268 }
2269 {
2270 CHECKPOINT_STATUS_S cpstat;
2271 toku_checkpoint_get_status(env->i->cachetable, &cpstat);
2272 for (int i = 0; i < CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS && row < maxrows; i++) {
2273 if (cpstat.status[i].include & include_flags) {
2274 engstat[row++] = cpstat.status[i];
2275 }
2276 }
2277 }
2278 {
2279 CACHETABLE_STATUS_S ctstat;
2280 toku_cachetable_get_status(env->i->cachetable, &ctstat);
2281 for (int i = 0; i < CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS && row < maxrows; i++) {
2282 if (ctstat.status[i].include & include_flags) {
2283 engstat[row++] = ctstat.status[i];
2284 }
2285 }
2286 }
2287 {
2288 LTM_STATUS_S ltmstat;
2289 env->i->ltm.get_status(<mstat);
2290 for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS && row < maxrows; i++) {
2291 if (ltmstat.status[i].include & include_flags) {
2292 engstat[row++] = ltmstat.status[i];
2293 }
2294 }
2295 }
2296 {
2297 FT_STATUS_S ftstat;
2298 toku_ft_get_status(&ftstat);
2299 for (int i = 0; i < FT_STATUS_S::FT_STATUS_NUM_ROWS && row < maxrows; i++) {
2300 if (ftstat.status[i].include & include_flags) {
2301 engstat[row++] = ftstat.status[i];
2302 }
2303 }
2304 }
2305 {
2306 FT_FLUSHER_STATUS_S flusherstat;
2307 toku_ft_flusher_get_status(&flusherstat);
2308 for (int i = 0; i < FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS && row < maxrows; i++) {
2309 if (flusherstat.status[i].include & include_flags) {
2310 engstat[row++] = flusherstat.status[i];
2311 }
2312 }
2313 }
2314 {
2315 FT_HOT_STATUS_S hotstat;
2316 toku_ft_hot_get_status(&hotstat);
2317 for (int i = 0; i < FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS && row < maxrows; i++) {
2318 if (hotstat.status[i].include & include_flags) {
2319 engstat[row++] = hotstat.status[i];
2320 }
2321 }
2322 }
2323 {
2324 TXN_STATUS_S txnstat;
2325 toku_txn_get_status(&txnstat);
2326 for (int i = 0; i < TXN_STATUS_S::TXN_STATUS_NUM_ROWS && row < maxrows; i++) {
2327 if (txnstat.status[i].include & include_flags) {
2328 engstat[row++] = txnstat.status[i];
2329 }
2330 }
2331 }
2332 {
2333 LOGGER_STATUS_S loggerstat;
2334 toku_logger_get_status(env->i->logger, &loggerstat);
2335 for (int i = 0; i < LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS && row < maxrows; i++) {
2336 if (loggerstat.status[i].include & include_flags) {
2337 engstat[row++] = loggerstat.status[i];
2338 }
2339 }
2340 }
2341
2342 {
2343 INDEXER_STATUS_S indexerstat;
2344 toku_indexer_get_status(&indexerstat);
2345 for (int i = 0; i < INDEXER_STATUS_NUM_ROWS && row < maxrows; i++) {
2346 if (indexerstat.status[i].include & include_flags) {
2347 engstat[row++] = indexerstat.status[i];
2348 }
2349 }
2350 }
2351 {
2352 LOADER_STATUS_S loaderstat;
2353 toku_loader_get_status(&loaderstat);
2354 for (int i = 0; i < LOADER_STATUS_NUM_ROWS && row < maxrows; i++) {
2355 if (loaderstat.status[i].include & include_flags) {
2356 engstat[row++] = loaderstat.status[i];
2357 }
2358 }
2359 }
2360
2361 {
2362 // memory_status is local to this file
2363 memory_get_status();
2364 for (int i = 0; i < MEMORY_STATUS_NUM_ROWS && row < maxrows; i++) {
2365 if (memory_status.status[i].include & include_flags) {
2366 engstat[row++] = memory_status.status[i];
2367 }
2368 }
2369 }
2370 {
2371 // Note, fs_get_status() and the fsstat structure are local to this file because they
2372 // are used to concentrate file system information collected from various places.
2373 fs_get_status(env, redzone_state);
2374 for (int i = 0; i < FS_STATUS_NUM_ROWS && row < maxrows; i++) {
2375 if (fsstat.status[i].include & include_flags) {
2376 engstat[row++] = fsstat.status[i];
2377 }
2378 }
2379 }
2380 {
2381 struct context_status ctxstatus;
2382 toku_context_get_status(&ctxstatus);
2383 for (int i = 0; i < CTX_STATUS_NUM_ROWS && row < maxrows; i++) {
2384 if (ctxstatus.status[i].include & include_flags) {
2385 engstat[row++] = ctxstatus.status[i];
2386 }
2387 }
2388 }
2389 #if 0
2390 // enable when upgrade is supported
2391 {
2392 for (int i = 0; i < PERSISTENT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2393 if (persistent_upgrade_status.status[i].include & include_flags) {
2394 engstat[row++] = persistent_upgrade_status.status[i];
2395 }
2396 }
2397 FT_UPGRADE_STATUS_S ft_upgradestat;
2398 toku_ft_upgrade_get_status(&ft_upgradestat);
2399 for (int i = 0; i < FT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2400 if (ft_upgradestat.status[i].include & include_flags) {
2401 engstat[row++] = ft_upgradestat.status[i];
2402 }
2403 }
2404
2405 }
2406 #endif
2407 if (r==0) {
2408 *num_rows = row;
2409 }
2410 }
2411 return r;
2412 }
2413
2414 // Fill buff with text description of engine status up to bufsiz bytes.
2415 // Intended for use by test programs that do not have the handlerton available,
2416 // and for use by toku_assert logic to print diagnostic info on crash.
2417 static int
env_get_engine_status_text(DB_ENV * env,char * buff,int bufsiz)2418 env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) {
2419 uint32_t stringsize = 1024;
2420 uint64_t panic;
2421 char panicstring[stringsize];
2422 int n = 0; // number of characters printed so far
2423 uint64_t num_rows;
2424 uint64_t max_rows;
2425 fs_redzone_state redzone_state;
2426
2427 n = snprintf(buff, bufsiz - n, "BUILD_ID = %d\n", BUILD_ID);
2428
2429 (void) env_get_engine_status_num_rows (env, &max_rows);
2430 TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2431 int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2432
2433 if (r) {
2434 n += snprintf(buff + n, bufsiz - n, "Engine status not available: ");
2435 if (!env) {
2436 n += snprintf(buff + n, bufsiz - n, "no environment\n");
2437 }
2438 else if (!(env->i)) {
2439 n += snprintf(buff + n, bufsiz - n, "environment internal struct is null\n");
2440 }
2441 else if (!env_opened(env)) {
2442 n += snprintf(buff + n, bufsiz - n, "environment is not open\n");
2443 }
2444 }
2445 else {
2446 if (panic) {
2447 n += snprintf(buff + n, bufsiz - n, "Env panic code: %" PRIu64 "\n", panic);
2448 if (strlen(panicstring)) {
2449 invariant(strlen(panicstring) <= stringsize);
2450 n += snprintf(buff + n, bufsiz - n, "Env panic string: %s\n", panicstring);
2451 }
2452 }
2453
2454 for (uint64_t row = 0; row < num_rows; row++) {
2455 n += snprintf(buff + n, bufsiz - n, "%s: ", mystat[row].legend);
2456 switch (mystat[row].type) {
2457 case FS_STATE:
2458 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2459 break;
2460 case UINT64:
2461 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2462 break;
2463 case CHARSTR:
2464 n += snprintf(buff + n, bufsiz - n, "%s\n", mystat[row].value.str);
2465 break;
2466 case UNIXTIME:
2467 {
2468 char tbuf[26];
2469 format_time((time_t*)&mystat[row].value.num, tbuf);
2470 n += snprintf(buff + n, bufsiz - n, "%s\n", tbuf);
2471 }
2472 break;
2473 case TOKUTIME:
2474 {
2475 double t = tokutime_to_seconds(mystat[row].value.num);
2476 n += snprintf(buff + n, bufsiz - n, "%.6f\n", t);
2477 }
2478 break;
2479 case PARCOUNT:
2480 {
2481 uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2482 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", v);
2483 }
2484 break;
2485 default:
2486 n += snprintf(buff + n, bufsiz - n, "UNKNOWN STATUS TYPE: %d\n", mystat[row].type);
2487 break;
2488 }
2489 }
2490 }
2491
2492 if (n > bufsiz) {
2493 const char * errmsg = "BUFFER TOO SMALL\n";
2494 int len = strlen(errmsg) + 1;
2495 (void) snprintf(buff + (bufsiz - 1) - len, len, "%s", errmsg);
2496 }
2497
2498 return r;
2499 }
2500
2501 // prints engine status using toku_env_err line-by-line
2502 static int
env_err_engine_status(DB_ENV * env)2503 env_err_engine_status(DB_ENV * env) {
2504 uint32_t stringsize = 1024;
2505 uint64_t panic;
2506 char panicstring[stringsize];
2507 uint64_t num_rows;
2508 uint64_t max_rows;
2509 fs_redzone_state redzone_state;
2510
2511 toku_env_err(env, 0, "BUILD_ID = %d", BUILD_ID);
2512
2513 (void) env_get_engine_status_num_rows (env, &max_rows);
2514 TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2515 int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2516
2517 if (r) {
2518 toku_env_err(env, 0, "Engine status not available: ");
2519 if (!env) {
2520 toku_env_err(env, 0, "no environment");
2521 }
2522 else if (!(env->i)) {
2523 toku_env_err(env, 0, "environment internal struct is null");
2524 }
2525 else if (!env_opened(env)) {
2526 toku_env_err(env, 0, "environment is not open");
2527 }
2528 }
2529 else {
2530 if (panic) {
2531 toku_env_err(env, 0, "Env panic code: %" PRIu64, panic);
2532 if (strlen(panicstring)) {
2533 invariant(strlen(panicstring) <= stringsize);
2534 toku_env_err(env, 0, "Env panic string: %s", panicstring);
2535 }
2536 }
2537
2538 for (uint64_t row = 0; row < num_rows; row++) {
2539 switch (mystat[row].type) {
2540 case FS_STATE:
2541 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2542 break;
2543 case UINT64:
2544 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2545 break;
2546 case CHARSTR:
2547 toku_env_err(env, 0, "%s: %s", mystat[row].legend, mystat[row].value.str);
2548 break;
2549 case UNIXTIME:
2550 {
2551 char tbuf[26];
2552 format_time((time_t*)&mystat[row].value.num, tbuf);
2553 toku_env_err(env, 0, "%s: %s", mystat[row].legend, tbuf);
2554 }
2555 break;
2556 case TOKUTIME:
2557 {
2558 double t = tokutime_to_seconds(mystat[row].value.num);
2559 toku_env_err(env, 0, "%s: %.6f", mystat[row].legend, t);
2560 }
2561 break;
2562 case PARCOUNT:
2563 {
2564 uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2565 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, v);
2566 }
2567 break;
2568 default:
2569 toku_env_err(env, 0, "%s: UNKNOWN STATUS TYPE: %d", mystat[row].legend, mystat[row].type);
2570 break;
2571 }
2572 }
2573 }
2574
2575 return r;
2576 }
2577
2578 // intended for use by toku_assert logic, when env is not known
2579 static int
toku_maybe_get_engine_status_text(char * buff,int buffsize)2580 toku_maybe_get_engine_status_text (char * buff, int buffsize) {
2581 DB_ENV * env = most_recent_env;
2582 int r;
2583 if (engine_status_enable && env != NULL) {
2584 r = env_get_engine_status_text(env, buff, buffsize);
2585 }
2586 else {
2587 r = EOPNOTSUPP;
2588 snprintf(buff, buffsize, "Engine status not available: disabled by user. This should only happen in test programs.\n");
2589 }
2590 return r;
2591 }
2592
2593 static int
toku_maybe_err_engine_status(void)2594 toku_maybe_err_engine_status (void) {
2595 DB_ENV * env = most_recent_env;
2596 int r;
2597 if (engine_status_enable && env != NULL) {
2598 r = env_err_engine_status(env);
2599 }
2600 else {
2601 r = EOPNOTSUPP;
2602 }
2603 return r;
2604 }
2605
2606 // Set panic code and panic string if not already panicked,
2607 // intended for use by toku_assert when about to abort().
2608 static void
toku_maybe_set_env_panic(int code,const char * msg)2609 toku_maybe_set_env_panic(int code, const char * msg) {
2610 if (code == 0)
2611 code = -1;
2612 if (msg == NULL)
2613 msg = "Unknown cause from abort (failed assert)\n";
2614 env_is_panicked = code; // disable library destructor no matter what
2615 DB_ENV * env = most_recent_env;
2616 if (env &&
2617 env->i &&
2618 (env->i->is_panicked == 0)) {
2619 env_panic(env, code, msg);
2620 }
2621 }
2622
// handlerton's call to fractal tree layer on failed assert in handlerton.
// Forwards msg, fun, file, line and caller_errno to toku_do_assert_fail();
// the db_env argument is unused (marked UU).
// The trailing return exists only to satisfy the compiler; presumably
// toku_do_assert_fail() does not return -- TODO confirm.
static int
env_crash(DB_ENV * UU(db_env), const char* msg, const char * fun, const char* file, int line, int caller_errno) {
    toku_do_assert_fail(msg, fun, file, line, caller_errno);
    return -1;  // placate compiler
}
2629
2630 static int
env_get_cursor_for_persistent_environment(DB_ENV * env,DB_TXN * txn,DBC ** c)2631 env_get_cursor_for_persistent_environment(DB_ENV* env, DB_TXN* txn, DBC** c) {
2632 if (!env_opened(env)) {
2633 return EINVAL;
2634 }
2635 return toku_db_cursor(env->i->persistent_environment, txn, c, 0);
2636 }
2637
2638 static int
env_get_cursor_for_directory(DB_ENV * env,DB_TXN * txn,DBC ** c)2639 env_get_cursor_for_directory(DB_ENV* env, DB_TXN* txn, DBC** c) {
2640 if (!env_opened(env)) {
2641 return EINVAL;
2642 }
2643 return toku_db_cursor(env->i->directory, txn, c, 0);
2644 }
2645
2646 static DB *
env_get_db_for_directory(DB_ENV * env)2647 env_get_db_for_directory(DB_ENV* env) {
2648 if (!env_opened(env)) {
2649 return NULL;
2650 }
2651 return env->i->directory;
2652 }
2653
2654 struct ltm_iterate_requests_callback_extra {
ltm_iterate_requests_callback_extraltm_iterate_requests_callback_extra2655 ltm_iterate_requests_callback_extra(DB_ENV *e,
2656 iterate_requests_callback cb,
2657 void *ex) :
2658 env(e), callback(cb), extra(ex) {
2659 }
2660 DB_ENV *env;
2661 iterate_requests_callback callback;
2662 void *extra;
2663 };
2664
2665 static int
find_db_by_dict_id(DB * const & db,const DICTIONARY_ID & dict_id_find)2666 find_db_by_dict_id(DB *const &db, const DICTIONARY_ID &dict_id_find) {
2667 DICTIONARY_ID dict_id = db->i->dict_id;
2668 if (dict_id.dictid < dict_id_find.dictid) {
2669 return -1;
2670 } else if (dict_id.dictid > dict_id_find.dictid) {
2671 return 1;
2672 } else {
2673 return 0;
2674 }
2675 }
2676
2677 static DB *
locked_get_db_by_dict_id(DB_ENV * env,DICTIONARY_ID dict_id)2678 locked_get_db_by_dict_id(DB_ENV *env, DICTIONARY_ID dict_id) {
2679 DB *db;
2680 int r = env->i->open_dbs_by_dict_id->find_zero<DICTIONARY_ID, find_db_by_dict_id>(dict_id, &db, nullptr);
2681 return r == 0 ? db : nullptr;
2682 }
2683
ltm_iterate_requests_callback(DICTIONARY_ID dict_id,TXNID txnid,const DBT * left_key,const DBT * right_key,TXNID blocking_txnid,uint64_t start_time,void * extra)2684 static int ltm_iterate_requests_callback(DICTIONARY_ID dict_id, TXNID txnid,
2685 const DBT *left_key,
2686 const DBT *right_key,
2687 TXNID blocking_txnid,
2688 uint64_t start_time,
2689 void *extra) {
2690 ltm_iterate_requests_callback_extra *info =
2691 reinterpret_cast<ltm_iterate_requests_callback_extra *>(extra);
2692
2693 toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2694 int r = 0;
2695 DB *db = locked_get_db_by_dict_id(info->env, dict_id);
2696 if (db != nullptr) {
2697 r = info->callback(db, txnid, left_key, right_key,
2698 blocking_txnid, start_time, info->extra);
2699 }
2700 toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2701 return r;
2702 }
2703
2704 static int
env_iterate_pending_lock_requests(DB_ENV * env,iterate_requests_callback callback,void * extra)2705 env_iterate_pending_lock_requests(DB_ENV *env,
2706 iterate_requests_callback callback,
2707 void *extra) {
2708 if (!env_opened(env)) {
2709 return EINVAL;
2710 }
2711
2712 toku::locktree_manager *mgr = &env->i->ltm;
2713 ltm_iterate_requests_callback_extra e(env, callback, extra);
2714 return mgr->iterate_pending_lock_requests(ltm_iterate_requests_callback, &e);
2715 }
2716
2717 // for the lifetime of this object:
2718 // - open_dbs_rwlock must be read locked (or better)
2719 // - txn_mutex must be held
2720 struct iter_txn_row_locks_callback_extra {
iter_txn_row_locks_callback_extraiter_txn_row_locks_callback_extra2721 iter_txn_row_locks_callback_extra(DB_ENV *e, toku::omt<txn_lt_key_ranges> *m) :
2722 env(e), current_db(nullptr), which_lt(0), lt_map(m) {
2723 if (lt_map->size() > 0) {
2724 set_iterator_and_current_db();
2725 }
2726 }
2727
set_iterator_and_current_dbiter_txn_row_locks_callback_extra2728 void set_iterator_and_current_db() {
2729 txn_lt_key_ranges ranges;
2730 const int r = lt_map->fetch(which_lt, &ranges);
2731 invariant_zero(r);
2732 current_db = locked_get_db_by_dict_id(env, ranges.lt->get_dict_id());
2733 iter = toku::range_buffer::iterator(ranges.buffer);
2734 }
2735
2736 DB_ENV *env;
2737 DB *current_db;
2738 size_t which_lt;
2739 toku::omt<txn_lt_key_ranges> *lt_map;
2740 toku::range_buffer::iterator iter;
2741 toku::range_buffer::iterator::record rec;
2742 };
2743
iter_txn_row_locks_callback(DB ** db,DBT * left_key,DBT * right_key,void * extra)2744 static int iter_txn_row_locks_callback(DB **db, DBT *left_key, DBT *right_key, void *extra) {
2745 iter_txn_row_locks_callback_extra *info =
2746 reinterpret_cast<iter_txn_row_locks_callback_extra *>(extra);
2747
2748 while (info->which_lt < info->lt_map->size()) {
2749 const bool more = info->iter.current(&info->rec);
2750 if (more) {
2751 *db = info->current_db;
2752 // The caller should interpret data/size == 0 to mean infinity.
2753 // Therefore, when we copyref pos/neg infinity into left/right_key,
2754 // the caller knows what we're talking about.
2755 toku_copyref_dbt(left_key, *info->rec.get_left_key());
2756 toku_copyref_dbt(right_key, *info->rec.get_right_key());
2757 info->iter.next();
2758 return 0;
2759 } else {
2760 info->which_lt++;
2761 if (info->which_lt < info->lt_map->size()) {
2762 info->set_iterator_and_current_db();
2763 }
2764 }
2765 }
2766 return DB_NOTFOUND;
2767 }
2768
2769 struct iter_txns_callback_extra {
iter_txns_callback_extraiter_txns_callback_extra2770 iter_txns_callback_extra(DB_ENV *e, iterate_transactions_callback cb, void *ex) :
2771 env(e), callback(cb), extra(ex) {
2772 }
2773 DB_ENV *env;
2774 iterate_transactions_callback callback;
2775 void *extra;
2776 };
2777
iter_txns_callback(TOKUTXN txn,void * extra)2778 static int iter_txns_callback(TOKUTXN txn, void *extra) {
2779 int r = 0;
2780 iter_txns_callback_extra *info =
2781 reinterpret_cast<iter_txns_callback_extra *>(extra);
2782 DB_TXN *dbtxn = toku_txn_get_container_db_txn(txn);
2783 invariant_notnull(dbtxn);
2784 struct __toku_db_txn_internal *db_txn_internal __attribute__((__unused__)) = db_txn_struct_i(dbtxn);
2785 TOKU_VALGRIND_HG_DISABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2786 if (db_txn_struct_i(dbtxn)->tokutxn == txn) { // make sure that the dbtxn is fully initialized
2787 toku_mutex_lock(&db_txn_struct_i(dbtxn)->txn_mutex);
2788 toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2789
2790 iter_txn_row_locks_callback_extra e(info->env, &db_txn_struct_i(dbtxn)->lt_map);
2791 r = info->callback(dbtxn, iter_txn_row_locks_callback, &e, info->extra);
2792
2793 toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2794 toku_mutex_unlock(&db_txn_struct_i(dbtxn)->txn_mutex);
2795 }
2796 TOKU_VALGRIND_HG_ENABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2797
2798 return r;
2799 }
2800
2801 static int
env_iterate_live_transactions(DB_ENV * env,iterate_transactions_callback callback,void * extra)2802 env_iterate_live_transactions(DB_ENV *env,
2803 iterate_transactions_callback callback,
2804 void *extra) {
2805 if (!env_opened(env)) {
2806 return EINVAL;
2807 }
2808
2809 TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
2810 iter_txns_callback_extra e(env, callback, extra);
2811 return toku_txn_manager_iter_over_live_root_txns(txn_manager, iter_txns_callback, &e);
2812 }
2813
// Stores the callback that env_get_loader_memory_size() later invokes to
// obtain the loader memory size. A null pointer clears the callback, in which
// case env_get_loader_memory_size() reports 0.
static void env_set_loader_memory_size(DB_ENV *env, uint64_t (*get_loader_memory_size_callback)(void)) {
    env->i->get_loader_memory_size_callback = get_loader_memory_size_callback;
}
2817
env_get_loader_memory_size(DB_ENV * env)2818 static uint64_t env_get_loader_memory_size(DB_ENV *env) {
2819 uint64_t memory_size = 0;
2820 if (env->i->get_loader_memory_size_callback)
2821 memory_size = env->i->get_loader_memory_size_callback();
2822 return memory_size;
2823 }
2824
// Records, on the environment's internal struct, the default killed time (in
// milliseconds, per the parameter name) and the two callbacks related to
// "killed" handling. This function only stores the values; how they are
// consumed is not visible here.
static void env_set_killed_callback(DB_ENV *env, uint64_t default_killed_time_msec, uint64_t (*get_killed_time_callback)(uint64_t default_killed_time_msec), int (*killed_callback)(void)) {
    env->i->default_killed_time_msec = default_killed_time_msec;
    env->i->get_killed_time_callback = get_killed_time_callback;
    env->i->killed_callback = killed_callback;
}
2830
// Forwards `extra` to the environment's locktree manager kill_waiter();
// `extra` is passed through unchanged and its meaning is defined there.
static void env_kill_waiter(DB_ENV *env, void *extra) {
    env->i->ltm.kill_waiter(extra);
}
2834
env_do_backtrace(DB_ENV * env)2835 static void env_do_backtrace(DB_ENV *env) {
2836 if (env->i->errcall) {
2837 db_env_do_backtrace_errfunc((toku_env_err_func) toku_env_err, (const void *) env);
2838 }
2839 if (env->i->errfile) {
2840 db_env_do_backtrace((FILE *) env->i->errfile);
2841 } else {
2842 db_env_do_backtrace(stderr);
2843 }
2844 }
2845
// Allocates and initializes a new DB_ENV and stores it in *envp.
//
// flags must be 0 (otherwise EINVAL). Returns ENOMEM if either the DB_ENV or
// its internal struct cannot be allocated, and 0 on success. On failure any
// memory allocated here is released and *envp is left untouched.
//
// On success the environment has its method table filled in, its lock file
// descriptors set to -1, a logger, a locktree manager and the two open-db
// collections created, and the global tokuft_num_envs counter incremented.
static int
toku_env_create(DB_ENV ** envp, uint32_t flags) {
    int r = ENOSYS;
    DB_ENV* result = NULL;

    if (flags!=0) { r = EINVAL; goto cleanup; }
    MALLOC(result);
    if (result == 0) { r = ENOMEM; goto cleanup; }
    memset(result, 0, sizeof *result);

    // locked methods
    result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_env_err;
    // SENV(name) binds result->name to the function locked_env_<name>.
#define SENV(name) result->name = locked_env_ ## name
    SENV(dbremove);
    SENV(dbrename);
    SENV(dirtool_attach);
    SENV(dirtool_detach);
    SENV(dirtool_move);
    //SENV(set_noticecall);
#undef SENV
    // USENV(name) binds result->name to the function env_<name>.
#define USENV(name) result->name = env_ ## name
    // methods with locking done internally
    USENV(put_multiple);
    USENV(del_multiple);
    USENV(update_multiple);
    // unlocked methods
    USENV(open);
    USENV(close);
    USENV(set_default_bt_compare);
    USENV(set_update);
    USENV(set_generate_row_callback_for_put);
    USENV(set_generate_row_callback_for_del);
    USENV(set_lg_bsize);
    USENV(set_lg_dir);
    USENV(set_lg_max);
    USENV(get_lg_max);
    USENV(set_lk_max_memory);
    USENV(get_lk_max_memory);
    USENV(get_iname);
    USENV(set_errcall);
    USENV(set_errfile);
    USENV(set_errpfx);
    USENV(set_data_dir);
    USENV(checkpointing_set_period);
    USENV(checkpointing_get_period);
    USENV(cleaner_set_period);
    USENV(cleaner_get_period);
    USENV(cleaner_set_iterations);
    USENV(cleaner_get_iterations);
    USENV(evictor_set_enable_partial_eviction);
    USENV(evictor_get_enable_partial_eviction);
    USENV(set_cachesize);
    USENV(set_client_pool_threads);
    USENV(set_cachetable_pool_threads);
    USENV(set_checkpoint_pool_threads);
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
    USENV(get_cachesize);
#endif
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
    USENV(set_lk_max);
#endif
    USENV(set_lk_detect);
    USENV(set_flags);
    USENV(set_tmp_dir);
    USENV(set_verbose);
    USENV(txn_recover);
    USENV(txn_xa_recover);
    USENV(get_txn_from_xid);
    USENV(txn_stat);
    USENV(get_lock_timeout);
    USENV(set_lock_timeout);
    USENV(set_lock_timeout_callback);
    USENV(set_redzone);
    USENV(log_flush);
    USENV(log_archive);
    USENV(create_loader);
    USENV(get_cursor_for_persistent_environment);
    USENV(get_cursor_for_directory);
    USENV(get_db_for_directory);
    USENV(iterate_pending_lock_requests);
    USENV(iterate_live_transactions);
    USENV(change_fsync_log_period);
    USENV(set_loader_memory_size);
    USENV(get_loader_memory_size);
    USENV(set_killed_callback);
    USENV(do_backtrace);
    USENV(set_check_thp);
    USENV(get_check_thp);
    USENV(set_dir_per_db);
    USENV(get_dir_per_db);
    USENV(get_data_dir);
    USENV(kill_waiter);
#undef USENV

    // unlocked methods
    // (bound by hand because their function names do not follow the env_<name> pattern,
    // or to keep related entries together)
    result->create_indexer = toku_indexer_create_indexer;
    result->txn_checkpoint = toku_env_txn_checkpoint;
    result->checkpointing_postpone = env_checkpointing_postpone;
    result->checkpointing_resume = env_checkpointing_resume;
    result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation;
    result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation;
    result->get_engine_status_num_rows = env_get_engine_status_num_rows;
    result->get_engine_status = env_get_engine_status;
    result->get_engine_status_text = env_get_engine_status_text;
    result->crash = env_crash;  // handlerton's call to fractal tree layer on failed assert
    result->txn_begin = toku_txn_begin;

    // Allocate and zero the internal struct; every later field starts from 0/NULL.
    MALLOC(result->i);
    if (result->i == 0) { r = ENOMEM; goto cleanup; }
    memset(result->i, 0, sizeof *result->i);
    // -1 marks each lock file descriptor as "no lock file held".
    result->i->envdir_lockfd = -1;
    result->i->datadir_lockfd = -1;
    result->i->logdir_lockfd = -1;
    result->i->tmpdir_lockfd = -1;
    env_fs_init(result);
    env_fsync_log_init(result);

    result->i->check_thp = true;

    // Default key comparison until the user installs one.
    result->i->bt_compare = toku_builtin_compare_fun;

    // Logger creation is required to succeed; failure trips the invariants.
    r = toku_logger_create(&result->i->logger);
    invariant_zero(r);
    invariant_notnull(result->i->logger);

    // Create the locktree manager, passing in the create/destroy/escalate callbacks.
    // The extra parameter for escalation is simply a pointer to this environment.
    // The escalate callback will need it to translate txnids to DB_TXNs
    result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result);

    // The two collections of open dbs (by dname and by dictionary id) and the
    // rwlock that the rest of this file takes around accesses to them.
    XMALLOC(result->i->open_dbs_by_dname);
    result->i->open_dbs_by_dname->create();
    XMALLOC(result->i->open_dbs_by_dict_id);
    result->i->open_dbs_by_dict_id->create();
    toku_pthread_rwlock_init(
        *result_i_open_dbs_rwlock_key, &result->i->open_dbs_rwlock, nullptr);

    *envp = result;
    r = 0;
    toku_sync_fetch_and_add(&tokuft_num_envs, 1);
cleanup:
    // Only the early failure paths reach here with r != 0; result->i is either
    // NULL (zeroed by memset) or a failed allocation at that point.
    if (r!=0) {
        if (result) {
            toku_free(result->i);
            toku_free(result);
        }
    }
    return r;
}
2995
2996 int
DB_ENV_CREATE_FUN(DB_ENV ** envp,uint32_t flags)2997 DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) {
2998 int r = toku_env_create(envp, flags);
2999 return r;
3000 }
3001
3002 // return 0 if v and dbv refer to same db (including same dname)
3003 // return <0 if v is earlier in omt than dbv
3004 // return >0 if v is later in omt than dbv
3005 static int
find_db_by_db_dname(DB * const & db,DB * const & dbfind)3006 find_db_by_db_dname(DB *const &db, DB *const &dbfind) {
3007 int cmp;
3008 const char *dname = db->i->dname;
3009 const char *dnamefind = dbfind->i->dname;
3010 cmp = strcmp(dname, dnamefind);
3011 if (cmp != 0) return cmp;
3012 if (db < dbfind) return -1;
3013 if (db > dbfind) return 1;
3014 return 0;
3015 }
3016
3017 static int
find_db_by_db_dict_id(DB * const & db,DB * const & dbfind)3018 find_db_by_db_dict_id(DB *const &db, DB *const &dbfind) {
3019 DICTIONARY_ID dict_id = db->i->dict_id;
3020 DICTIONARY_ID dict_id_find = dbfind->i->dict_id;
3021 if (dict_id.dictid < dict_id_find.dictid) {
3022 return -1;
3023 } else if (dict_id.dictid > dict_id_find.dictid) {
3024 return 1;
3025 } else if (db < dbfind) {
3026 return -1;
3027 } else if (db > dbfind) {
3028 return 1;
3029 } else {
3030 return 0;
3031 }
3032 }
3033
3034 // Tell env that there is a new db handle (with non-unique dname in db->i-dname)
3035 void
env_note_db_opened(DB_ENV * env,DB * db)3036 env_note_db_opened(DB_ENV *env, DB *db) {
3037 toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3038 assert(db->i->dname); // internal (non-user) dictionary has no dname
3039
3040 int r;
3041 uint32_t idx;
3042
3043 r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3044 assert(r == DB_NOTFOUND);
3045 r = env->i->open_dbs_by_dname->insert_at(db, idx);
3046 assert_zero(r);
3047 r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3048 assert(r == DB_NOTFOUND);
3049 r = env->i->open_dbs_by_dict_id->insert_at(db, idx);
3050 assert_zero(r);
3051
3052 STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3053 STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++;
3054 if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) {
3055 STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS);
3056 }
3057 toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3058 }
3059
3060 // Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API. The DB may still be in use by the fractal tree internals.
3061 void
env_note_db_closed(DB_ENV * env,DB * db)3062 env_note_db_closed(DB_ENV *env, DB *db) {
3063 toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3064 assert(db->i->dname); // internal (non-user) dictionary has no dname
3065 assert(env->i->open_dbs_by_dname->size() > 0);
3066 assert(env->i->open_dbs_by_dict_id->size() > 0);
3067
3068 int r;
3069 uint32_t idx;
3070
3071 r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3072 assert_zero(r);
3073 r = env->i->open_dbs_by_dname->delete_at(idx);
3074 assert_zero(r);
3075 r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3076 assert_zero(r);
3077 r = env->i->open_dbs_by_dict_id->delete_at(idx);
3078 assert_zero(r);
3079
3080 STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++;
3081 STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3082 toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3083 }
3084
3085 static int
find_open_db_by_dname(DB * const & db,const char * const & dnamefind)3086 find_open_db_by_dname(DB *const &db, const char *const &dnamefind) {
3087 return strcmp(db->i->dname, dnamefind);
3088 }
3089
3090 // return true if there is any db open with the given dname
3091 static bool
env_is_db_with_dname_open(DB_ENV * env,const char * dname)3092 env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
3093 DB *db;
3094 toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
3095 int r = env->i->open_dbs_by_dname->find_zero<const char *, find_open_db_by_dname>(dname, &db, nullptr);
3096 if (r == 0) {
3097 invariant(strcmp(dname, db->i->dname) == 0);
3098 } else {
3099 invariant(r == DB_NOTFOUND);
3100 }
3101 toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
3102 return r == 0 ? true : false;
3103 }
3104
3105 //We do not (yet?) support deleting subdbs by deleting the enclosing 'fname'
3106 static int
env_dbremove_subdb(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,int32_t flags)3107 env_dbremove_subdb(DB_ENV * env, DB_TXN * txn, const char *fname, const char *dbname, int32_t flags) {
3108 int r;
3109 if (!fname || !dbname) r = EINVAL;
3110 else {
3111 char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3112 int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3113 assert(bytes==(int)sizeof(subdb_full_name)-1);
3114 const char *null_subdbname = NULL;
3115 r = env_dbremove(env, txn, subdb_full_name, null_subdbname, flags);
3116 }
3117 return r;
3118 }
3119
3120 // see if we can acquire a table lock for the given dname.
3121 // requires: write lock on dname in the directory. dictionary
3122 // open, close, and begin checkpoint cannot occur.
3123 // returns: zero if we could open, lock, and close a dictionary
3124 // with the given dname, errno otherwise.
3125 static int
can_acquire_table_lock(DB_ENV * env,DB_TXN * txn,const char * iname_in_env)3126 can_acquire_table_lock(DB_ENV *env, DB_TXN *txn, const char *iname_in_env) {
3127 int r;
3128 DB *db;
3129
3130 r = toku_db_create(&db, env, 0);
3131 assert_zero(r);
3132 r = toku_db_open_iname(db, txn, iname_in_env, 0, 0);
3133 if(r) {
3134 if (r == ENAMETOOLONG)
3135 toku_ydb_do_error(env, r, "File name too long!\n");
3136 goto exit;
3137 }
3138 r = toku_db_pre_acquire_table_lock(db, txn);
3139 if (r) {
3140 r = DB_LOCK_NOTGRANTED;
3141 }
3142 exit:
3143 if(db) {
3144 int r2 = toku_db_close(db);
3145 assert_zero(r2);
3146 }
3147 return r;
3148 }
3149
// Removes the dictionary named `fname` (or, when `dbname` is non-NULL, the
// sub-database `dbname` inside it) from the environment.
//
// Returns:
//   EINVAL              if the environment is not open, flags != 0, or a
//                       handle for the dictionary is open
//   ENOENT              if the name is not present in the directory
//   DB_LOCK_NOTGRANTED  if (with a txn) the table lock cannot be acquired
//   0 on success, or another error from the underlying directory/ft calls.
// HANDLE_PANICKED_ENV and HANDLE_READ_ONLY_TXN may return early as well;
// their exact behavior is defined outside this block.
//
// With a txn the ft is unlinked when the txn commits; without one it is
// unlinked immediately.
static int
env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
    int r;
    HANDLE_PANICKED_ENV(env);
    if (!env_opened(env) || flags != 0) {
        return EINVAL;
    }
    HANDLE_READ_ONLY_TXN(txn);
    if (dbname != NULL) {
        // env_dbremove_subdb() converts (fname, dbname) to dname
        return env_dbremove_subdb(env, txn, fname, dbname, flags);
    }

    const char * dname = fname;
    assert(dbname == NULL);

    // We check for an open db here as a "fast path" to error.
    // We'll need to check again below to be sure.
    if (env_is_db_with_dname_open(env, dname)) {
        return toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
    }

    DBT dname_dbt;
    DBT iname_dbt;
    // The directory key is the dname including its terminating NUL.
    toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
    toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);

    // get iname
    r = toku_db_get(env->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
    // iname and db are declared before the first goto so that the cleanup
    // code at `exit` can always inspect them.
    char *iname = (char *) iname_dbt.data;
    DB *db = NULL;
    if (r != 0) {
        if (r == DB_NOTFOUND) {
            r = ENOENT;
        }
        goto exit;
    }
    // remove (dname,iname) from directory
    r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
    if (r != 0) {
        goto exit;
    }
    // Open an internal handle on the ft so it can be locked and unlinked.
    r = toku_db_create(&db, env, 0);
    lazy_assert_zero(r);
    r = toku_db_open_iname(db, txn, iname, 0, 0);
    // With a txn, a failed open ends the operation: ENOENT is treated as
    // success (r = 0), anything else is reported as an error.
    // NOTE(review): without a txn a failed open is not handled here and
    // execution continues to toku_ft_unlink() below -- confirm this is intended.
    if (txn && r) {
        if (r == EMFILE || r == ENFILE)
            r = toku_ydb_do_error(env, r, "toku dbremove failed because open file limit reached\n");
        else if (r != ENOENT)
            r = toku_ydb_do_error(env, r, "toku dbremove failed\n");
        else
            r = 0;
        goto exit;
    }
    if (txn) {
        // Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions)
        if (env_is_db_with_dname_open(env, dname)) {
            r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
            goto exit;
        }
        // we know a live db handle does not exist.
        //
        // use the internally opened db to try and get a table lock
        //
        // if we can't get it, then some txn needs the ft and we
        // should return lock not granted.
        //
        // otherwise, we're okay in marking this ft as remove on
        // commit. no new handles can open for this dictionary
        // because the txn has directory write locks on the dname
        r = toku_db_pre_acquire_table_lock(db, txn);
        if (r != 0) {
            r = DB_LOCK_NOTGRANTED;
            goto exit;
        }
        // The ft will be unlinked when the txn commits
        toku_ft_unlink_on_commit(db->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
    }
    else {
        // unlink the ft without a txn
        toku_ft_unlink(db->i->ft_handle);
    }

exit:
    // Common cleanup: close the internal handle (if one was created) and
    // release the iname buffer filled in by toku_db_get().
    if (db) {
        int ret = toku_db_close(db);
        assert(ret == 0);
    }
    if (iname) {
        toku_free(iname);
    }
    return r;
}
3243
3244 static int
env_dbrename_subdb(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)3245 env_dbrename_subdb(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3246 int r;
3247 if (!fname || !dbname || !newname) r = EINVAL;
3248 else {
3249 char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3250 {
3251 int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3252 assert(bytes==(int)sizeof(subdb_full_name)-1);
3253 }
3254 char new_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3255 {
3256 int bytes = snprintf(new_full_name, sizeof(new_full_name), "%s/%s", fname, dbname);
3257 assert(bytes==(int)sizeof(new_full_name)-1);
3258 }
3259 const char *null_subdbname = NULL;
3260 r = env_dbrename(env, txn, subdb_full_name, null_subdbname, new_full_name, flags);
3261 }
3262 return r;
3263 }
3264
3265 static int
env_dbrename(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)3266 env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3267 int r;
3268 HANDLE_PANICKED_ENV(env);
3269 if (!env_opened(env) || flags != 0) {
3270 return EINVAL;
3271 }
3272 HANDLE_READ_ONLY_TXN(txn);
3273 if (dbname != NULL) {
3274 // env_dbrename_subdb() converts (fname, dbname) to dname and (fname, newname) to newdname
3275 return env_dbrename_subdb(env, txn, fname, dbname, newname, flags);
3276 }
3277
3278 const char * dname = fname;
3279 assert(dbname == NULL);
3280
3281 // We check for open dnames for the old and new name as a "fast path" to error.
3282 // We will need to check these again later.
3283 if (env_is_db_with_dname_open(env, dname)) {
3284 return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3285 }
3286 if (env_is_db_with_dname_open(env, newname)) {
3287 return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3288 }
3289
3290 DBT old_dname_dbt;
3291 DBT new_dname_dbt;
3292 DBT iname_dbt;
3293 toku_fill_dbt(&old_dname_dbt, dname, strlen(dname)+1);
3294 toku_fill_dbt(&new_dname_dbt, newname, strlen(newname)+1);
3295 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3296
3297 // get iname
3298 r = toku_db_get(env->i->directory, txn, &old_dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
3299 char *iname = (char *) iname_dbt.data;
3300 if (r == DB_NOTFOUND) {
3301 r = ENOENT;
3302 } else if (r == 0) {
3303 // verify that newname does not already exist
3304 r = db_getf_set(env->i->directory, txn, DB_SERIALIZABLE, &new_dname_dbt, ydb_getf_do_nothing, NULL);
3305 if (r == 0) {
3306 r = EEXIST;
3307 }
3308 else if (r == DB_NOTFOUND) {
3309 DBT new_iname_dbt;
3310 // Do not rename ft file if 'dir_per_db' option is not set
3311 auto new_iname =
3312 env->get_dir_per_db(env)
3313 ? generate_iname_for_rename_or_open(
3314 env, txn, newname, false)
3315 : std::unique_ptr<char[], decltype(&toku_free)>(
3316 toku_strdup(iname), &toku_free);
3317 toku_fill_dbt(
3318 &new_iname_dbt, new_iname.get(), strlen(new_iname.get()) + 1);
3319
3320 // remove old (dname,iname) and insert (newname,iname) in directory
3321 r = toku_db_del(env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
3322 if (r != 0) { goto exit; }
3323
3324 // Do not rename ft file if 'dir_per_db' option is not set
3325 if (env->get_dir_per_db(env))
3326 r = toku_ft_rename_iname(txn,
3327 env->get_data_dir(env),
3328 iname,
3329 new_iname.get(),
3330 env->i->cachetable);
3331
3332 r = toku_db_put(env->i->directory,
3333 txn,
3334 &new_dname_dbt,
3335 &new_iname_dbt,
3336 0,
3337 true);
3338 if (r != 0) { goto exit; }
3339
3340 //Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions)
3341 if (env_is_db_with_dname_open(env, dname)) {
3342 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3343 goto exit;
3344 }
3345 if (env_is_db_with_dname_open(env, newname)) {
3346 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3347 goto exit;
3348 }
3349
3350 // we know a live db handle does not exist.
3351 //
3352 // use the internally opened db to try and get a table lock
3353 //
3354 // if we can't get it, then some txn needs the ft and we
3355 // should return lock not granted.
3356 //
3357 // otherwise, we're okay in marking this ft as remove on
3358 // commit. no new handles can open for this dictionary
3359 // because the txn has directory write locks on the dname
3360 if (txn) {
3361 r = can_acquire_table_lock(env, txn, new_iname.get());
3362 }
3363 // We don't do anything at the ft or cachetable layer for rename.
3364 // We just update entries in the environment's directory.
3365 }
3366 }
3367
3368 exit:
3369 if (iname) {
3370 toku_free(iname);
3371 }
3372 return r;
3373 }
3374
3375 int
DB_CREATE_FUN(DB ** db,DB_ENV * env,uint32_t flags)3376 DB_CREATE_FUN (DB ** db, DB_ENV * env, uint32_t flags) {
3377 int r = toku_db_create(db, env, flags);
3378 return r;
3379 }
3380
3381 /* need db_strerror_r for multiple threads */
3382
3383 const char *
db_strerror(int error)3384 db_strerror(int error) {
3385 char *errorstr;
3386 if (error >= 0) {
3387 errorstr = strerror(error);
3388 if (errorstr)
3389 return errorstr;
3390 }
3391
3392 switch (error) {
3393 case DB_BADFORMAT:
3394 return "Database Bad Format (probably a corrupted database)";
3395 case DB_NOTFOUND:
3396 return "Not found";
3397 case TOKUDB_OUT_OF_LOCKS:
3398 return "Out of locks";
3399 case TOKUDB_DICTIONARY_TOO_OLD:
3400 return "Dictionary too old for this version of PerconaFT";
3401 case TOKUDB_DICTIONARY_TOO_NEW:
3402 return "Dictionary too new for this version of PerconaFT";
3403 case TOKUDB_CANCELED:
3404 return "User cancelled operation";
3405 case TOKUDB_NO_DATA:
3406 return "Ran out of data (not EOF)";
3407 case TOKUDB_HUGE_PAGES_ENABLED:
3408 return "Transparent huge pages are enabled but PerconaFT's memory allocator will oversubscribe main memory with transparent huge pages. This check can be disabled by setting the environment variable TOKU_HUGE_PAGES_OK.";
3409 }
3410
3411 static char unknown_result[100]; // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of null-terminated string.
3412 errorstr = unknown_result;
3413 snprintf(errorstr, sizeof unknown_result, "Unknown error code: %d", error);
3414 return errorstr;
3415 }
3416
3417 const char *
db_version(int * major,int * minor,int * patch)3418 db_version(int *major, int *minor, int *patch) {
3419 if (major)
3420 *major = DB_VERSION_MAJOR;
3421 if (minor)
3422 *minor = DB_VERSION_MINOR;
3423 if (patch)
3424 *patch = DB_VERSION_PATCH;
3425 return toku_product_name_strings.db_version;
3426 }
3427
3428 // HACK: To ensure toku_pthread_yield gets included in the .so
3429 // non-static would require a prototype in a header
3430 // static (since unused) would give a warning
3431 // static + unused would not actually help toku_pthread_yield get in the .so
3432 // static + used avoids all the warnings and makes sure toku_pthread_yield is in the .so
3433 static void __attribute__((__used__))
include_toku_pthread_yield(void)3434 include_toku_pthread_yield (void) {
3435 toku_pthread_yield();
3436 }
3437
3438 // For test purposes only, translate dname to iname
3439 // YDB lock is NOT held when this function is called,
3440 // as it is called by user
3441 static int
env_get_iname(DB_ENV * env,DBT * dname_dbt,DBT * iname_dbt)3442 env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) {
3443 DB *directory = env->i->directory;
3444 int r = autotxn_db_get(directory, NULL, dname_dbt, iname_dbt, DB_SERIALIZABLE|DB_PRELOCKED); // allocates memory for iname
3445 return r;
3446 }
3447
3448 // TODO 2216: Patch out this (dangerous) function when loader is working and
3449 // we don't need to test the low-level redirect anymore.
3450 // for use by test programs only, just a wrapper around ft call:
3451 int
toku_test_db_redirect_dictionary(DB * db,const char * dname_of_new_file,DB_TXN * dbtxn)3452 toku_test_db_redirect_dictionary(DB * db, const char * dname_of_new_file, DB_TXN *dbtxn) {
3453 int r;
3454 DBT dname_dbt;
3455 DBT iname_dbt;
3456 char * new_iname_in_env;
3457
3458 FT_HANDLE ft_handle = db->i->ft_handle;
3459 TOKUTXN tokutxn = db_txn_struct_i(dbtxn)->tokutxn;
3460
3461 toku_fill_dbt(&dname_dbt, dname_of_new_file, strlen(dname_of_new_file)+1);
3462 toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3463 r = toku_db_get(db->dbenv->i->directory, dbtxn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE); // allocates memory for iname
3464 assert_zero(r);
3465 new_iname_in_env = (char *) iname_dbt.data;
3466
3467 toku_multi_operation_client_lock(); //Must hold MO lock for dictionary_redirect.
3468 r = toku_dictionary_redirect(new_iname_in_env, ft_handle, tokutxn);
3469 toku_multi_operation_client_unlock();
3470
3471 toku_free(new_iname_in_env);
3472 return r;
3473 }
3474
3475 //Tets only function
3476 uint64_t
toku_test_get_latest_lsn(DB_ENV * env)3477 toku_test_get_latest_lsn(DB_ENV *env) {
3478 LSN rval = ZERO_LSN;
3479 if (env && env->i->logger) {
3480 rval = toku_logger_last_lsn(env->i->logger);
3481 }
3482 return rval.lsn;
3483 }
3484
toku_set_test_txn_sync_callback(void (* cb)(pthread_t,void *),void * extra)3485 void toku_set_test_txn_sync_callback(void (* cb) (pthread_t, void *), void * extra) {
3486 set_test_txn_sync_callback(cb, extra);
3487 }
3488
3489 int
toku_test_get_checkpointing_user_data_status(void)3490 toku_test_get_checkpointing_user_data_status (void) {
3491 return toku_cachetable_get_checkpointing_user_data_status();
3492 }
3493
3494 #undef STATUS_VALUE
3495 #undef PERSISTENT_UPGRADE_STATUS_VALUE
3496
3497 #include <toku_race_tools.h>
3498 void __attribute__((constructor)) toku_ydb_helgrind_ignore(void);
3499 void
toku_ydb_helgrind_ignore(void)3500 toku_ydb_helgrind_ignore(void) {
3501 TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_layer_status, sizeof ydb_layer_status);
3502 }
3503