1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
// toku_patent_string is only declared here; its definition lives elsewhere.
extern const char *toku_patent_string;
// Copyright notice exported as a global string (mirrors the #ident above).
const char *toku_copyright_string = "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.";
41 
42 #include <my_global.h>
43 
// Defined in another translation unit. NOTE(review): its purpose is not visible in this chunk.
extern int writing_rollback;
45 
46 #include <db.h>
47 #include <errno.h>
48 #include <string.h>
49 
50 #include "portability/memory.h"
51 #include "portability/toku_assert.h"
52 #include "portability/toku_portability.h"
53 #include "portability/toku_pthread.h"
54 #include "portability/toku_stdlib.h"
55 
56 #include "ft/ft-flusher.h"
57 #include "ft/cachetable/cachetable.h"
58 #include "ft/cachetable/checkpoint.h"
59 #include "ft/logger/log.h"
60 #include "ft/loader/loader.h"
61 #include "ft/log_header.h"
62 #include "ft/ft.h"
63 #include "ft/txn/txn_manager.h"
64 #include "src/ydb.h"
65 #include "src/ydb-internal.h"
66 #include "src/ydb_cursor.h"
67 #include "src/ydb_row_lock.h"
68 #include "src/ydb_env_func.h"
69 #include "src/ydb_db.h"
70 #include "src/ydb_write.h"
71 #include "src/ydb_txn.h"
72 #include "src/loader.h"
73 #include "src/indexer.h"
74 #include "util/status.h"
75 #include "util/context.h"
76 
77 #include <functional>
78 
79 // Include ydb_lib.cc here so that its constructor/destructor gets put into
80 // ydb.o, to make sure they don't get erased at link time (when linking to
81 // a static libtokufractaltree.a that was compiled with gcc).  See #5094.
82 #include "ydb_lib.cc"
83 
// Select the exported names of the env/db constructors.  With TOKUTRACE the
// constructors get *_toku10 names (presumably so a tracing wrapper can own the
// public names -- confirm); otherwise the trace-file hooks are no-op stubs
// that always report success (0).
#ifdef TOKUTRACE
 #define DB_ENV_CREATE_FUN db_env_create_toku10
 #define DB_CREATE_FUN db_create_toku10
#else
 #define DB_ENV_CREATE_FUN db_env_create
 #define DB_CREATE_FUN db_create
int toku_set_trace_file (const char *fname __attribute__((__unused__))) { return 0; }
int toku_close_trace_file (void) { return 0; }
#endif
93 
// Defined elsewhere.  Code in this file skips the file-system space poller and
// the fileops-directory check when it equals 6.
extern uint force_recovery;

// Set when env is panicked, never cleared.  Holds the cause of the most
// recent env_panic() (always nonzero once set); toku_ydb_destroy() skips
// lower-layer teardown while it is nonzero.
static int env_is_panicked = 0;
98 
99 void
env_panic(DB_ENV * env,int cause,const char * msg)100 env_panic(DB_ENV * env, int cause, const char * msg) {
101     if (cause == 0)
102         cause = -1;  // if unknown cause, at least guarantee panic
103     if (msg == NULL)
104         msg = "Unknown cause in env_panic\n";
105     env_is_panicked = cause;
106     env->i->is_panicked = cause;
107     env->i->panic_string = toku_strdup(msg);
108 }
109 
// Forward declaration; the static definition appears later in this file.
static int env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp);
111 
112 /********************************************************************************
113  * Status is intended for display to humans to help understand system behavior.
114  * It does not need to be perfectly thread-safe.
115  */
116 
// Row indices into ydb_layer_status.status[].  YDB_LAYER_STATUS_NUM_ROWS must
// remain last because it sizes that array.
typedef enum {
    YDB_LAYER_TIME_CREATION = 0,            /* timestamp of environment creation, read from persistent environment */
    YDB_LAYER_TIME_STARTUP,                 /* timestamp of system startup */
    YDB_LAYER_TIME_NOW,                     /* timestamp of engine status query */
    YDB_LAYER_NUM_DB_OPEN,                  /* "db opens" */
    YDB_LAYER_NUM_DB_CLOSE,                 /* "db closes" */
    YDB_LAYER_NUM_OPEN_DBS,                 /* "num open dbs now" */
    YDB_LAYER_MAX_OPEN_DBS,                 /* "max open dbs" */
    YDB_LAYER_FSYNC_LOG_PERIOD,             /* period, in ms, of the automatic recovery-log fsync */
// Disabled rows; these values appear to be reported through
// persistent_upgrade_status instead (NOTE(review): confirm before deleting).
#if 0
    YDB_LAYER_ORIGINAL_ENV_VERSION,         /* version of original environment, read from persistent environment */
    YDB_LAYER_STARTUP_ENV_VERSION,          /* version of environment at this startup, read from persistent environment (curr_env_ver_key) */
    YDB_LAYER_LAST_LSN_OF_V13,              /* read from persistent environment */
    YDB_LAYER_UPGRADE_V14_TIME,             /* timestamp of upgrade to version 14, read from persistent environment */
    YDB_LAYER_UPGRADE_V14_FOOTPRINT,        /* footprint of upgrade to version 14, read from persistent environment */
#endif
    YDB_LAYER_STATUS_NUM_ROWS              /* number of rows in this status array */
} ydb_layer_status_entry;
135 
// Engine-status table for the ydb layer: one row per ydb_layer_status_entry.
// `initialized` is set by ydb_layer_status_init() and cleared by toku_ydb_destroy().
typedef struct {
    bool initialized;
    TOKU_ENGINE_STATUS_ROW_S status[YDB_LAYER_STATUS_NUM_ROWS];
} YDB_LAYER_STATUS_S, *YDB_LAYER_STATUS;

// Single process-wide instance (static storage, so rows start zeroed).
static YDB_LAYER_STATUS_S ydb_layer_status;
// Numeric value of row x of ydb_layer_status.
#define STATUS_VALUE(x) ydb_layer_status.status[x].value.num

// TOKUFT_STATUS_INIT bound to ydb_layer_status; #undef'd after the init function.
#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_layer_status, k, c, t, l, inc)
145 
// Describe every ydb-layer status row, stamp the engine startup time, and mark
// the table initialized.  The row arguments are passed through a macro that
// stringifies them, so they must stay spelled exactly as written.
static void
ydb_layer_status_init (void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.

    STATUS_INIT(YDB_LAYER_TIME_CREATION,              nullptr, UNIXTIME, "time of environment creation", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_TIME_STARTUP,               nullptr, UNIXTIME, "time of engine startup", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_TIME_NOW,                   nullptr, UNIXTIME, "time now", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DB_OPEN,                DB_OPENS, UINT64,   "db opens", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DB_CLOSE,               DB_CLOSES, UINT64,   "db closes", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_OPEN_DBS,               DB_OPEN_CURRENT, UINT64,   "num open dbs now", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_MAX_OPEN_DBS,               DB_OPEN_MAX, UINT64,   "max open dbs", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_FSYNC_LOG_PERIOD,           nullptr, UINT64,   "period, in ms, that recovery log is automatically fsynced", TOKU_ENGINE_STATUS);

    // Startup time is fixed here; TIME_NOW and FSYNC_LOG_PERIOD are refreshed
    // on each ydb_layer_get_status() call.
    STATUS_VALUE(YDB_LAYER_TIME_STARTUP) = time(NULL);
    ydb_layer_status.initialized = true;
}
#undef STATUS_INIT
164 
165 static void
ydb_layer_get_status(DB_ENV * env,YDB_LAYER_STATUS statp)166 ydb_layer_get_status(DB_ENV* env, YDB_LAYER_STATUS statp) {
167     STATUS_VALUE(YDB_LAYER_TIME_NOW) = time(NULL);
168     STATUS_VALUE(YDB_LAYER_FSYNC_LOG_PERIOD) = toku_minicron_get_period_in_ms_unlocked(&env->i->fsync_log_cron);
169     *statp = ydb_layer_status;
170 }
171 
172 /********************************************************************************
173  * End of ydb_layer local status section.
174  */
175 
// Most recently opened env, used for engine status on crash.  There are likely
// to be races on this if multiple threads create and close environments in
// parallel.  It is declared volatile only so that the compiler will not
// optimize away code that reads it (e.g. a debugging loop that spins on
// most_recent_env).  volatile does NOT make accesses to it thread-safe.
static DB_ENV * volatile most_recent_env;

// Forward declarations of helpers defined later in this file.
static int env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt);
static int toku_maybe_get_engine_status_text (char* buff, int buffsize);  // for use by toku_assert
static int toku_maybe_err_engine_status (void);
static void toku_maybe_set_env_panic(int code, const char * msg);               // for use by toku_assert
182 
183 int
toku_ydb_init(void)184 toku_ydb_init(void) {
185     int r = 0;
186     //Lower level must be initialized first.
187     r = toku_ft_layer_init();
188     return r;
189 }
190 
191 // Do not clean up resources if env is panicked, just exit ugly
192 void
toku_ydb_destroy(void)193 toku_ydb_destroy(void) {
194     if (!ydb_layer_status.initialized)
195         return;
196     if (env_is_panicked == 0) {
197         toku_ft_layer_destroy();
198     }
199     ydb_layer_status.initialized = false;
200 }
201 
202 static int
ydb_getf_do_nothing(DBT const * UU (key),DBT const * UU (val),void * UU (extra))203 ydb_getf_do_nothing(DBT const* UU(key), DBT const* UU(val), void* UU(extra)) {
204     return 0;
205 }
206 
207 /* env methods */
208 
209 static void
env_fs_report_in_yellow(DB_ENV * UU (env))210 env_fs_report_in_yellow(DB_ENV *UU(env)) {
211     char tbuf[26];
212     time_t tnow = time(NULL);
213     fprintf(stderr, "%.24s PerconaFT file system space is low\n", ctime_r(&tnow, tbuf)); fflush(stderr);
214 }
215 
216 static void
env_fs_report_in_red(DB_ENV * UU (env))217 env_fs_report_in_red(DB_ENV *UU(env)) {
218     char tbuf[26];
219     time_t tnow = time(NULL);
220     fprintf(stderr, "%.24s PerconaFT file system space is really low and access is restricted\n", ctime_r(&tnow, tbuf)); fflush(stderr);
221 }
222 
223 static inline uint64_t
env_fs_redzone(DB_ENV * env,uint64_t total)224 env_fs_redzone(DB_ENV *env, uint64_t total) {
225     return total * env->i->redzone / 100;
226 }
227 
228 #define ZONEREPORTLIMIT 12
229 // Check the available space in the file systems used by tokuft and erect barriers when available space gets low.
230 static int
env_fs_poller(void * arg)231 env_fs_poller(void *arg) {
232     if(force_recovery == 6) {
233 	    return 0;
234     }
235     DB_ENV *env = (DB_ENV *) arg;
236     int r;
237 
238     int in_yellow; // set true to issue warning to user
239     int in_red;    // set true to prevent certain operations (returning ENOSPC)
240 
241     // get the fs sizes for the home dir
242     uint64_t avail_size = 0, total_size = 0;
243     r = toku_get_filesystem_sizes(env->i->dir, &avail_size, NULL, &total_size);
244     assert(r == 0);
245     in_yellow = (avail_size < 2 * env_fs_redzone(env, total_size));
246     in_red = (avail_size < env_fs_redzone(env, total_size));
247 
248     // get the fs sizes for the data dir if different than the home dir
249     if (strcmp(env->i->dir, env->i->real_data_dir) != 0) {
250         r = toku_get_filesystem_sizes(env->i->real_data_dir, &avail_size, NULL, &total_size);
251         assert(r == 0);
252         in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
253         in_red += (avail_size < env_fs_redzone(env, total_size));
254     }
255 
256     // get the fs sizes for the log dir if different than the home dir and data dir
257     if (strcmp(env->i->dir, env->i->real_log_dir) != 0 && strcmp(env->i->real_data_dir, env->i->real_log_dir) != 0) {
258         r = toku_get_filesystem_sizes(env->i->real_log_dir, &avail_size, NULL, &total_size);
259         assert(r == 0);
260         in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
261         in_red += (avail_size < env_fs_redzone(env, total_size));
262     }
263 
264     env->i->fs_seq++;                    // how many times through this polling loop?
265     uint64_t now = env->i->fs_seq;
266 
267     // Don't issue report if we have not been out of this fs_state for a while, unless we're at system startup
268     switch (env->i->fs_state) {
269     case FS_RED:
270         if (!in_red) {
271             if (in_yellow) {
272                 env->i->fs_state = FS_YELLOW;
273             } else {
274                 env->i->fs_state = FS_GREEN;
275             }
276         }
277         break;
278     case FS_YELLOW:
279         if (in_red) {
280             if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
281                 env_fs_report_in_red(env);
282             env->i->fs_state = FS_RED;
283             env->i->last_seq_entered_red = now;
284         } else if (!in_yellow) {
285             env->i->fs_state = FS_GREEN;
286         }
287         break;
288     case FS_GREEN:
289         if (in_red) {
290             if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
291                 env_fs_report_in_red(env);
292             env->i->fs_state = FS_RED;
293             env->i->last_seq_entered_red = now;
294         } else if (in_yellow) {
295             if ((now - env->i->last_seq_entered_yellow > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
296                 env_fs_report_in_yellow(env);
297             env->i->fs_state = FS_YELLOW;
298             env->i->last_seq_entered_yellow = now;
299         }
300         break;
301     default:
302         assert(0);
303     }
304     return 0;
305 }
306 #undef ZONEREPORTLIMIT
307 
308 static void
env_fs_init(DB_ENV * env)309 env_fs_init(DB_ENV *env) {
310     env->i->fs_state = FS_GREEN;
311     env->i->fs_poll_time = 5;  // seconds
312     env->i->redzone = 5;       // percent of total space
313     env->i->fs_poller_is_init = false;
314 }
315 
316 // Initialize the minicron that polls file system space
317 static int
env_fs_init_minicron(DB_ENV * env)318 env_fs_init_minicron(DB_ENV *env) {
319     if(force_recovery == 6) {
320         return 0;
321     }
322     int r = toku_minicron_setup(&env->i->fs_poller, env->i->fs_poll_time*1000, env_fs_poller, env);
323     if (r == 0)
324         env->i->fs_poller_is_init = true;
325     return r;
326 }
327 
328 // Destroy the file system space minicron
329 static void
env_fs_destroy(DB_ENV * env)330 env_fs_destroy(DB_ENV *env) {
331     if (env->i->fs_poller_is_init) {
332         int r = toku_minicron_shutdown(&env->i->fs_poller);
333         assert(r == 0);
334         env->i->fs_poller_is_init = false;
335     }
336 }
337 
338 static int
env_fsync_log_on_minicron(void * arg)339 env_fsync_log_on_minicron(void *arg) {
340     DB_ENV *env = (DB_ENV *) arg;
341     int r = env->log_flush(env, 0);
342     assert(r == 0);
343     return 0;
344 }
345 
346 static void
env_fsync_log_init(DB_ENV * env)347 env_fsync_log_init(DB_ENV *env) {
348     env->i->fsync_log_period_ms = 0;
349     env->i->fsync_log_cron_is_init = false;
350 }
351 
UU()352 static void UU()
353 env_change_fsync_log_period(DB_ENV* env, uint32_t period_ms) {
354     env->i->fsync_log_period_ms = period_ms;
355     if (env->i->fsync_log_cron_is_init) {
356         toku_minicron_change_period(&env->i->fsync_log_cron, period_ms);
357     }
358 }
359 
360 static int
env_fsync_log_cron_init(DB_ENV * env)361 env_fsync_log_cron_init(DB_ENV *env) {
362     int r = toku_minicron_setup(&env->i->fsync_log_cron, env->i->fsync_log_period_ms, env_fsync_log_on_minicron, env);
363     if (r == 0)
364         env->i->fsync_log_cron_is_init = true;
365     return r;
366 }
367 
368 static void
env_fsync_log_cron_destroy(DB_ENV * env)369 env_fsync_log_cron_destroy(DB_ENV *env) {
370     if (env->i->fsync_log_cron_is_init) {
371         int r = toku_minicron_shutdown(&env->i->fsync_log_cron);
372         assert(r == 0);
373         env->i->fsync_log_cron_is_init = false;
374     }
375 }
376 
377 static void
env_setup_real_dir(DB_ENV * env,char ** real_dir,const char * nominal_dir)378 env_setup_real_dir(DB_ENV *env, char **real_dir, const char *nominal_dir) {
379     toku_free(*real_dir);
380     *real_dir = NULL;
381 
382     assert(env->i->dir);
383     if (nominal_dir)
384         *real_dir = toku_construct_full_name(2, env->i->dir, nominal_dir);
385     else
386         *real_dir = toku_strdup(env->i->dir);
387 }
388 
389 static void
env_setup_real_data_dir(DB_ENV * env)390 env_setup_real_data_dir(DB_ENV *env) {
391     env_setup_real_dir(env, &env->i->real_data_dir, env->i->data_dir);
392 }
393 
394 static void
env_setup_real_log_dir(DB_ENV * env)395 env_setup_real_log_dir(DB_ENV *env) {
396     env_setup_real_dir(env, &env->i->real_log_dir, env->i->lg_dir);
397 }
398 
399 static void
env_setup_real_tmp_dir(DB_ENV * env)400 env_setup_real_tmp_dir(DB_ENV *env) {
401     env_setup_real_dir(env, &env->i->real_tmp_dir, env->i->tmp_dir);
402 }
403 
keep_cachetable_callback(DB_ENV * env,CACHETABLE cachetable)404 static void keep_cachetable_callback (DB_ENV *env, CACHETABLE cachetable)
405 {
406     env->i->cachetable = cachetable;
407 }
408 
409 static int
ydb_do_recovery(DB_ENV * env)410 ydb_do_recovery (DB_ENV *env) {
411     assert(env->i->real_log_dir);
412     int r = tokuft_recover(env,
413                            toku_keep_prepared_txn_callback,
414                            keep_cachetable_callback,
415                            env->i->logger,
416                            env->i->dir, env->i->real_log_dir, env->i->bt_compare,
417                            env->i->update_function,
418                            env->i->generate_row_for_put, env->i->generate_row_for_del,
419                            env->i->cachetable_size);
420     return r;
421 }
422 
423 static int
needs_recovery(DB_ENV * env)424 needs_recovery (DB_ENV *env) {
425     assert(env->i->real_log_dir);
426     int recovery_needed = tokuft_needs_recovery(env->i->real_log_dir, true);
427     return recovery_needed ? DB_RUNRECOVERY : 0;
428 }
429 
// Forward declaration; the static definition appears later in this file.
static int toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte, uint32_t min, uint32_t flags);

// Keys used in persistent environment dictionary:
// Following keys added in version 12
static const char * orig_env_ver_key = "original_version";   // 32-bit layout version at env creation
static const char * curr_env_ver_key = "current_version";    // 32-bit layout version the env was last upgraded to
// Following keys added in version 14, add more keys for future versions
static const char * creation_time_key         = "creation_time";   // time_t-sized creation timestamp
438 
// Format "upgrade_v<version>_<suffix>" into buf (bufsize bytes) and return buf.
// Asserts that snprintf neither failed nor truncated the key.
static char *
format_upgrade_key(char *buf, size_t bufsize, int version, const char *suffix) {
    int n = snprintf(buf, bufsize, "upgrade_v%d_%s", version, suffix);
    assert(n >= 0 && (size_t) n < bufsize);
    return buf;
}

// Persistent-environment key names for per-version upgrade records.  Each
// getter returns a pointer to its own static buffer, so the result is only
// valid until the next call to the same getter (and is not thread-safe).
// Buffers reserve 12 bytes for the decimal version, enough for any int.
static char * get_upgrade_time_key(int version) {
    static char upgrade_time_key[sizeof("upgrade_v_time") + 12];
    return format_upgrade_key(upgrade_time_key, sizeof(upgrade_time_key), version, "time");
}

static char * get_upgrade_footprint_key(int version) {
    static char upgrade_footprint_key[sizeof("upgrade_v_footprint") + 12];
    return format_upgrade_key(upgrade_footprint_key, sizeof(upgrade_footprint_key), version, "footprint");
}

static char * get_upgrade_last_lsn_key(int version) {
    static char upgrade_last_lsn_key[sizeof("upgrade_v_last_lsn") + 12];
    return format_upgrade_key(upgrade_last_lsn_key, sizeof(upgrade_last_lsn_key), version, "last_lsn");
}
468 
// Values read from (or written into) persistent environment,
// kept here for read-only access from engine status.
// Note, persistent_upgrade_status info is separate in part to simplify its exclusion from engine status until relevant.
// NOTE(review): the V13/V14 row names look historical; the values captured
// for them come from the upgrade keys of the current layout version.
typedef enum {
    PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION = 0,
    PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP,    // read from curr_env_ver_key, prev version as of this startup
    PERSISTENT_UPGRADE_LAST_LSN_OF_V13,
    PERSISTENT_UPGRADE_V14_TIME,
    PERSISTENT_UPGRADE_V14_FOOTPRINT,
    PERSISTENT_UPGRADE_STATUS_NUM_ROWS
} persistent_upgrade_status_entry;

// Status table for the rows above; `initialized` is set by persistent_upgrade_status_init().
typedef struct {
    bool initialized;
    TOKU_ENGINE_STATUS_ROW_S status[PERSISTENT_UPGRADE_STATUS_NUM_ROWS];
} PERSISTENT_UPGRADE_STATUS_S, *PERSISTENT_UPGRADE_STATUS;

static PERSISTENT_UPGRADE_STATUS_S persistent_upgrade_status;

// TOKUFT_STATUS_INIT bound to persistent_upgrade_status; prefixes each legend
// with "upgrade: " by string-literal concatenation, so `l` must be a literal.
#define PERSISTENT_UPGRADE_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(persistent_upgrade_status, k, c, t, "upgrade: " l, inc)
489 
// Describe every persistent-upgrade status row and mark the table initialized.
// Row arguments are stringified by the macro, so keep them spelled as-is.
static void
persistent_upgrade_status_init (void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.

    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION,           nullptr, UINT64,   "original version (at time of environment creation)", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP,  nullptr, UINT64,   "version at time of startup", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_LAST_LSN_OF_V13,                nullptr, UINT64,   "last LSN of version 13", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_TIME,                       nullptr, UNIXTIME, "time of upgrade to version 14", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_FOOTPRINT,                  nullptr, UINT64,   "footprint from version 13 to 14", TOKU_ENGINE_STATUS);
    persistent_upgrade_status.initialized = true;
}

// Numeric value of row x of persistent_upgrade_status.
#define PERSISTENT_UPGRADE_STATUS_VALUE(x) persistent_upgrade_status.status[x].value.num
504 
// Requires: persistent environment dictionary is already open.
// Input arg is lsn of clean shutdown of previous version,
// or ZERO_LSN if no upgrade or if crash between log upgrade and here.
//
// Reads the stored environment version (curr_env_ver_key) and records it in
// persistent_upgrade_status.  Returns:
//   TOKUDB_DICTIONARY_TOO_NEW  stored version > FT_LAYOUT_VERSION
//   TOKUDB_DICTIONARY_TOO_OLD  stored version < FT_LAYOUT_MIN_SUPPORTED_VERSION
//   0                          otherwise.  When the stored version is older than
//                              FT_LAYOUT_VERSION, the stored version is bumped and
//                              upgrade time/footprint/last-LSN records are written
//                              for every version after the stored one.
// Lookup and put failures are fatal (asserted) rather than returned.
//
// NOTE: To maintain compatibility with previous versions, do not change the
//       format of any information stored in the persistent environment dictionary.
//       For example, some values are stored as 32 bits, even though they are immediately
//       converted to 64 bits when read.  Do not change them to be stored as 64 bits.
//
static int
maybe_upgrade_persistent_environment_dictionary(DB_ENV * env, DB_TXN * txn, LSN last_lsn_of_clean_shutdown_read_from_log) {
    int r;
    DBT key, val;
    DB *persistent_environment = env->i->persistent_environment;

    if (!persistent_upgrade_status.initialized)
        persistent_upgrade_status_init();

    // The stored version is a 32-bit value in disk byte order.
    toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
    toku_init_dbt(&val);
    r = toku_db_get(persistent_environment, txn, &key, &val, 0);
    assert(r == 0);
    uint32_t stored_env_version = toku_dtoh32(*(uint32_t*)val.data);
    PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP) = stored_env_version;
    if (stored_env_version > FT_LAYOUT_VERSION)
        r = TOKUDB_DICTIONARY_TOO_NEW;
    else if (stored_env_version < FT_LAYOUT_MIN_SUPPORTED_VERSION)
        r = TOKUDB_DICTIONARY_TOO_OLD;
    else if (stored_env_version < FT_LAYOUT_VERSION) {
        // Record the new current version (still stored as 32 bits).
        const uint32_t curr_env_ver_d = toku_htod32(FT_LAYOUT_VERSION);
        toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
        toku_fill_dbt(&val, &curr_env_ver_d, sizeof(curr_env_ver_d));
        r = toku_db_put(persistent_environment, txn, &key, &val, 0, false);
        assert_zero(r);

        // The same time / footprint / last-LSN values are written under each
        // intermediate version's keys below.  The time is stored in a
        // time_t-sized field.
        time_t upgrade_time_d = toku_htod64(time(NULL));
        uint64_t upgrade_footprint_d = toku_htod64(toku_log_upgrade_get_footprint());
        uint64_t upgrade_last_lsn_d = toku_htod64(last_lsn_of_clean_shutdown_read_from_log.lsn);
        for (int version = stored_env_version+1; version <= FT_LAYOUT_VERSION; version++) {
            uint32_t put_flag = DB_NOOVERWRITE;
            if (version <= FT_LAYOUT_VERSION_19) {
                // See #5902.
                // To prevent a crash (and any higher complexity code) we'll simply
                // silently not overwrite anything if it exists.
                // The keys existing for version <= 19 is not necessarily an error.
                // If this happens for versions > 19 it IS an error and we'll use DB_NOOVERWRITE.
                put_flag = DB_NOOVERWRITE_NO_ERROR;
            }


            char* upgrade_time_key = get_upgrade_time_key(version);
            toku_fill_dbt(&key, upgrade_time_key, strlen(upgrade_time_key));
            toku_fill_dbt(&val, &upgrade_time_d, sizeof(upgrade_time_d));
            r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
            assert_zero(r);

            char* upgrade_footprint_key = get_upgrade_footprint_key(version);
            toku_fill_dbt(&key, upgrade_footprint_key, strlen(upgrade_footprint_key));
            toku_fill_dbt(&val, &upgrade_footprint_d, sizeof(upgrade_footprint_d));
            r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
            assert_zero(r);

            char* upgrade_last_lsn_key = get_upgrade_last_lsn_key(version);
            toku_fill_dbt(&key, upgrade_last_lsn_key, strlen(upgrade_last_lsn_key));
            toku_fill_dbt(&val, &upgrade_last_lsn_d, sizeof(upgrade_last_lsn_d));
            r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
            assert_zero(r);
        }

    }
    // Here r is 0 unless one of the version-check branches set an error.
    return r;
}
576 
577 // Capture contents of persistent_environment dictionary so that it can be read by engine status
578 static void
capture_persistent_env_contents(DB_ENV * env,DB_TXN * txn)579 capture_persistent_env_contents (DB_ENV * env, DB_TXN * txn) {
580     int r;
581     DBT key, val;
582     DB *persistent_environment = env->i->persistent_environment;
583 
584     toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
585     toku_init_dbt(&val);
586     r = toku_db_get(persistent_environment, txn, &key, &val, 0);
587     assert_zero(r);
588     uint32_t curr_env_version = toku_dtoh32(*(uint32_t*)val.data);
589     assert(curr_env_version == FT_LAYOUT_VERSION);
590 
591     toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
592     toku_init_dbt(&val);
593     r = toku_db_get(persistent_environment, txn, &key, &val, 0);
594     assert_zero(r);
595     uint64_t persistent_original_env_version = toku_dtoh32(*(uint32_t*)val.data);
596     PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION) = persistent_original_env_version;
597     assert(persistent_original_env_version <= curr_env_version);
598 
599     // make no assertions about timestamps, clock may have been reset
600     if (persistent_original_env_version >= FT_LAYOUT_VERSION_14) {
601         toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
602         toku_init_dbt(&val);
603         r = toku_db_get(persistent_environment, txn, &key, &val, 0);
604         assert_zero(r);
605         STATUS_VALUE(YDB_LAYER_TIME_CREATION) = toku_dtoh64((*(time_t*)val.data));
606     }
607 
608     if (persistent_original_env_version != curr_env_version) {
609         // an upgrade was performed at some time, capture info about the upgrade
610 
611         char * last_lsn_key = get_upgrade_last_lsn_key(curr_env_version);
612         toku_fill_dbt(&key, last_lsn_key, strlen(last_lsn_key));
613         toku_init_dbt(&val);
614         r = toku_db_get(persistent_environment, txn, &key, &val, 0);
615         assert_zero(r);
616         PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_LAST_LSN_OF_V13) = toku_dtoh64(*(uint64_t*)val.data);
617 
618         char * time_key = get_upgrade_time_key(curr_env_version);
619         toku_fill_dbt(&key, time_key, strlen(time_key));
620         toku_init_dbt(&val);
621         r = toku_db_get(persistent_environment, txn, &key, &val, 0);
622         assert_zero(r);
623         PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_TIME) = toku_dtoh64(*(time_t*)val.data);
624 
625         char * footprint_key = get_upgrade_footprint_key(curr_env_version);
626         toku_fill_dbt(&key, footprint_key, strlen(footprint_key));
627         toku_init_dbt(&val);
628         r = toku_db_get(persistent_environment, txn, &key, &val, 0);
629         assert_zero(r);
630         PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_FOOTPRINT) = toku_dtoh64(*(uint64_t*)val.data);
631     }
632 
633 }
634 
635 // return 0 if log exists or ENOENT if log does not exist
636 static int
ydb_recover_log_exists(DB_ENV * env)637 ydb_recover_log_exists(DB_ENV *env) {
638     int r = tokuft_recover_log_exists(env->i->real_log_dir);
639     return r;
640 }
641 
642 // Validate that all required files are present, no side effects.
643 // Return 0 if all is well, ENOENT if some files are present but at least one is
644 // missing,
645 // other non-zero value if some other error occurs.
646 // Set *valid_newenv if creating a new environment (all files missing).
647 // (Note, if special dictionaries exist, then they were created transactionally
648 // and log should exist.)
validate_env(DB_ENV * env,bool * valid_newenv,bool need_rollback_cachefile)649 static int validate_env(DB_ENV *env,
650                         bool *valid_newenv,
651                         bool need_rollback_cachefile) {
652     int r;
653     bool expect_newenv = false;  // set true if we expect to create a new env
654     toku_struct_stat buf;
655     char *path = NULL;
656 
657     // Test for persistent environment
658     path = toku_construct_full_name(
659         2, env->i->dir, toku_product_name_strings.environmentdictionary);
660     assert(path);
661     r = toku_stat(path, &buf, toku_uninstrumented);
662     if (r == 0) {
663         expect_newenv = false;  // persistent info exists
664     } else {
665         int stat_errno = get_error_errno();
666         if (stat_errno == ENOENT) {
667             expect_newenv = true;
668             r = 0;
669         } else {
670             r = toku_ydb_do_error(
671                 env,
672                 stat_errno,
673                 "Unable to access persistent environment [%s] in [%s]\n",
674                 toku_product_name_strings.environmentdictionary,
675                 env->i->dir);
676             assert(r);
677         }
678     }
679     toku_free(path);
680 
681     // Test for existence of rollback cachefile if it is expected to exist
682     if (r == 0 && need_rollback_cachefile) {
683         path = toku_construct_full_name(
684             2, env->i->dir, toku_product_name_strings.rollback_cachefile);
685         assert(path);
686         r = toku_stat(path, &buf, toku_uninstrumented);
687         if (r == 0) {
688             if (expect_newenv)  // rollback cachefile exists, but persistent env
689                                 // is missing
690                 r = toku_ydb_do_error(
691                     env,
692                     ENOENT,
693                     "Persistent environment is missing while looking for "
694                     "rollback cachefile [%s] in [%s]\n",
695                     toku_product_name_strings.rollback_cachefile, env->i->dir);
696         } else {
697             int stat_errno = get_error_errno();
698             if (stat_errno == ENOENT) {
699                 if (!expect_newenv)  // rollback cachefile is missing but
700                                      // persistent env exists
701                     r = toku_ydb_do_error(
702                         env,
703                         ENOENT,
704                         "rollback cachefile [%s] is missing from [%s]\n",
705                         toku_product_name_strings.rollback_cachefile,
706                         env->i->dir);
707                 else
708                     r = 0;  // both rollback cachefile and persistent env are
709                             // missing
710             } else {
711                 r = toku_ydb_do_error(
712                     env,
713                     stat_errno,
714                     "Unable to access rollback cachefile [%s] in [%s]\n",
715                     toku_product_name_strings.rollback_cachefile,
716                     env->i->dir);
717                 assert(r);
718             }
719         }
720         toku_free(path);
721     }
722 
723     // Test for fileops directory
724     if (r == 0 && force_recovery != 6) {
725         path = toku_construct_full_name(
726             2, env->i->dir, toku_product_name_strings.fileopsdirectory);
727         assert(path);
728         r = toku_stat(path, &buf, toku_uninstrumented);
729         if (r == 0) {
730             if (expect_newenv)  // fileops directory exists, but persistent env
731                                 // is missing
732                 r = toku_ydb_do_error(
733                     env,
734                     ENOENT,
735                     "Persistent environment is missing while looking for "
736                     "fileops directory [%s] in [%s]\n",
737                     toku_product_name_strings.fileopsdirectory,
738                     env->i->dir);
739         } else {
740             int stat_errno = get_error_errno();
741             if (stat_errno == ENOENT) {
742                 if (!expect_newenv)  // fileops directory is missing but
743                                      // persistent env exists
744                     r = toku_ydb_do_error(
745                         env,
746                         ENOENT,
747                         "Fileops directory [%s] is missing from [%s]\n",
748                         toku_product_name_strings.fileopsdirectory,
749                         env->i->dir);
750                 else
751                     r = 0;  // both fileops directory and persistent env are
752                             // missing
753             } else {
754                 r = toku_ydb_do_error(
755                     env,
756                     stat_errno,
757                     "Unable to access fileops directory [%s] in [%s]\n",
758                     toku_product_name_strings.fileopsdirectory,
759                     env->i->dir);
760                 assert(r);
761             }
762         }
763         toku_free(path);
764     }
765 
766     // Test for recovery log
767     if ((r == 0) && (env->i->open_flags & DB_INIT_LOG) && force_recovery != 6) {
768         // if using transactions, test for existence of log
769         r = ydb_recover_log_exists(env);  // return 0 or ENOENT
770         if (expect_newenv && (r != ENOENT))
771             r = toku_ydb_do_error(env,
772                                   ENOENT,
773                                   "Persistent environment information is "
774                                   "missing (but log exists) while looking for "
775                                   "recovery log files in [%s]\n",
776                                   env->i->real_log_dir);
777         else if (!expect_newenv && r == ENOENT)
778             r = toku_ydb_do_error(env,
779                                   ENOENT,
780                                   "Recovery log is missing (persistent "
781                                   "environment information is present) while "
782                                   "looking for recovery log files in [%s]\n",
783                                   env->i->real_log_dir);
784         else
785             r = 0;
786     }
787 
788     if (r == 0)
789         *valid_newenv = expect_newenv;
790     else
791         *valid_newenv = false;
792     return r;
793 }
794 
795 // The version of the environment (on disk) is the version of the recovery log.
796 // If the recovery log is of the current version, then there is no upgrade to be done.
797 // If the recovery log is of an old version, then replacing it with a new recovery log
798 // of the current version is how the upgrade is done.
799 // Note, the upgrade procedure takes a checkpoint, so we must release the ydb lock.
800 static int
ydb_maybe_upgrade_env(DB_ENV * env,LSN * last_lsn_of_clean_shutdown_read_from_log,bool * upgrade_in_progress)801 ydb_maybe_upgrade_env (DB_ENV *env, LSN * last_lsn_of_clean_shutdown_read_from_log, bool * upgrade_in_progress) {
802     int r = 0;
803     if (env->i->open_flags & DB_INIT_TXN && env->i->open_flags & DB_INIT_LOG) {
804         r = toku_maybe_upgrade_log(env->i->dir, env->i->real_log_dir, last_lsn_of_clean_shutdown_read_from_log, upgrade_in_progress);
805     }
806     return r;
807 }
808 
809 static void
unlock_single_process(DB_ENV * env)810 unlock_single_process(DB_ENV *env) {
811     int r;
812     r = toku_single_process_unlock(&env->i->envdir_lockfd);
813     lazy_assert_zero(r);
814     r = toku_single_process_unlock(&env->i->datadir_lockfd);
815     lazy_assert_zero(r);
816     r = toku_single_process_unlock(&env->i->logdir_lockfd);
817     lazy_assert_zero(r);
818     r = toku_single_process_unlock(&env->i->tmpdir_lockfd);
819     lazy_assert_zero(r);
820 }
821 
822 // Open the environment.
823 // If this is a new environment, then create the necessary files.
824 // Return 0 on success, ENOENT if any of the expected necessary files are missing.
825 // (The set of necessary files is defined in the function validate_env() above.)
826 static int
env_open(DB_ENV * env,const char * home,uint32_t flags,int mode)827 env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {
828 
829     if(force_recovery == 6) {
830         {
831             const int len = strlen(toku_product_name_strings.rollback_cachefile);
832             toku_product_name_strings.rollback_cachefile[len] = '2';
833             toku_product_name_strings.rollback_cachefile[len+1] = 0;
834         }
835 
836         {
837             const int len = strlen(toku_product_name_strings.single_process_lock);
838             toku_product_name_strings.single_process_lock[len] = '2';
839             toku_product_name_strings.single_process_lock[len+1] = 0;
840         }
841 
842         {
843             const int len = strlen(toku_product_name_strings.environmentdictionary);
844             toku_product_name_strings.environmentdictionary[len] = '2';
845             toku_product_name_strings.environmentdictionary[len+1] = 0;
846         }
847     }
848 
849     HANDLE_PANICKED_ENV(env);
850     int r;
851     bool newenv;  // true iff creating a new environment
852     uint32_t unused_flags=flags;
853     CHECKPOINTER cp;
854     DB_TXN *txn = NULL;
855 
856     if (env_opened(env)) {
857         r = toku_ydb_do_error(env, EINVAL, "The environment is already open\n");
858         goto cleanup;
859     }
860 
861     if (env->get_check_thp(env) && toku_os_huge_pages_enabled()) {
862         r = toku_ydb_do_error(env, TOKUDB_HUGE_PAGES_ENABLED,
863                               "Huge pages are enabled, disable them before continuing\n");
864         goto cleanup;
865     }
866 
867     most_recent_env = NULL;
868 
869     assert(sizeof(time_t) == sizeof(uint64_t));
870 
871     HANDLE_EXTRA_FLAGS(env, flags,
872                        DB_CREATE|DB_PRIVATE|DB_INIT_LOG|DB_INIT_TXN|DB_RECOVER|DB_INIT_MPOOL|DB_INIT_LOCK|DB_THREAD);
873 
874     // DB_CREATE means create if env does not exist, and PerconaFT requires it because
875     // PerconaFT requries DB_PRIVATE.
876     if ((flags & DB_PRIVATE) && !(flags & DB_CREATE)) {
877         r = toku_ydb_do_error(env, ENOENT, "DB_PRIVATE requires DB_CREATE (seems gratuitous to us, but that's BDB's behavior\n");
878         goto cleanup;
879     }
880 
881     if (!(flags & DB_PRIVATE)) {
882         r = toku_ydb_do_error(env, ENOENT, "PerconaFT requires DB_PRIVATE\n");
883         goto cleanup;
884     }
885 
886     if ((flags & DB_INIT_LOG) && !(flags & DB_INIT_TXN)) {
887         r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires transactions for logging\n");
888         goto cleanup;
889     }
890 
891     if (!home) home = ".";
892 
893     // Verify that the home exists.
894     toku_struct_stat buf;
895     r = toku_stat(home, &buf, toku_uninstrumented);
896     if (r != 0) {
897         int e = get_error_errno();
898         r = toku_ydb_do_error(
899             env, e, "Error from toku_stat(\"%s\",...)\n", home);
900         goto cleanup;
901     }
902     unused_flags &= ~DB_PRIVATE;
903 
904     if (env->i->dir) {
905         toku_free(env->i->dir);
906     }
907     env->i->dir = toku_strdup(home);
908     if (env->i->dir == 0) {
909         r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
910         goto cleanup;
911     }
912     env->i->open_flags = flags;
913     env->i->open_mode = mode;
914 
915     // Instrumentation probe start
916     TOKU_PROBE_START(toku_instr_probe_1);
917 
918     env_setup_real_data_dir(env);
919     env_setup_real_log_dir(env);
920     env_setup_real_tmp_dir(env);
921 
922     // Instrumentation probe stop
923     toku_instr_probe_1->stop();
924 
925     r = toku_single_process_lock(
926         env->i->dir, "environment", &env->i->envdir_lockfd);
927     if (r != 0)
928         goto cleanup;
929     r = toku_single_process_lock(
930         env->i->real_data_dir, "data", &env->i->datadir_lockfd);
931     if (r!=0) goto cleanup;
932     r = toku_single_process_lock(env->i->real_log_dir, "logs", &env->i->logdir_lockfd);
933     if (r!=0) goto cleanup;
934     r = toku_single_process_lock(env->i->real_tmp_dir, "temp", &env->i->tmpdir_lockfd);
935     if (r!=0) goto cleanup;
936 
937     bool need_rollback_cachefile;
938     need_rollback_cachefile = false;
939     if (flags & (DB_INIT_TXN | DB_INIT_LOG) && force_recovery != 6) {
940         need_rollback_cachefile = true;
941     }
942 
943     ydb_layer_status_init();  // do this before possibly upgrading, so upgrade work is counted in status counters
944 
945     LSN last_lsn_of_clean_shutdown_read_from_log;
946     last_lsn_of_clean_shutdown_read_from_log = ZERO_LSN;
947     bool upgrade_in_progress;
948     upgrade_in_progress = false;
949     r = ydb_maybe_upgrade_env(env, &last_lsn_of_clean_shutdown_read_from_log, &upgrade_in_progress);
950     if (r!=0) goto cleanup;
951 
952     if (upgrade_in_progress || force_recovery == 6) {
953         // Delete old rollback file.  There was a clean shutdown, so it has nothing useful,
954         // and there is no value in upgrading it.  It is simpler to just create a new one.
955         char* rollback_filename = toku_construct_full_name(2, env->i->dir, toku_product_name_strings.rollback_cachefile);
956         assert(rollback_filename);
957         r = unlink(rollback_filename);
958         if (r != 0) {
959             assert(get_error_errno() == ENOENT);
960         }
961         toku_free(rollback_filename);
962         need_rollback_cachefile = false;  // we're not expecting it to exist now
963     }
964 
965     r = validate_env(env, &newenv, need_rollback_cachefile);  // make sure that environment is either new or complete
966     if (r != 0) goto cleanup;
967 
968     unused_flags &= ~DB_INIT_TXN & ~DB_INIT_LOG;
969 
970     if(force_recovery == 6) {
971         flags |= DB_INIT_LOG | DB_INIT_TXN;
972     }
973 
974     // do recovery only if there exists a log and recovery is requested
975     // otherwise, a log is created when the logger is opened later
976     if (!newenv && force_recovery == 0) {
977         if (flags & DB_INIT_LOG) {
978             // the log does exist
979             if (flags & DB_RECOVER) {
980                 r = ydb_do_recovery(env);
981                 if (r != 0) goto cleanup;
982             } else {
983                 // the log is required to have clean shutdown if recovery is not requested
984                 r = needs_recovery(env);
985                 if (r != 0) goto cleanup;
986             }
987         }
988     }
989 
990     toku_loader_cleanup_temp_files(env);
991 
992     if (flags & (DB_INIT_TXN | DB_INIT_LOG)) {
993         assert(env->i->logger);
994         toku_logger_write_log_files(env->i->logger, (bool)((flags & DB_INIT_LOG) != 0));
995         if (!toku_logger_is_open(env->i->logger)) {
996             r = toku_logger_open(env->i->real_log_dir, env->i->logger);
997             if (r!=0) {
998                 toku_ydb_do_error(env, r, "Could not open logger\n");
999             }
1000         }
1001     } else {
1002         r = toku_logger_close(&env->i->logger); // if no logging system, then kill the logger
1003         assert_zero(r);
1004     }
1005 
1006     unused_flags &= ~DB_INIT_MPOOL; // we always init an mpool.
1007     unused_flags &= ~DB_CREATE;     // we always do DB_CREATE
1008     unused_flags &= ~DB_INIT_LOCK;  // we check this later (e.g. in db->open)
1009     unused_flags &= ~DB_RECOVER;
1010 
1011 // This is probably correct, but it will be pain...
1012 //    if ((flags & DB_THREAD)==0) {
1013 //        r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires DB_THREAD");
1014 //        goto cleanup;
1015 //    }
1016     unused_flags &= ~DB_THREAD;
1017 
1018     if (unused_flags!=0) {
1019         r = toku_ydb_do_error(env, EINVAL, "Extra flags not understood by tokuft: %u\n", unused_flags);
1020         goto cleanup;
1021     }
1022 
1023     if (env->i->cachetable==NULL) {
1024         // If we ran recovery then the cachetable should be set here.
1025         r = toku_cachetable_create_ex(&env->i->cachetable, env->i->cachetable_size,
1026                                    env->i->client_pool_threads,
1027                                    env->i->cachetable_pool_threads,
1028                                    env->i->checkpoint_pool_threads,
1029                                    ZERO_LSN, env->i->logger);
1030         if (r != 0) {
1031             r = toku_ydb_do_error(env, r, "Cant create a cachetable\n");
1032             goto cleanup;
1033         }
1034     }
1035 
1036     toku_cachetable_set_env_dir(env->i->cachetable, env->i->dir);
1037 
1038     int using_txns;
1039     using_txns = env->i->open_flags & DB_INIT_TXN;
1040     if (env->i->logger) {
1041         // if this is a newborn env or if this is an upgrade, then create a brand new rollback file
1042         assert (using_txns);
1043         toku_logger_set_cachetable(env->i->logger, env->i->cachetable);
1044         if (!toku_logger_rollback_is_open(env->i->logger)) {
1045             bool create_new_rollback_file = newenv | upgrade_in_progress | (force_recovery == 6);
1046             r = toku_logger_open_rollback(env->i->logger, env->i->cachetable, create_new_rollback_file);
1047             if (r != 0) {
1048                 r = toku_ydb_do_error(env, r, "Cant open rollback\n");
1049                 goto cleanup;
1050             }
1051         }
1052     }
1053 
1054     if (using_txns) {
1055         r = toku_txn_begin(env, 0, &txn, 0);
1056         assert_zero(r);
1057     }
1058 
1059     {
1060         r = toku_db_create(&env->i->persistent_environment, env, 0);
1061         assert_zero(r);
1062         r = toku_db_use_builtin_key_cmp(env->i->persistent_environment);
1063         assert_zero(r);
1064 	writing_rollback++;
1065         r = toku_db_open_iname(env->i->persistent_environment, txn, toku_product_name_strings.environmentdictionary, DB_CREATE, mode);
1066         if (r != 0) {
1067             r = toku_ydb_do_error(env, r, "Cant open persistent env\n");
1068             goto cleanup;
1069         }
1070         if (newenv) {
1071             // create new persistent_environment
1072             DBT key, val;
1073             uint32_t persistent_original_env_version = FT_LAYOUT_VERSION;
1074             const uint32_t environment_version = toku_htod32(persistent_original_env_version);
1075 
1076             toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
1077             toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
1078             r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1079             assert_zero(r);
1080 
1081             toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
1082             toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
1083             r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1084             assert_zero(r);
1085 
1086             time_t creation_time_d = toku_htod64(time(NULL));
1087             toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
1088             toku_fill_dbt(&val, &creation_time_d, sizeof(creation_time_d));
1089             r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1090             assert_zero(r);
1091         }
1092         else {
1093             r = maybe_upgrade_persistent_environment_dictionary(env, txn, last_lsn_of_clean_shutdown_read_from_log);
1094             assert_zero(r);
1095         }
1096         capture_persistent_env_contents(env, txn);
1097 	writing_rollback--;
1098     }
1099     {
1100         r = toku_db_create(&env->i->directory, env, 0);
1101         assert_zero(r);
1102         r = toku_db_use_builtin_key_cmp(env->i->directory);
1103         assert_zero(r);
1104         r = toku_db_open_iname(env->i->directory, txn, toku_product_name_strings.fileopsdirectory, DB_CREATE, mode);
1105         if (r != 0) {
1106             r = toku_ydb_do_error(env, r, "Cant open %s\n", toku_product_name_strings.fileopsdirectory);
1107             goto cleanup;
1108         }
1109     }
1110     if (using_txns) {
1111         r = locked_txn_commit(txn, 0);
1112         assert_zero(r);
1113         txn = NULL;
1114     }
1115     cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1116     if (!force_recovery) {
1117         r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT);
1118     }
1119     writing_rollback--;
1120     env_fs_poller(env);          // get the file system state at startup
1121     r = env_fs_init_minicron(env);
1122     if (r != 0) {
1123         r = toku_ydb_do_error(env, r, "Cant create fs minicron\n");
1124         goto cleanup;
1125     }
1126     r = env_fsync_log_cron_init(env);
1127     if (r != 0) {
1128         r = toku_ydb_do_error(env, r, "Cant create fsync log minicron\n");
1129         goto cleanup;
1130     }
1131 cleanup:
1132     if (r!=0) {
1133         if (txn) {
1134             locked_txn_abort(txn);
1135         }
1136         if (env && env->i) {
1137             unlock_single_process(env);
1138         }
1139     }
1140     if (r == 0) {
1141         set_errno(0); // tabula rasa.   If there's a crash after env was successfully opened, no misleading errno will have been left around by this code.
1142         most_recent_env = env;
1143         uint64_t num_rows;
1144         env_get_engine_status_num_rows(env, &num_rows);
1145         toku_assert_set_fpointers(toku_maybe_get_engine_status_text, toku_maybe_err_engine_status, toku_maybe_set_env_panic, num_rows);
1146     }
1147     return r;
1148 }
1149 
// Close the environment and free it (DB_ENV->close).
//
//   flags - TOKUFT_DIRTY_SHUTDOWN requests a dirty shutdown.  The two shutdown
//           checkpoints and toku_logger_shutdown() are skipped, and the flag
//           is passed on to toku_logger_close_rollback_check_empty().  Any
//           other bit makes the call return EINVAL, but only after the
//           environment has already been closed and freed.
//
// The environment is not closed if it is panicked, has open transactions, or
// has open DBs.  Those cases, and any failure while closing, jump to
// panic_and_quit_early.  That path releases the lock files and either reports
// the existing panic or calls env_panic(); it does not free `env`.  On the
// normal path `env` is freed and must not be used again.
static int
env_close(DB_ENV * env, uint32_t flags) {
    int r = 0;
    const char * err_msg = NULL;
    bool clean_shutdown = true;

    if (flags & TOKUFT_DIRTY_SHUTDOWN) {
        clean_shutdown = false;
        flags &= ~TOKUFT_DIRTY_SHUTDOWN;
    }

    most_recent_env = NULL; // Set most_recent_env to NULL so that we don't have a dangling pointer (and if there's an error, the toku assert code would try to look at the env.)

    // if panicked, or if any open transactions, or any open dbs, then do nothing.

    if (toku_env_is_panicked(env)) {
        goto panic_and_quit_early;
    }
    if (env->i->logger && toku_logger_txns_exist(env->i->logger)) {
        err_msg = "Cannot close environment due to open transactions\n";
        r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
        goto panic_and_quit_early;
    }
    if (env->i->open_dbs_by_dname) { //Verify that there are no open dbs.
        if (env->i->open_dbs_by_dname->size() > 0) {
            err_msg = "Cannot close environment due to open DBs\n";
            r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    // Close the internal dictionaries opened by env_open.
    if (env->i->persistent_environment) {
        r = toku_db_close(env->i->persistent_environment);
        if (r) {
            err_msg = "Cannot close persistent environment dictionary (DB->close error)\n";
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    if (env->i->directory) {
        r = toku_db_close(env->i->directory);
        if (r) {
            err_msg = "Cannot close Directory dictionary (DB->close error)\n";
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    env_fsync_log_cron_destroy(env);
    if (env->i->cachetable) {
        toku_cachetable_prepare_close(env->i->cachetable);
        toku_cachetable_minicron_shutdown(env->i->cachetable);
        if (env->i->logger) {
            // cp is only fetched, and only used, on a clean shutdown.
            CHECKPOINTER cp = nullptr;
            if (clean_shutdown) {
                cp = toku_cachetable_get_checkpointer(env->i->cachetable);
                r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
                if (r) {
                    err_msg = "Cannot close environment (error during checkpoint)\n";
                    toku_ydb_do_error(env, r, "%s", err_msg);
                    goto panic_and_quit_early;
                }
            }
            toku_logger_close_rollback_check_empty(env->i->logger, clean_shutdown);
            if (clean_shutdown) {
                //Do a second checkpoint now that the rollback cachefile is closed.
                r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
                if (r) {
                    err_msg = "Cannot close environment (error during checkpoint)\n";
                    toku_ydb_do_error(env, r, "%s", err_msg);
                    goto panic_and_quit_early;
                }
                toku_logger_shutdown(env->i->logger);
            }
        }
        toku_cachetable_close(&env->i->cachetable);
    }
    if (env->i->logger) {
        r = toku_logger_close(&env->i->logger);
        if (r) {
            err_msg = "Cannot close environment (logger close error)\n";
            env->i->logger = NULL;
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    // Even if nothing else went wrong, but we were panicked, then raise an error.
    // But if something else went wrong then raise that error (above)
    if (toku_env_is_panicked(env)) {
        goto panic_and_quit_early;
    } else {
        assert(env->i->panic_string == 0);
    }

    // Release the remaining resources owned by env->i.
    env_fs_destroy(env);
    env->i->ltm.destroy();
    if (env->i->data_dir)
        toku_free(env->i->data_dir);
    if (env->i->lg_dir)
        toku_free(env->i->lg_dir);
    if (env->i->tmp_dir)
        toku_free(env->i->tmp_dir);
    if (env->i->real_data_dir)
        toku_free(env->i->real_data_dir);
    if (env->i->real_log_dir)
        toku_free(env->i->real_log_dir);
    if (env->i->real_tmp_dir)
        toku_free(env->i->real_tmp_dir);
    if (env->i->open_dbs_by_dname) {
        env->i->open_dbs_by_dname->destroy();
        toku_free(env->i->open_dbs_by_dname);
    }
    if (env->i->open_dbs_by_dict_id) {
        env->i->open_dbs_by_dict_id->destroy();
        toku_free(env->i->open_dbs_by_dict_id);
    }
    if (env->i->dir)
        toku_free(env->i->dir);
    toku_pthread_rwlock_destroy(&env->i->open_dbs_rwlock);

    // Immediately before freeing internal environment unlock the directories
    unlock_single_process(env);
    toku_free(env->i);
    toku_free(env);
    toku_sync_fetch_and_add(&tokuft_num_envs, -1);
    // Unknown flags are reported only now, after env has been freed.
    if (flags != 0) {
        r = EINVAL;
    }
    return r;

panic_and_quit_early:
    //release lock files.
    unlock_single_process(env);
    //r is the panic error
    if (toku_env_is_panicked(env)) {
        char *panic_string = env->i->panic_string;
        r = toku_ydb_do_error(env, toku_env_is_panicked(env), "Cannot close environment due to previous error: %s\n", panic_string);
    }
    else {
        env_panic(env, r, err_msg);
    }
    return r;
}
1291 
1292 static int
env_log_archive(DB_ENV * env,char ** list[],uint32_t flags)1293 env_log_archive(DB_ENV * env, char **list[], uint32_t flags) {
1294     return toku_logger_log_archive(env->i->logger, list, flags);
1295 }
1296 
1297 static int
env_log_flush(DB_ENV * env,const DB_LSN * lsn)1298 env_log_flush(DB_ENV * env, const DB_LSN * lsn __attribute__((__unused__))) {
1299     HANDLE_PANICKED_ENV(env);
1300     // do nothing if no logger
1301     if (env->i->logger) {
1302         // We just flush everything. MySQL uses lsn == 0 which means flush everything.
1303         // For anyone else using the log, it is correct to flush too much, so we are OK.
1304         toku_logger_fsync(env->i->logger);
1305     }
1306     return 0;
1307 }
1308 
1309 static int
env_set_cachesize(DB_ENV * env,uint32_t gbytes,uint32_t bytes,int ncache)1310 env_set_cachesize(DB_ENV * env, uint32_t gbytes, uint32_t bytes, int ncache) {
1311     HANDLE_PANICKED_ENV(env);
1312     if (ncache != 1) {
1313         return EINVAL;
1314     }
1315     uint64_t cs64 = ((uint64_t) gbytes << 30) + bytes;
1316     unsigned long cs = cs64;
1317     if (cs64 > cs) {
1318         return EINVAL;
1319     }
1320     env->i->cachetable_size = cs;
1321     return 0;
1322 }
1323 
1324 static int
env_set_client_pool_threads(DB_ENV * env,uint32_t threads)1325 env_set_client_pool_threads(DB_ENV * env, uint32_t threads) {
1326     HANDLE_PANICKED_ENV(env);
1327     env->i->client_pool_threads = threads;
1328     return 0;
1329 }
1330 
1331 static int
env_set_cachetable_pool_threads(DB_ENV * env,uint32_t threads)1332 env_set_cachetable_pool_threads(DB_ENV * env, uint32_t threads) {
1333     HANDLE_PANICKED_ENV(env);
1334     env->i->cachetable_pool_threads = threads;
1335     return 0;
1336 }
1337 
1338 static int
env_set_checkpoint_pool_threads(DB_ENV * env,uint32_t threads)1339 env_set_checkpoint_pool_threads(DB_ENV * env, uint32_t threads) {
1340     HANDLE_PANICKED_ENV(env);
1341     env->i->checkpoint_pool_threads = threads;
1342     return 0;
1343 }
1344 
1345 static void
env_set_check_thp(DB_ENV * env,bool new_val)1346 env_set_check_thp(DB_ENV * env, bool new_val) {
1347     assert(env);
1348     env->i->check_thp = new_val;
1349 }
1350 
1351 static bool
env_get_check_thp(DB_ENV * env)1352 env_get_check_thp(DB_ENV * env) {
1353     assert(env);
1354     return env->i->check_thp;
1355 }
1356 
env_set_dir_per_db(DB_ENV * env,bool new_val)1357 static bool env_set_dir_per_db(DB_ENV *env, bool new_val) {
1358     HANDLE_PANICKED_ENV(env);
1359     bool r = env->i->dir_per_db;
1360     env->i->dir_per_db = new_val;
1361     return r;
1362 }
1363 
env_get_dir_per_db(DB_ENV * env)1364 static bool env_get_dir_per_db(DB_ENV *env) {
1365     HANDLE_PANICKED_ENV(env);
1366     return env->i->dir_per_db;
1367 }
1368 
env_get_data_dir(DB_ENV * env)1369 static const char *env_get_data_dir(DB_ENV *env) {
1370     return env->i->real_data_dir;
1371 }
1372 
env_dirtool_attach(DB_ENV * env,DB_TXN * txn,const char * dname,const char * iname)1373 static int env_dirtool_attach(DB_ENV *env,
1374                               DB_TXN *txn,
1375                               const char *dname,
1376                               const char *iname) {
1377     int r;
1378     DBT dname_dbt;
1379     DBT iname_dbt;
1380 
1381     HANDLE_PANICKED_ENV(env);
1382     if (!env_opened(env)) {
1383         return EINVAL;
1384     }
1385     HANDLE_READ_ONLY_TXN(txn);
1386     toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1387     toku_fill_dbt(&iname_dbt, iname, strlen(iname) + 1);
1388 
1389     r = toku_db_put(env->i->directory,
1390                     txn,
1391                     &dname_dbt,
1392                     &iname_dbt,
1393                     0,
1394                     true);
1395         return r;
1396 }
1397 
env_dirtool_detach(DB_ENV * env,DB_TXN * txn,const char * dname)1398 static int env_dirtool_detach(DB_ENV *env,
1399                               DB_TXN *txn,
1400                               const char *dname) {
1401     int r;
1402     DBT dname_dbt;
1403     DBT old_iname_dbt;
1404 
1405     HANDLE_PANICKED_ENV(env);
1406     if (!env_opened(env)) {
1407         return EINVAL;
1408     }
1409     HANDLE_READ_ONLY_TXN(txn);
1410 
1411     toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1412     toku_init_dbt_flags(&old_iname_dbt, DB_DBT_REALLOC);
1413 
1414     r = toku_db_get(env->i->directory,
1415                     txn,
1416                     &dname_dbt,
1417                     &old_iname_dbt,
1418                     DB_SERIALIZABLE);  // allocates memory for iname
1419     if (r == DB_NOTFOUND)
1420         return EEXIST;
1421     toku_free(old_iname_dbt.data);
1422 
1423     r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
1424 
1425     return r;
1426 }
1427 
env_dirtool_move(DB_ENV * env,DB_TXN * txn,const char * old_dname,const char * new_dname)1428 static int env_dirtool_move(DB_ENV *env,
1429                             DB_TXN *txn,
1430                             const char *old_dname,
1431                             const char *new_dname) {
1432     int r;
1433     DBT old_dname_dbt;
1434     DBT new_dname_dbt;
1435     DBT iname_dbt;
1436 
1437     HANDLE_PANICKED_ENV(env);
1438     if (!env_opened(env)) {
1439         return EINVAL;
1440     }
1441     HANDLE_READ_ONLY_TXN(txn);
1442 
1443     toku_fill_dbt(&old_dname_dbt, old_dname, strlen(old_dname) + 1);
1444     toku_fill_dbt(&new_dname_dbt, new_dname, strlen(new_dname) + 1);
1445     toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
1446 
1447     r = toku_db_get(env->i->directory,
1448                     txn,
1449                     &old_dname_dbt,
1450                     &iname_dbt,
1451                     DB_SERIALIZABLE);  // allocates memory for iname
1452     if (r == DB_NOTFOUND)
1453         return EEXIST;
1454 
1455     r = toku_db_del(
1456         env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
1457     if (r != 0)
1458         goto exit;
1459 
1460     r = toku_db_put(
1461         env->i->directory, txn, &new_dname_dbt, &iname_dbt, 0, true);
1462 
1463 exit:
1464     toku_free(iname_dbt.data);
1465     return r;
1466 }
1467 
locked_env_op(DB_ENV * env,DB_TXN * txn,std::function<int (DB_TXN *)> f)1468 static int locked_env_op(DB_ENV *env,
1469                          DB_TXN *txn,
1470                          std::function<int(DB_TXN *)> f) {
1471     int ret, r;
1472     HANDLE_READ_ONLY_TXN(txn);
1473     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1474 
1475     DB_TXN *child_txn = NULL;
1476     int using_txns = env->i->open_flags & DB_INIT_TXN;
1477     if (using_txns) {
1478         ret = toku_txn_begin(env, txn, &child_txn, 0);
1479         lazy_assert_zero(ret);
1480     }
1481 
1482     // cannot begin a checkpoint
1483     toku_multi_operation_client_lock();
1484     r = f(child_txn);
1485     toku_multi_operation_client_unlock();
1486 
1487     if (using_txns) {
1488         if (r == 0) {
1489             ret = locked_txn_commit(child_txn, 0);
1490             lazy_assert_zero(ret);
1491         } else {
1492             ret = locked_txn_abort(child_txn);
1493             lazy_assert_zero(ret);
1494         }
1495     }
1496     return r;
1497 
1498 }
1499 
locked_env_dirtool_attach(DB_ENV * env,DB_TXN * txn,const char * dname,const char * iname)1500 static int locked_env_dirtool_attach(DB_ENV *env,
1501                                      DB_TXN *txn,
1502                                      const char *dname,
1503                                      const char *iname) {
1504     auto f = std::bind(
1505         env_dirtool_attach, env, std::placeholders::_1, dname, iname);
1506     return locked_env_op(env, txn, f);
1507 }
1508 
locked_env_dirtool_detach(DB_ENV * env,DB_TXN * txn,const char * dname)1509 static int locked_env_dirtool_detach(DB_ENV *env,
1510                                      DB_TXN *txn,
1511                                      const char *dname) {
1512     auto f = std::bind(
1513         env_dirtool_detach, env, std::placeholders::_1, dname);
1514     return locked_env_op(env, txn, f);
1515 }
1516 
locked_env_dirtool_move(DB_ENV * env,DB_TXN * txn,const char * old_dname,const char * new_dname)1517 static int locked_env_dirtool_move(DB_ENV *env,
1518                                    DB_TXN *txn,
1519                                    const char *old_dname,
1520                                    const char *new_dname) {
1521     auto f = std::bind(
1522         env_dirtool_move, env, std::placeholders::_1, old_dname, new_dname);
1523     return locked_env_op(env, txn, f);
1524 }
1525 
1526 static int env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags);
1527 
1528 static int
locked_env_dbremove(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,uint32_t flags)1529 locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
1530     int ret, r;
1531     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1532     HANDLE_READ_ONLY_TXN(txn);
1533 
1534     DB_TXN *child_txn = NULL;
1535     int using_txns = env->i->open_flags & DB_INIT_TXN;
1536     if (using_txns) {
1537         ret = toku_txn_begin(env, txn, &child_txn, 0);
1538         lazy_assert_zero(ret);
1539     }
1540 
1541     // cannot begin a checkpoint
1542     toku_multi_operation_client_lock();
1543     r = env_dbremove(env, child_txn, fname, dbname, flags);
1544     toku_multi_operation_client_unlock();
1545 
1546     if (using_txns) {
1547         if (r == 0) {
1548             ret = locked_txn_commit(child_txn, 0);
1549             lazy_assert_zero(ret);
1550         } else {
1551             ret = locked_txn_abort(child_txn);
1552             lazy_assert_zero(ret);
1553         }
1554     }
1555     return r;
1556 }
1557 
1558 static int env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags);
1559 
1560 static int
locked_env_dbrename(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)1561 locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
1562     int ret, r;
1563     HANDLE_READ_ONLY_TXN(txn);
1564     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1565 
1566     DB_TXN *child_txn = NULL;
1567     int using_txns = env->i->open_flags & DB_INIT_TXN;
1568     if (using_txns) {
1569         ret = toku_txn_begin(env, txn, &child_txn, 0);
1570         lazy_assert_zero(ret);
1571     }
1572 
1573     // cannot begin a checkpoint
1574     toku_multi_operation_client_lock();
1575     r = env_dbrename(env, child_txn, fname, dbname, newname, flags);
1576     toku_multi_operation_client_unlock();
1577 
1578     if (using_txns) {
1579         if (r == 0) {
1580             ret = locked_txn_commit(child_txn, 0);
1581             lazy_assert_zero(ret);
1582         } else {
1583             ret = locked_txn_abort(child_txn);
1584             lazy_assert_zero(ret);
1585         }
1586     }
1587     return r;
1588 }
1589 
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3

// Report the cachetable size as a single cache region.
// *gbytes gets the whole-GiB part and *bytes the remainder below 1 GiB.
static int
env_get_cachesize(DB_ENV * env, uint32_t *gbytes, uint32_t *bytes, int *ncache) {
    HANDLE_PANICKED_ENV(env);
    const auto total = env->i->cachetable_size;
    *gbytes = total >> 30;
    *bytes = total & ((1<<30)-1);
    *ncache = 1;
    return 0;
}

#endif
1602 
1603 static int
env_set_data_dir(DB_ENV * env,const char * dir)1604 env_set_data_dir(DB_ENV * env, const char *dir) {
1605     HANDLE_PANICKED_ENV(env);
1606     int r;
1607 
1608     if (env_opened(env) || !dir) {
1609         r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir after opening the env\n");
1610     }
1611     else if (env->i->data_dir)
1612         r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir more than once.\n");
1613     else {
1614         env->i->data_dir = toku_strdup(dir);
1615         if (env->i->data_dir==NULL) {
1616             assert(get_error_errno() == ENOMEM);
1617             r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1618         }
1619         else r = 0;
1620     }
1621     return r;
1622 }
1623 
1624 static void
env_set_errcall(DB_ENV * env,toku_env_errcall_t errcall)1625 env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) {
1626     env->i->errcall = errcall;
1627 }
1628 
1629 static void
env_set_errfile(DB_ENV * env,FILE * errfile)1630 env_set_errfile(DB_ENV*env, FILE*errfile) {
1631     env->i->errfile = errfile;
1632 }
1633 
1634 static void
env_set_errpfx(DB_ENV * env,const char * errpfx)1635 env_set_errpfx(DB_ENV * env, const char *errpfx) {
1636     env->i->errpfx = errpfx;
1637 }
1638 
1639 static int
env_set_flags(DB_ENV * env,uint32_t flags,int onoff)1640 env_set_flags(DB_ENV * env, uint32_t flags, int onoff) {
1641     HANDLE_PANICKED_ENV(env);
1642 
1643     uint32_t change = 0;
1644     if (flags & DB_AUTO_COMMIT) {
1645         change |=  DB_AUTO_COMMIT;
1646         flags  &= ~DB_AUTO_COMMIT;
1647     }
1648     if (flags != 0 && onoff) {
1649         return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support any nonzero ENV flags other than DB_AUTO_COMMIT\n");
1650     }
1651     if   (onoff) env->i->open_flags |=  change;
1652     else         env->i->open_flags &= ~change;
1653     return 0;
1654 }
1655 
1656 static int
env_set_lg_bsize(DB_ENV * env,uint32_t bsize)1657 env_set_lg_bsize(DB_ENV * env, uint32_t bsize) {
1658     HANDLE_PANICKED_ENV(env);
1659     return toku_logger_set_lg_bsize(env->i->logger, bsize);
1660 }
1661 
1662 static int
env_set_lg_dir(DB_ENV * env,const char * dir)1663 env_set_lg_dir(DB_ENV * env, const char *dir) {
1664     HANDLE_PANICKED_ENV(env);
1665     if (env_opened(env)) {
1666         return toku_ydb_do_error(env, EINVAL, "Cannot set log dir after opening the env\n");
1667     }
1668 
1669     if (env->i->lg_dir) toku_free(env->i->lg_dir);
1670     if (dir) {
1671         env->i->lg_dir = toku_strdup(dir);
1672         if (!env->i->lg_dir) {
1673             return toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1674         }
1675     }
1676     else env->i->lg_dir = NULL;
1677     return 0;
1678 }
1679 
1680 static int
env_set_lg_max(DB_ENV * env,uint32_t lg_max)1681 env_set_lg_max(DB_ENV * env, uint32_t lg_max) {
1682     HANDLE_PANICKED_ENV(env);
1683     return toku_logger_set_lg_max(env->i->logger, lg_max);
1684 }
1685 
1686 static int
env_get_lg_max(DB_ENV * env,uint32_t * lg_maxp)1687 env_get_lg_max(DB_ENV * env, uint32_t *lg_maxp) {
1688     HANDLE_PANICKED_ENV(env);
1689     return toku_logger_get_lg_max(env->i->logger, lg_maxp);
1690 }
1691 
1692 static int
env_set_lk_detect(DB_ENV * env,uint32_t UU (detect))1693 env_set_lk_detect(DB_ENV * env, uint32_t UU(detect)) {
1694     HANDLE_PANICKED_ENV(env);
1695     return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support set_lk_detect\n");
1696 }
1697 
1698 static int
env_set_lk_max_memory(DB_ENV * env,uint64_t lock_memory_limit)1699 env_set_lk_max_memory(DB_ENV *env, uint64_t lock_memory_limit) {
1700     HANDLE_PANICKED_ENV(env);
1701     int r = 0;
1702     if (env_opened(env)) {
1703         r = EINVAL;
1704     } else {
1705         r = env->i->ltm.set_max_lock_memory(lock_memory_limit);
1706     }
1707     return r;
1708 }
1709 
1710 static int
env_get_lk_max_memory(DB_ENV * env,uint64_t * lk_maxp)1711 env_get_lk_max_memory(DB_ENV *env, uint64_t *lk_maxp) {
1712     HANDLE_PANICKED_ENV(env);
1713     uint32_t max_lock_memory = env->i->ltm.get_max_lock_memory();
1714     *lk_maxp = max_lock_memory;
1715     return 0;
1716 }
1717 
1718 //void toku__env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) {
1719 //    env->i->noticecall = noticecall;
1720 //}
1721 
1722 static int
env_set_tmp_dir(DB_ENV * env,const char * tmp_dir)1723 env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
1724     HANDLE_PANICKED_ENV(env);
1725     if (env_opened(env)) {
1726         return toku_ydb_do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n");
1727     }
1728     if (!tmp_dir) {
1729         return toku_ydb_do_error(env, EINVAL, "Tmp dir bust be non-null\n");
1730     }
1731     if (env->i->tmp_dir)
1732         toku_free(env->i->tmp_dir);
1733     env->i->tmp_dir = toku_strdup(tmp_dir);
1734     return env->i->tmp_dir ? 0 : ENOMEM;
1735 }
1736 
1737 static int
env_set_verbose(DB_ENV * env,uint32_t UU (which),int UU (onoff))1738 env_set_verbose(DB_ENV * env, uint32_t UU(which), int UU(onoff)) {
1739     HANDLE_PANICKED_ENV(env);
1740     return 1;
1741 }
1742 
1743 static int
toku_env_txn_checkpoint(DB_ENV * env,uint32_t kbyte,uint32_t min,uint32_t flags)1744 toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte __attribute__((__unused__)), uint32_t min __attribute__((__unused__)), uint32_t flags __attribute__((__unused__))) {
1745     CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1746     int r = toku_checkpoint(cp, env->i->logger,
1747                             checkpoint_callback_f,  checkpoint_callback_extra,
1748                             checkpoint_callback2_f, checkpoint_callback2_extra,
1749                             CLIENT_CHECKPOINT);
1750     if (r) {
1751         // Panicking the whole environment may be overkill, but I'm not sure what else to do.
1752         env_panic(env, r, "checkpoint error\n");
1753         toku_ydb_do_error(env, r, "Checkpoint\n");
1754     }
1755     return r;
1756 }
1757 
1758 static int
env_txn_stat(DB_ENV * env,DB_TXN_STAT ** UU (statp),uint32_t UU (flags))1759 env_txn_stat(DB_ENV * env, DB_TXN_STAT ** UU(statp), uint32_t UU(flags)) {
1760     HANDLE_PANICKED_ENV(env);
1761     return 1;
1762 }
1763 
1764 //
1765 // We can assume the client calls this function right after recovery
1766 // to return a list of prepared transactions to the user. When called,
1767 // we can assume that no other work is being done in the system,
1768 // as we are in the state of being after recovery,
1769 // but before client operations should commence
1770 //
1771 static int
env_txn_xa_recover(DB_ENV * env,TOKU_XA_XID xids[],long count,long * retp,uint32_t flags)1772 env_txn_xa_recover (DB_ENV *env, TOKU_XA_XID xids[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1773     struct tokulogger_preplist *MALLOC_N(count,preps);
1774     int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1775     if (r==0) {
1776         assert(*retp<=count);
1777         for (int i=0; i<*retp; i++) {
1778             xids[i] = preps[i].xid;
1779         }
1780     }
1781     toku_free(preps);
1782     return r;
1783 }
1784 
1785 //
1786 // We can assume the client calls this function right after recovery
1787 // to return a list of prepared transactions to the user. When called,
1788 // we can assume that no other work is being done in the system,
1789 // as we are in the state of being after recovery,
1790 // but before client operations should commence
1791 //
1792 static int
env_txn_recover(DB_ENV * env,DB_PREPLIST preplist[],long count,long * retp,uint32_t flags)1793 env_txn_recover (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1794     struct tokulogger_preplist *MALLOC_N(count,preps);
1795     int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1796     if (r==0) {
1797         assert(*retp<=count);
1798         for (int i=0; i<*retp; i++) {
1799             preplist[i].txn = preps[i].txn;
1800             memcpy(preplist[i].gid, preps[i].xid.data, preps[i].xid.gtrid_length + preps[i].xid.bqual_length);
1801         }
1802     }
1803     toku_free(preps);
1804     return r;
1805 }
1806 
1807 static int
env_get_txn_from_xid(DB_ENV * env,TOKU_XA_XID * xid,DB_TXN ** txnp)1808 env_get_txn_from_xid (DB_ENV *env, /*in*/ TOKU_XA_XID *xid, /*out*/ DB_TXN **txnp) {
1809     return toku_txn_manager_get_root_txn_from_xid(toku_logger_get_txn_manager(env->i->logger), xid, txnp);
1810 }
1811 
1812 static int
env_checkpointing_set_period(DB_ENV * env,uint32_t seconds)1813 env_checkpointing_set_period(DB_ENV * env, uint32_t seconds) {
1814     HANDLE_PANICKED_ENV(env);
1815     int r = 0;
1816     if (!env_opened(env)) {
1817         r = EINVAL;
1818     } else {
1819         toku_set_checkpoint_period(env->i->cachetable, seconds);
1820     }
1821     return r;
1822 }
1823 
1824 static int
env_cleaner_set_period(DB_ENV * env,uint32_t seconds)1825 env_cleaner_set_period(DB_ENV * env, uint32_t seconds) {
1826     HANDLE_PANICKED_ENV(env);
1827     int r = 0;
1828     if (!env_opened(env)) {
1829         r = EINVAL;
1830     } else {
1831         toku_set_cleaner_period(env->i->cachetable, seconds);
1832     }
1833     return r;
1834 }
1835 
1836 static int
env_cleaner_set_iterations(DB_ENV * env,uint32_t iterations)1837 env_cleaner_set_iterations(DB_ENV * env, uint32_t iterations) {
1838     HANDLE_PANICKED_ENV(env);
1839     int r = 0;
1840     if (!env_opened(env)) {
1841         r = EINVAL;
1842     } else {
1843         toku_set_cleaner_iterations(env->i->cachetable, iterations);
1844     }
1845     return r;
1846 }
1847 
1848 static int
env_create_loader(DB_ENV * env,DB_TXN * txn,DB_LOADER ** blp,DB * src_db,int N,DB * dbs[],uint32_t db_flags[],uint32_t dbt_flags[],uint32_t loader_flags)1849 env_create_loader(DB_ENV *env,
1850                   DB_TXN *txn,
1851                   DB_LOADER **blp,
1852                   DB *src_db,
1853                   int N,
1854                   DB *dbs[],
1855                   uint32_t db_flags[/*N*/],
1856                   uint32_t dbt_flags[/*N*/],
1857                   uint32_t loader_flags) {
1858     int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags, true);
1859     return r;
1860 }
1861 
1862 static int
env_checkpointing_get_period(DB_ENV * env,uint32_t * seconds)1863 env_checkpointing_get_period(DB_ENV * env, uint32_t *seconds) {
1864     HANDLE_PANICKED_ENV(env);
1865     int r = 0;
1866     if (!env_opened(env)) r = EINVAL;
1867     else
1868         *seconds = toku_get_checkpoint_period_unlocked(env->i->cachetable);
1869     return r;
1870 }
1871 
1872 static int
env_cleaner_get_period(DB_ENV * env,uint32_t * seconds)1873 env_cleaner_get_period(DB_ENV * env, uint32_t *seconds) {
1874     HANDLE_PANICKED_ENV(env);
1875     int r = 0;
1876     if (!env_opened(env)) r = EINVAL;
1877     else
1878         *seconds = toku_get_cleaner_period_unlocked(env->i->cachetable);
1879     return r;
1880 }
1881 
1882 static int
env_cleaner_get_iterations(DB_ENV * env,uint32_t * iterations)1883 env_cleaner_get_iterations(DB_ENV * env, uint32_t *iterations) {
1884     HANDLE_PANICKED_ENV(env);
1885     int r = 0;
1886     if (!env_opened(env)) r = EINVAL;
1887     else
1888         *iterations = toku_get_cleaner_iterations(env->i->cachetable);
1889     return r;
1890 }
1891 
1892 static int
env_evictor_set_enable_partial_eviction(DB_ENV * env,bool enabled)1893 env_evictor_set_enable_partial_eviction(DB_ENV* env, bool enabled) {
1894     HANDLE_PANICKED_ENV(env);
1895     int r = 0;
1896     if (!env_opened(env)) r = EINVAL;
1897     else toku_set_enable_partial_eviction(env->i->cachetable, enabled);
1898     return r;
1899 }
1900 
1901 static int
env_evictor_get_enable_partial_eviction(DB_ENV * env,bool * enabled)1902 env_evictor_get_enable_partial_eviction(DB_ENV* env, bool *enabled) {
1903     HANDLE_PANICKED_ENV(env);
1904     int r = 0;
1905     if (!env_opened(env)) r = EINVAL;
1906     else *enabled = toku_get_enable_partial_eviction(env->i->cachetable);
1907     return r;
1908 }
1909 
1910 static int
env_checkpointing_postpone(DB_ENV * env)1911 env_checkpointing_postpone(DB_ENV * env) {
1912     HANDLE_PANICKED_ENV(env);
1913     int r = 0;
1914     if (!env_opened(env)) r = EINVAL;
1915     else toku_checkpoint_safe_client_lock();
1916     return r;
1917 }
1918 
1919 static int
env_checkpointing_resume(DB_ENV * env)1920 env_checkpointing_resume(DB_ENV * env) {
1921     HANDLE_PANICKED_ENV(env);
1922     int r = 0;
1923     if (!env_opened(env)) r = EINVAL;
1924     else toku_checkpoint_safe_client_unlock();
1925     return r;
1926 }
1927 
1928 static int
env_checkpointing_begin_atomic_operation(DB_ENV * env)1929 env_checkpointing_begin_atomic_operation(DB_ENV * env) {
1930     HANDLE_PANICKED_ENV(env);
1931     int r = 0;
1932     if (!env_opened(env)) r = EINVAL;
1933     else toku_multi_operation_client_lock();
1934     return r;
1935 }
1936 
1937 static int
env_checkpointing_end_atomic_operation(DB_ENV * env)1938 env_checkpointing_end_atomic_operation(DB_ENV * env) {
1939     HANDLE_PANICKED_ENV(env);
1940     int r = 0;
1941     if (!env_opened(env)) r = EINVAL;
1942     else toku_multi_operation_client_unlock();
1943     return r;
1944 }
1945 
1946 static int
env_set_default_bt_compare(DB_ENV * env,int (* bt_compare)(DB *,const DBT *,const DBT *))1947 env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
1948     HANDLE_PANICKED_ENV(env);
1949     int r = 0;
1950     if (env_opened(env)) r = EINVAL;
1951     else {
1952         env->i->bt_compare = bt_compare;
1953     }
1954     return r;
1955 }
1956 
1957 static void
env_set_update(DB_ENV * env,int (* update_function)(DB *,const DBT * key,const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra))1958 env_set_update (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra)) {
1959     env->i->update_function = update_function;
1960 }
1961 
1962 static int
env_set_generate_row_callback_for_put(DB_ENV * env,generate_row_for_put_func generate_row_for_put)1963 env_set_generate_row_callback_for_put(DB_ENV *env, generate_row_for_put_func generate_row_for_put) {
1964     HANDLE_PANICKED_ENV(env);
1965     int r = 0;
1966     if (env_opened(env)) r = EINVAL;
1967     else {
1968         env->i->generate_row_for_put = generate_row_for_put;
1969     }
1970     return r;
1971 }
1972 
1973 static int
env_set_generate_row_callback_for_del(DB_ENV * env,generate_row_for_del_func generate_row_for_del)1974 env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_func generate_row_for_del) {
1975     HANDLE_PANICKED_ENV(env);
1976     int r = 0;
1977     if (env_opened(env)) r = EINVAL;
1978     else {
1979         env->i->generate_row_for_del = generate_row_for_del;
1980     }
1981     return r;
1982 }
1983 static int
env_set_redzone(DB_ENV * env,int redzone)1984 env_set_redzone(DB_ENV *env, int redzone) {
1985     HANDLE_PANICKED_ENV(env);
1986     int r;
1987     if (env_opened(env))
1988         r = EINVAL;
1989     else {
1990         env->i->redzone = redzone;
1991         r = 0;
1992     }
1993     return r;
1994 }
1995 
env_get_lock_timeout(DB_ENV * env,uint64_t * lock_timeout_msec)1996 static int env_get_lock_timeout(DB_ENV *env, uint64_t *lock_timeout_msec) {
1997     uint64_t t = env->i->default_lock_timeout_msec;
1998     if (env->i->get_lock_timeout_callback)
1999         t = env->i->get_lock_timeout_callback(t);
2000     *lock_timeout_msec = t;
2001     return 0;
2002 }
2003 
env_set_lock_timeout(DB_ENV * env,uint64_t default_lock_timeout_msec,uint64_t (* get_lock_timeout_callback)(uint64_t default_lock_timeout_msec))2004 static int env_set_lock_timeout(DB_ENV *env, uint64_t default_lock_timeout_msec, uint64_t (*get_lock_timeout_callback)(uint64_t default_lock_timeout_msec)) {
2005     env->i->default_lock_timeout_msec = default_lock_timeout_msec;
2006     env->i->get_lock_timeout_callback = get_lock_timeout_callback;
2007     return 0;
2008 }
2009 
2010 static int
env_set_lock_timeout_callback(DB_ENV * env,lock_timeout_callback callback)2011 env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
2012     env->i->lock_wait_timeout_callback = callback;
2013     return 0;
2014 }
2015 
2016 static int
env_set_lock_wait_callback(DB_ENV * env,lock_wait_callback callback)2017 env_set_lock_wait_callback(DB_ENV *env, lock_wait_callback callback) {
2018     env->i->lock_wait_needed_callback = callback;
2019     return 0;
2020 }
2021 
// Format *timer with ctime_r() into buf, then strip trailing '\n' / '\r'.
// buf must have room for at least 26 bytes, as ctime_r() requires.
// If ctime_r() fails (it returns NULL, e.g. for an unrepresentable year),
// buf is set to the empty string. Previously strlen() would read the
// uninitialized buffer in that case.
static void
format_time(const time_t *timer, char *buf) {
    if (ctime_r(timer, buf) == NULL) {
        buf[0] = '\0';
        return;
    }
    size_t len = strlen(buf);
    assert(len < 26);
    while (len > 0 && (buf[len - 1] == '\n' || buf[len - 1] == '\r')) {
        buf[--len] = '\0';
    }
}
2038 
2039 ////////////////////////////////////////////////////////////////////////////////////////////////
2040 // Local definition of status information from portability layer, which should not include db.h.
2041 // Local status structs are used to concentrate file system information collected from various places
2042 // and memory information collected from memory.c.
2043 //
2044 typedef enum {
2045     FS_ENOSPC_REDZONE_STATE = 0,  // possible values are enumerated by fs_redzone_state
2046     FS_ENOSPC_THREADS_BLOCKED,    // how many threads currently blocked on ENOSPC
2047     FS_ENOSPC_REDZONE_CTR,        // number of operations rejected by enospc prevention (red zone)
2048     FS_ENOSPC_MOST_RECENT,        // most recent time that file system was completely full
2049     FS_ENOSPC_COUNT,              // total number of times ENOSPC was returned from an attempt to write
2050     FS_FSYNC_TIME,
2051     FS_FSYNC_COUNT,
2052     FS_LONG_FSYNC_TIME,
2053     FS_LONG_FSYNC_COUNT,
2054     FS_STATUS_NUM_ROWS,           // must be last
2055 } fs_status_entry;
2056 
2057 typedef struct {
2058     bool initialized;
2059     TOKU_ENGINE_STATUS_ROW_S status[FS_STATUS_NUM_ROWS];
2060 } FS_STATUS_S, *FS_STATUS;
2061 
2062 static FS_STATUS_S fsstat;
2063 
// Register one fsstat row: key k, column name c, type t, legend l (prefixed
// with "filesystem: "), include flags inc.
#define FS_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(fsstat, k, c, t, "filesystem: " l, inc)

// Fill in the metadata (key, column, type, legend, include flags) of every
// fsstat row and mark the table initialized. Values are written later by
// fs_get_status(). Rows registered with TOKU_GLOBAL_STATUS also pass a
// global-status column name; the others pass nullptr (presumably "no global
// status column" -- TODO confirm against TOKUFT_STATUS_INIT).
static void
fs_status_init(void) {
    FS_STATUS_INIT(FS_ENOSPC_REDZONE_STATE,   nullptr, FS_STATE, "ENOSPC redzone state", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_THREADS_BLOCKED, FILESYSTEM_THREADS_BLOCKED_BY_FULL_DISK, UINT64,   "threads currently blocked by full disk", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_REDZONE_CTR,     nullptr, UINT64,   "number of operations rejected by enospc prevention (red zone)", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_MOST_RECENT,     nullptr, UNIXTIME, "most recent disk full", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_COUNT,           nullptr, UINT64,   "number of write operations that returned ENOSPC", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_FSYNC_TIME,             FILESYSTEM_FSYNC_TIME, UINT64,   "fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_FSYNC_COUNT,            FILESYSTEM_FSYNC_NUM, UINT64,   "fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_LONG_FSYNC_TIME,        FILESYSTEM_LONG_FSYNC_TIME, UINT64,   "long fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_LONG_FSYNC_COUNT,       FILESYSTEM_LONG_FSYNC_NUM, UINT64,   "long fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    fsstat.initialized = true;
}
#undef FS_STATUS_INIT
2080 
2081 #define FS_STATUS_VALUE(x) fsstat.status[x].value.num
2082 
2083 static void
fs_get_status(DB_ENV * env,fs_redzone_state * redzone_state)2084 fs_get_status(DB_ENV * env, fs_redzone_state * redzone_state) {
2085     if (!fsstat.initialized)
2086         fs_status_init();
2087 
2088     time_t   enospc_most_recent_timestamp;
2089     uint64_t enospc_threads_blocked, enospc_total;
2090     toku_fs_get_write_info(&enospc_most_recent_timestamp, &enospc_threads_blocked, &enospc_total);
2091     if (enospc_threads_blocked)
2092         FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = FS_BLOCKED;
2093     else
2094         FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = env->i->fs_state;
2095     *redzone_state = (fs_redzone_state) FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE);
2096     FS_STATUS_VALUE(FS_ENOSPC_THREADS_BLOCKED) = enospc_threads_blocked;
2097     FS_STATUS_VALUE(FS_ENOSPC_REDZONE_CTR) = env->i->enospc_redzone_ctr;
2098     FS_STATUS_VALUE(FS_ENOSPC_MOST_RECENT) = enospc_most_recent_timestamp;
2099     FS_STATUS_VALUE(FS_ENOSPC_COUNT) = enospc_total;
2100 
2101     uint64_t fsync_count, fsync_time, long_fsync_threshold, long_fsync_count, long_fsync_time;
2102     toku_get_fsync_times(&fsync_count, &fsync_time, &long_fsync_threshold, &long_fsync_count, &long_fsync_time);
2103     FS_STATUS_VALUE(FS_FSYNC_COUNT) = fsync_count;
2104     FS_STATUS_VALUE(FS_FSYNC_TIME) = fsync_time;
2105     FS_STATUS_VALUE(FS_LONG_FSYNC_COUNT) = long_fsync_count;
2106     FS_STATUS_VALUE(FS_LONG_FSYNC_TIME) = long_fsync_time;
2107 }
2108 #undef FS_STATUS_VALUE
2109 
// Local status struct used to get information from memory.c
// Row indices into memory_status.status[]; each row is registered by
// memory_status_init() and refreshed by memory_get_status().
typedef enum {
    MEMORY_MALLOC_COUNT = 0,
    MEMORY_FREE_COUNT,
    MEMORY_REALLOC_COUNT,
    MEMORY_MALLOC_FAIL,
    MEMORY_REALLOC_FAIL,
    MEMORY_REQUESTED,
    MEMORY_USED,
    MEMORY_FREED,
    MEMORY_MAX_REQUESTED_SIZE,
    MEMORY_LAST_FAILED_SIZE,
    MEMORY_MAX_IN_USE,
    MEMORY_MALLOCATOR_VERSION,   // string-valued row (CHARSTR), not numeric
    MEMORY_MMAP_THRESHOLD,
    MEMORY_STATUS_NUM_ROWS       // number of rows; must be last
} memory_status_entry;

// Memory status table: one engine-status row per memory_status_entry.
// `initialized` records whether memory_status_init() has filled in the row metadata.
typedef struct {
    bool initialized;
    TOKU_ENGINE_STATUS_ROW_S status[MEMORY_STATUS_NUM_ROWS];
} MEMORY_STATUS_S, *MEMORY_STATUS;

// File-scope instance; static storage, so it starts zeroed (initialized == false).
static MEMORY_STATUS_S memory_status;
2134 
// Register one memory_status row: key k, column name c, type t, legend l
// (prefixed with "memory: "). Every memory row is included in both engine
// and global status.
#define STATUS_INIT(k,c,t,l) TOKUFT_STATUS_INIT(memory_status, k, c, t, "memory: " l, TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS)

// Fill in the metadata of every memory_status row and mark the table initialized.
static void
memory_status_init(void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields start at zero because memory_status has static storage duration.
    STATUS_INIT(MEMORY_MALLOC_COUNT,        MEMORY_MALLOC_COUNT,        UINT64, "number of malloc operations");
    STATUS_INIT(MEMORY_FREE_COUNT,          MEMORY_FREE_COUNT,          UINT64, "number of free operations");
    STATUS_INIT(MEMORY_REALLOC_COUNT,       MEMORY_REALLOC_COUNT,       UINT64, "number of realloc operations");
    STATUS_INIT(MEMORY_MALLOC_FAIL,         MEMORY_MALLOC_FAIL,         UINT64, "number of malloc operations that failed");
    STATUS_INIT(MEMORY_REALLOC_FAIL,        MEMORY_REALLOC_FAIL,        UINT64, "number of realloc operations that failed" );
    STATUS_INIT(MEMORY_REQUESTED,           MEMORY_REQUESTED,           UINT64, "number of bytes requested");
    STATUS_INIT(MEMORY_USED,                MEMORY_USED,                UINT64, "number of bytes used (requested + overhead)");
    STATUS_INIT(MEMORY_FREED,               MEMORY_FREED,               UINT64, "number of bytes freed");
    STATUS_INIT(MEMORY_MAX_REQUESTED_SIZE,  MEMORY_MAX_REQUESTED_SIZE,  UINT64, "largest attempted allocation size");
    STATUS_INIT(MEMORY_LAST_FAILED_SIZE,    MEMORY_LAST_FAILED_SIZE,    UINT64, "size of the last failed allocation attempt");
    STATUS_INIT(MEMORY_MAX_IN_USE,          MEM_ESTIMATED_MAXIMUM_MEMORY_FOOTPRINT, UINT64, "estimated maximum memory footprint");
    STATUS_INIT(MEMORY_MALLOCATOR_VERSION,  MEMORY_MALLOCATOR_VERSION,  CHARSTR, "mallocator version");
    STATUS_INIT(MEMORY_MMAP_THRESHOLD,      MEMORY_MMAP_THRESHOLD,      UINT64, "mmap threshold");
    memory_status.initialized = true;
}
#undef STATUS_INIT
2157 
2158 #define MEMORY_STATUS_VALUE(x) memory_status.status[x].value.num
2159 
2160 static void
memory_get_status(void)2161 memory_get_status(void) {
2162     if (!memory_status.initialized)
2163         memory_status_init();
2164     LOCAL_MEMORY_STATUS_S local_memstat;
2165     toku_memory_get_status(&local_memstat);
2166     MEMORY_STATUS_VALUE(MEMORY_MALLOC_COUNT) = local_memstat.malloc_count;
2167     MEMORY_STATUS_VALUE(MEMORY_FREE_COUNT) = local_memstat.free_count;
2168     MEMORY_STATUS_VALUE(MEMORY_REALLOC_COUNT) = local_memstat.realloc_count;
2169     MEMORY_STATUS_VALUE(MEMORY_MALLOC_FAIL) = local_memstat.malloc_fail;
2170     MEMORY_STATUS_VALUE(MEMORY_REALLOC_FAIL) = local_memstat.realloc_fail;
2171     MEMORY_STATUS_VALUE(MEMORY_REQUESTED) = local_memstat.requested;
2172     MEMORY_STATUS_VALUE(MEMORY_USED) = local_memstat.used;
2173     MEMORY_STATUS_VALUE(MEMORY_FREED) = local_memstat.freed;
2174     MEMORY_STATUS_VALUE(MEMORY_MAX_IN_USE) = local_memstat.max_in_use;
2175     MEMORY_STATUS_VALUE(MEMORY_MMAP_THRESHOLD) = local_memstat.mmap_threshold;
2176     memory_status.status[MEMORY_MALLOCATOR_VERSION].value.str = local_memstat.mallocator_version;
2177 }
2178 #undef MEMORY_STATUS_VALUE
2179 
2180 // how many rows are in engine status?
2181 static int
env_get_engine_status_num_rows(DB_ENV * UU (env),uint64_t * num_rowsp)2182 env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp) {
2183     uint64_t num_rows = 0;
2184     num_rows += YDB_LAYER_STATUS_NUM_ROWS;
2185     num_rows += YDB_C_LAYER_STATUS_NUM_ROWS;
2186     num_rows += YDB_WRITE_LAYER_STATUS_NUM_ROWS;
2187     num_rows += LE_STATUS_S::LE_STATUS_NUM_ROWS;
2188     num_rows += CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS;
2189     num_rows += CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS;
2190     num_rows += LTM_STATUS_S::LTM_STATUS_NUM_ROWS;
2191     num_rows += FT_STATUS_S::FT_STATUS_NUM_ROWS;
2192     num_rows += FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS;
2193     num_rows += FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS;
2194     num_rows += TXN_STATUS_S::TXN_STATUS_NUM_ROWS;
2195     num_rows += LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS;
2196     num_rows += MEMORY_STATUS_NUM_ROWS;
2197     num_rows += FS_STATUS_NUM_ROWS;
2198     num_rows += INDEXER_STATUS_NUM_ROWS;
2199     num_rows += LOADER_STATUS_NUM_ROWS;
2200     num_rows += CTX_STATUS_NUM_ROWS;
2201 #if 0
2202     // enable when upgrade is supported
2203     num_rows += FT_UPGRADE_STATUS_NUM_ROWS;
2204     num_rows += PERSISTENT_UPGRADE_STATUS_NUM_ROWS;
2205 #endif
2206     *num_rowsp = num_rows;
2207     return 0;
2208 }
2209 
2210 // Do not take ydb lock or any other lock around or in this function.
2211 // If the engine is blocked because some thread is holding a lock, this function
2212 // can help diagnose the problem.
2213 // This function only collects information, and it does not matter if something gets garbled
2214 // because of a race condition.
2215 // Note, engine status is still collected even if the environment or logger is panicked
2216 static int
env_get_engine_status(DB_ENV * env,TOKU_ENGINE_STATUS_ROW engstat,uint64_t maxrows,uint64_t * num_rows,fs_redzone_state * redzone_state,uint64_t * env_panicp,char * env_panic_string_buf,int env_panic_string_length,toku_engine_status_include_type include_flags)2217 env_get_engine_status (DB_ENV * env, TOKU_ENGINE_STATUS_ROW engstat, uint64_t maxrows,  uint64_t *num_rows, fs_redzone_state* redzone_state, uint64_t * env_panicp, char * env_panic_string_buf, int env_panic_string_length, toku_engine_status_include_type include_flags) {
2218     int r;
2219 
2220     if (env_panic_string_buf) {
2221         if (env && env->i && env->i->is_panicked && env->i->panic_string) {
2222             strncpy(env_panic_string_buf, env->i->panic_string, env_panic_string_length);
2223             env_panic_string_buf[env_panic_string_length - 1] = '\0';  // just in case
2224         }
2225         else
2226             *env_panic_string_buf = '\0';
2227     }
2228 
2229     if ( !(env)     ||
2230          !(env->i)  ||
2231          !(env_opened(env)) ||
2232          !num_rows ||
2233          !include_flags)
2234         r = EINVAL;
2235     else {
2236         r = 0;
2237         uint64_t row = 0;  // which row to fill next
2238         *env_panicp = env->i->is_panicked;
2239 
2240         {
2241             YDB_LAYER_STATUS_S ydb_stat;
2242             ydb_layer_get_status(env, &ydb_stat);
2243             for (int i = 0; i < YDB_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2244                 if (ydb_stat.status[i].include & include_flags) {
2245                     engstat[row++] = ydb_stat.status[i];
2246                 }
2247             }
2248         }
2249         {
2250             YDB_C_LAYER_STATUS_S ydb_c_stat;
2251             ydb_c_layer_get_status(&ydb_c_stat);
2252             for (int i = 0; i < YDB_C_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2253                 if (ydb_c_stat.status[i].include & include_flags) {
2254                     engstat[row++] = ydb_c_stat.status[i];
2255                 }
2256             }
2257         }
2258         {
2259             YDB_WRITE_LAYER_STATUS_S ydb_write_stat;
2260             ydb_write_layer_get_status(&ydb_write_stat);
2261             for (int i = 0; i < YDB_WRITE_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2262                 if (ydb_write_stat.status[i].include & include_flags) {
2263                     engstat[row++] = ydb_write_stat.status[i];
2264                 }
2265             }
2266         }
2267         {
2268             LE_STATUS_S lestat;                    // Rice's vampire
2269             toku_le_get_status(&lestat);
2270             for (int i = 0; i < LE_STATUS_S::LE_STATUS_NUM_ROWS && row < maxrows; i++) {
2271                 if (lestat.status[i].include & include_flags) {
2272                     engstat[row++] = lestat.status[i];
2273                 }
2274             }
2275         }
2276         {
2277             CHECKPOINT_STATUS_S cpstat;
2278             toku_checkpoint_get_status(env->i->cachetable, &cpstat);
2279             for (int i = 0; i < CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS && row < maxrows; i++) {
2280                 if (cpstat.status[i].include & include_flags) {
2281                     engstat[row++] = cpstat.status[i];
2282                 }
2283             }
2284         }
2285         {
2286             CACHETABLE_STATUS_S ctstat;
2287             toku_cachetable_get_status(env->i->cachetable, &ctstat);
2288             for (int i = 0; i < CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS && row < maxrows; i++) {
2289                 if (ctstat.status[i].include & include_flags) {
2290                     engstat[row++] = ctstat.status[i];
2291                 }
2292             }
2293         }
2294         {
2295             LTM_STATUS_S ltmstat;
2296             env->i->ltm.get_status(&ltmstat);
2297             for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS && row < maxrows; i++) {
2298                 if (ltmstat.status[i].include & include_flags) {
2299                     engstat[row++] = ltmstat.status[i];
2300                 }
2301             }
2302         }
2303         {
2304             FT_STATUS_S ftstat;
2305             toku_ft_get_status(&ftstat);
2306             for (int i = 0; i < FT_STATUS_S::FT_STATUS_NUM_ROWS && row < maxrows; i++) {
2307                 if (ftstat.status[i].include & include_flags) {
2308                     engstat[row++] = ftstat.status[i];
2309                 }
2310             }
2311         }
2312         {
2313             FT_FLUSHER_STATUS_S flusherstat;
2314             toku_ft_flusher_get_status(&flusherstat);
2315             for (int i = 0; i < FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS && row < maxrows; i++) {
2316                 if (flusherstat.status[i].include & include_flags) {
2317                     engstat[row++] = flusherstat.status[i];
2318                 }
2319             }
2320         }
2321         {
2322             FT_HOT_STATUS_S hotstat;
2323             toku_ft_hot_get_status(&hotstat);
2324             for (int i = 0; i < FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS && row < maxrows; i++) {
2325                 if (hotstat.status[i].include & include_flags) {
2326                     engstat[row++] = hotstat.status[i];
2327                 }
2328             }
2329         }
2330         {
2331             TXN_STATUS_S txnstat;
2332             toku_txn_get_status(&txnstat);
2333             for (int i = 0; i < TXN_STATUS_S::TXN_STATUS_NUM_ROWS && row < maxrows; i++) {
2334                 if (txnstat.status[i].include & include_flags) {
2335                     engstat[row++] = txnstat.status[i];
2336                 }
2337             }
2338         }
2339         {
2340             LOGGER_STATUS_S loggerstat;
2341             toku_logger_get_status(env->i->logger, &loggerstat);
2342             for (int i = 0; i < LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS && row < maxrows; i++) {
2343                 if (loggerstat.status[i].include & include_flags) {
2344                     engstat[row++] = loggerstat.status[i];
2345                 }
2346             }
2347         }
2348 
2349         {
2350             INDEXER_STATUS_S indexerstat;
2351             toku_indexer_get_status(&indexerstat);
2352             for (int i = 0; i < INDEXER_STATUS_NUM_ROWS && row < maxrows; i++) {
2353                 if (indexerstat.status[i].include & include_flags) {
2354                     engstat[row++] = indexerstat.status[i];
2355                 }
2356             }
2357         }
2358         {
2359             LOADER_STATUS_S loaderstat;
2360             toku_loader_get_status(&loaderstat);
2361             for (int i = 0; i < LOADER_STATUS_NUM_ROWS && row < maxrows; i++) {
2362                 if (loaderstat.status[i].include & include_flags) {
2363                     engstat[row++] = loaderstat.status[i];
2364                 }
2365             }
2366         }
2367 
2368         {
2369             // memory_status is local to this file
2370             memory_get_status();
2371             for (int i = 0; i < MEMORY_STATUS_NUM_ROWS && row < maxrows; i++) {
2372                 if (memory_status.status[i].include & include_flags) {
2373                     engstat[row++] = memory_status.status[i];
2374                 }
2375             }
2376         }
2377         {
2378             // Note, fs_get_status() and the fsstat structure are local to this file because they
2379             // are used to concentrate file system information collected from various places.
2380             fs_get_status(env, redzone_state);
2381             for (int i = 0; i < FS_STATUS_NUM_ROWS && row < maxrows; i++) {
2382                 if (fsstat.status[i].include & include_flags) {
2383                     engstat[row++] = fsstat.status[i];
2384                 }
2385             }
2386         }
2387         {
2388             struct context_status ctxstatus;
2389             toku_context_get_status(&ctxstatus);
2390             for (int i = 0; i < CTX_STATUS_NUM_ROWS && row < maxrows; i++) {
2391                 if (ctxstatus.status[i].include & include_flags) {
2392                     engstat[row++] = ctxstatus.status[i];
2393                 }
2394             }
2395         }
2396 #if 0
2397         // enable when upgrade is supported
2398         {
2399             for (int i = 0; i < PERSISTENT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2400                 if (persistent_upgrade_status.status[i].include & include_flags) {
2401                     engstat[row++] = persistent_upgrade_status.status[i];
2402                 }
2403             }
2404             FT_UPGRADE_STATUS_S ft_upgradestat;
2405             toku_ft_upgrade_get_status(&ft_upgradestat);
2406             for (int i = 0; i < FT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2407                 if (ft_upgradestat.status[i].include & include_flags) {
2408                     engstat[row++] = ft_upgradestat.status[i];
2409                 }
2410             }
2411 
2412         }
2413 #endif
2414         if (r==0) {
2415             *num_rows = row;
2416         }
2417     }
2418     return r;
2419 }
2420 
2421 // Fill buff with text description of engine status up to bufsiz bytes.
2422 // Intended for use by test programs that do not have the handlerton available,
2423 // and for use by toku_assert logic to print diagnostic info on crash.
2424 static int
env_get_engine_status_text(DB_ENV * env,char * buff,int bufsiz)2425 env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) {
2426     uint32_t stringsize = 1024;
2427     uint64_t panic;
2428     char panicstring[stringsize];
2429     int n = 0;  // number of characters printed so far
2430     uint64_t num_rows;
2431     uint64_t max_rows;
2432     fs_redzone_state redzone_state;
2433 
2434     n = snprintf(buff, bufsiz - n, "BUILD_ID = %d\n", BUILD_ID);
2435 
2436     (void) env_get_engine_status_num_rows (env, &max_rows);
2437     TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2438     int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2439 
2440     if (r) {
2441         n += snprintf(buff + n, bufsiz - n, "Engine status not available: ");
2442         if (!env) {
2443             n += snprintf(buff + n, bufsiz - n, "no environment\n");
2444         }
2445         else if (!(env->i)) {
2446             n += snprintf(buff + n, bufsiz - n, "environment internal struct is null\n");
2447         }
2448         else if (!env_opened(env)) {
2449             n += snprintf(buff + n, bufsiz - n, "environment is not open\n");
2450         }
2451     }
2452     else {
2453         if (panic) {
2454             n += snprintf(buff + n, bufsiz - n, "Env panic code: %" PRIu64 "\n", panic);
2455             if (strlen(panicstring)) {
2456                 invariant(strlen(panicstring) <= stringsize);
2457                 n += snprintf(buff + n, bufsiz - n, "Env panic string: %s\n", panicstring);
2458             }
2459         }
2460 
2461         for (uint64_t row = 0; row < num_rows; row++) {
2462             n += snprintf(buff + n, bufsiz - n, "%s: ", mystat[row].legend);
2463             switch (mystat[row].type) {
2464             case FS_STATE:
2465                 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2466                 break;
2467             case UINT64:
2468                 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2469                 break;
2470             case CHARSTR:
2471                 n += snprintf(buff + n, bufsiz - n, "%s\n", mystat[row].value.str);
2472                 break;
2473             case UNIXTIME:
2474                 {
2475                     char tbuf[26];
2476                     format_time((time_t*)&mystat[row].value.num, tbuf);
2477                     n += snprintf(buff + n, bufsiz - n, "%s\n", tbuf);
2478                 }
2479                 break;
2480             case TOKUTIME:
2481                 {
2482                     double t = tokutime_to_seconds(mystat[row].value.num);
2483                     n += snprintf(buff + n, bufsiz - n, "%.6f\n", t);
2484                 }
2485                 break;
2486             case PARCOUNT:
2487                 {
2488                     uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2489                     n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", v);
2490                 }
2491                 break;
2492             default:
2493                 n += snprintf(buff + n, bufsiz - n, "UNKNOWN STATUS TYPE: %d\n", mystat[row].type);
2494                 break;
2495             }
2496         }
2497     }
2498 
2499     if (n > bufsiz) {
2500         const char * errmsg = "BUFFER TOO SMALL\n";
2501         int len = strlen(errmsg) + 1;
2502         (void) snprintf(buff + (bufsiz - 1) - len, len, "%s", errmsg);
2503     }
2504 
2505     return r;
2506 }
2507 
2508 // prints engine status using toku_env_err line-by-line
2509 static int
env_err_engine_status(DB_ENV * env)2510 env_err_engine_status(DB_ENV * env) {
2511     uint32_t stringsize = 1024;
2512     uint64_t panic;
2513     char panicstring[stringsize];
2514     uint64_t num_rows;
2515     uint64_t max_rows;
2516     fs_redzone_state redzone_state;
2517 
2518     toku_env_err(env, 0, "BUILD_ID = %d", BUILD_ID);
2519 
2520     (void) env_get_engine_status_num_rows (env, &max_rows);
2521     TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2522     int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2523 
2524     if (r) {
2525         toku_env_err(env, 0, "Engine status not available: ");
2526         if (!env) {
2527             toku_env_err(env, 0, "no environment");
2528         }
2529         else if (!(env->i)) {
2530             toku_env_err(env, 0, "environment internal struct is null");
2531         }
2532         else if (!env_opened(env)) {
2533             toku_env_err(env, 0, "environment is not open");
2534         }
2535     }
2536     else {
2537         if (panic) {
2538             toku_env_err(env, 0, "Env panic code: %" PRIu64, panic);
2539             if (strlen(panicstring)) {
2540                 invariant(strlen(panicstring) <= stringsize);
2541                 toku_env_err(env, 0, "Env panic string: %s", panicstring);
2542             }
2543         }
2544 
2545         for (uint64_t row = 0; row < num_rows; row++) {
2546             switch (mystat[row].type) {
2547             case FS_STATE:
2548                 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2549                 break;
2550             case UINT64:
2551                 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2552                 break;
2553             case CHARSTR:
2554                 toku_env_err(env, 0, "%s: %s", mystat[row].legend, mystat[row].value.str);
2555                 break;
2556             case UNIXTIME:
2557                 {
2558                     char tbuf[26];
2559                     format_time((time_t*)&mystat[row].value.num, tbuf);
2560                     toku_env_err(env, 0, "%s: %s", mystat[row].legend, tbuf);
2561                 }
2562                 break;
2563             case TOKUTIME:
2564                 {
2565                     double t = tokutime_to_seconds(mystat[row].value.num);
2566                     toku_env_err(env, 0, "%s: %.6f", mystat[row].legend, t);
2567                 }
2568                 break;
2569             case PARCOUNT:
2570                 {
2571                     uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2572                     toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, v);
2573                 }
2574                 break;
2575             default:
2576                 toku_env_err(env, 0, "%s: UNKNOWN STATUS TYPE: %d", mystat[row].legend, mystat[row].type);
2577                 break;
2578             }
2579         }
2580     }
2581 
2582     return r;
2583 }
2584 
2585 // intended for use by toku_assert logic, when env is not known
2586 static int
toku_maybe_get_engine_status_text(char * buff,int buffsize)2587 toku_maybe_get_engine_status_text (char * buff, int buffsize) {
2588     DB_ENV * env = most_recent_env;
2589     int r;
2590     if (engine_status_enable && env != NULL) {
2591         r = env_get_engine_status_text(env, buff, buffsize);
2592     }
2593     else {
2594         r = EOPNOTSUPP;
2595         snprintf(buff, buffsize, "Engine status not available: disabled by user.  This should only happen in test programs.\n");
2596     }
2597     return r;
2598 }
2599 
2600 static int
toku_maybe_err_engine_status(void)2601 toku_maybe_err_engine_status (void) {
2602     DB_ENV * env = most_recent_env;
2603     int r;
2604     if (engine_status_enable && env != NULL) {
2605         r = env_err_engine_status(env);
2606     }
2607     else {
2608         r = EOPNOTSUPP;
2609     }
2610     return r;
2611 }
2612 
2613 // Set panic code and panic string if not already panicked,
2614 // intended for use by toku_assert when about to abort().
2615 static void
toku_maybe_set_env_panic(int code,const char * msg)2616 toku_maybe_set_env_panic(int code, const char * msg) {
2617     if (code == 0)
2618         code = -1;
2619     if (msg == NULL)
2620         msg = "Unknown cause from abort (failed assert)\n";
2621     env_is_panicked = code;  // disable library destructor no matter what
2622     DB_ENV * env = most_recent_env;
2623     if (env &&
2624         env->i &&
2625         (env->i->is_panicked == 0)) {
2626         env_panic(env, code, msg);
2627     }
2628 }
2629 
2630 // handlerton's call to fractal tree layer on failed assert in handlerton
2631 static int
env_crash(DB_ENV * UU (db_env),const char * msg,const char * fun,const char * file,int line,int caller_errno)2632 env_crash(DB_ENV * UU(db_env), const char* msg, const char * fun, const char* file, int line, int caller_errno) {
2633     toku_do_assert_fail(msg, fun, file, line, caller_errno);
2634     return -1;  // placate compiler
2635 }
2636 
2637 static int
env_get_cursor_for_persistent_environment(DB_ENV * env,DB_TXN * txn,DBC ** c)2638 env_get_cursor_for_persistent_environment(DB_ENV* env, DB_TXN* txn, DBC** c) {
2639     if (!env_opened(env)) {
2640         return EINVAL;
2641     }
2642     return toku_db_cursor(env->i->persistent_environment, txn, c, 0);
2643 }
2644 
2645 static int
env_get_cursor_for_directory(DB_ENV * env,DB_TXN * txn,DBC ** c)2646 env_get_cursor_for_directory(DB_ENV* env, DB_TXN* txn, DBC** c) {
2647     if (!env_opened(env)) {
2648         return EINVAL;
2649     }
2650     return toku_db_cursor(env->i->directory, txn, c, 0);
2651 }
2652 
2653 static DB *
env_get_db_for_directory(DB_ENV * env)2654 env_get_db_for_directory(DB_ENV* env) {
2655     if (!env_opened(env)) {
2656         return NULL;
2657     }
2658     return env->i->directory;
2659 }
2660 
2661 struct ltm_iterate_requests_callback_extra {
ltm_iterate_requests_callback_extraltm_iterate_requests_callback_extra2662     ltm_iterate_requests_callback_extra(DB_ENV *e,
2663                                         iterate_requests_callback cb,
2664                                         void *ex) :
2665         env(e), callback(cb), extra(ex) {
2666     }
2667     DB_ENV *env;
2668     iterate_requests_callback callback;
2669     void *extra;
2670 };
2671 
2672 static int
find_db_by_dict_id(DB * const & db,const DICTIONARY_ID & dict_id_find)2673 find_db_by_dict_id(DB *const &db, const DICTIONARY_ID &dict_id_find) {
2674     DICTIONARY_ID dict_id = db->i->dict_id;
2675     if (dict_id.dictid < dict_id_find.dictid) {
2676         return -1;
2677     } else if (dict_id.dictid > dict_id_find.dictid) {
2678         return 1;
2679     } else {
2680         return 0;
2681     }
2682 }
2683 
2684 static DB *
locked_get_db_by_dict_id(DB_ENV * env,DICTIONARY_ID dict_id)2685 locked_get_db_by_dict_id(DB_ENV *env, DICTIONARY_ID dict_id) {
2686     DB *db;
2687     int r = env->i->open_dbs_by_dict_id->find_zero<DICTIONARY_ID, find_db_by_dict_id>(dict_id, &db, nullptr);
2688     return r == 0 ? db : nullptr;
2689 }
2690 
ltm_iterate_requests_callback(DICTIONARY_ID dict_id,TXNID txnid,const DBT * left_key,const DBT * right_key,TXNID blocking_txnid,uint64_t start_time,void * extra)2691 static int ltm_iterate_requests_callback(DICTIONARY_ID dict_id, TXNID txnid,
2692                                          const DBT *left_key,
2693                                          const DBT *right_key,
2694                                          TXNID blocking_txnid,
2695                                          uint64_t start_time,
2696                                          void *extra) {
2697     ltm_iterate_requests_callback_extra *info =
2698         reinterpret_cast<ltm_iterate_requests_callback_extra *>(extra);
2699 
2700     toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2701     int r = 0;
2702     DB *db = locked_get_db_by_dict_id(info->env, dict_id);
2703     if (db != nullptr) {
2704         r = info->callback(db, txnid, left_key, right_key,
2705                            blocking_txnid, start_time, info->extra);
2706     }
2707     toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2708     return r;
2709 }
2710 
2711 static int
env_iterate_pending_lock_requests(DB_ENV * env,iterate_requests_callback callback,void * extra)2712 env_iterate_pending_lock_requests(DB_ENV *env,
2713                                   iterate_requests_callback callback,
2714                                   void *extra) {
2715     if (!env_opened(env)) {
2716         return EINVAL;
2717     }
2718 
2719     toku::locktree_manager *mgr = &env->i->ltm;
2720     ltm_iterate_requests_callback_extra e(env, callback, extra);
2721     return mgr->iterate_pending_lock_requests(ltm_iterate_requests_callback, &e);
2722 }
2723 
2724 // for the lifetime of this object:
2725 // - open_dbs_rwlock must be read locked (or better)
2726 // - txn_mutex must be held
2727 struct iter_txn_row_locks_callback_extra {
iter_txn_row_locks_callback_extraiter_txn_row_locks_callback_extra2728     iter_txn_row_locks_callback_extra(DB_ENV *e, toku::omt<txn_lt_key_ranges> *m) :
2729         env(e), current_db(nullptr), which_lt(0), lt_map(m) {
2730         if (lt_map->size() > 0) {
2731             set_iterator_and_current_db();
2732         }
2733     }
2734 
set_iterator_and_current_dbiter_txn_row_locks_callback_extra2735     void set_iterator_and_current_db() {
2736         txn_lt_key_ranges ranges;
2737         const int r = lt_map->fetch(which_lt, &ranges);
2738         invariant_zero(r);
2739         current_db = locked_get_db_by_dict_id(env, ranges.lt->get_dict_id());
2740         iter = toku::range_buffer::iterator(ranges.buffer);
2741     }
2742 
2743     DB_ENV *env;
2744     DB *current_db;
2745     size_t which_lt;
2746     toku::omt<txn_lt_key_ranges> *lt_map;
2747     toku::range_buffer::iterator iter;
2748     toku::range_buffer::iterator::record rec;
2749 };
2750 
iter_txn_row_locks_callback(DB ** db,DBT * left_key,DBT * right_key,void * extra)2751 static int iter_txn_row_locks_callback(DB **db, DBT *left_key, DBT *right_key, void *extra) {
2752     iter_txn_row_locks_callback_extra *info =
2753         reinterpret_cast<iter_txn_row_locks_callback_extra *>(extra);
2754 
2755     while (info->which_lt < info->lt_map->size()) {
2756         const bool more = info->iter.current(&info->rec);
2757         if (more) {
2758             *db = info->current_db;
2759             // The caller should interpret data/size == 0 to mean infinity.
2760             // Therefore, when we copyref pos/neg infinity into left/right_key,
2761             // the caller knows what we're talking about.
2762             toku_copyref_dbt(left_key, *info->rec.get_left_key());
2763             toku_copyref_dbt(right_key, *info->rec.get_right_key());
2764             info->iter.next();
2765             return 0;
2766         } else {
2767             info->which_lt++;
2768             if (info->which_lt < info->lt_map->size()) {
2769                 info->set_iterator_and_current_db();
2770             }
2771         }
2772     }
2773     return DB_NOTFOUND;
2774 }
2775 
2776 struct iter_txns_callback_extra {
iter_txns_callback_extraiter_txns_callback_extra2777     iter_txns_callback_extra(DB_ENV *e, iterate_transactions_callback cb, void *ex) :
2778         env(e), callback(cb), extra(ex) {
2779     }
2780     DB_ENV *env;
2781     iterate_transactions_callback callback;
2782     void *extra;
2783 };
2784 
iter_txns_callback(TOKUTXN txn,void * extra)2785 static int iter_txns_callback(TOKUTXN txn, void *extra) {
2786     int r = 0;
2787     iter_txns_callback_extra *info =
2788         reinterpret_cast<iter_txns_callback_extra *>(extra);
2789     DB_TXN *dbtxn = toku_txn_get_container_db_txn(txn);
2790     invariant_notnull(dbtxn);
2791     struct __toku_db_txn_internal *db_txn_internal __attribute__((__unused__)) = db_txn_struct_i(dbtxn);
2792     TOKU_VALGRIND_HG_DISABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2793     if (db_txn_struct_i(dbtxn)->tokutxn == txn) { // make sure that the dbtxn is fully initialized
2794         toku_mutex_lock(&db_txn_struct_i(dbtxn)->txn_mutex);
2795         toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2796 
2797         iter_txn_row_locks_callback_extra e(info->env, &db_txn_struct_i(dbtxn)->lt_map);
2798         r = info->callback(dbtxn, iter_txn_row_locks_callback, &e, info->extra);
2799 
2800         toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2801         toku_mutex_unlock(&db_txn_struct_i(dbtxn)->txn_mutex);
2802     }
2803     TOKU_VALGRIND_HG_ENABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2804 
2805     return r;
2806 }
2807 
2808 static int
env_iterate_live_transactions(DB_ENV * env,iterate_transactions_callback callback,void * extra)2809 env_iterate_live_transactions(DB_ENV *env,
2810                               iterate_transactions_callback callback,
2811                               void *extra) {
2812     if (!env_opened(env)) {
2813         return EINVAL;
2814     }
2815 
2816     TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
2817     iter_txns_callback_extra e(env, callback, extra);
2818     return toku_txn_manager_iter_over_live_root_txns(txn_manager, iter_txns_callback, &e);
2819 }
2820 
env_set_loader_memory_size(DB_ENV * env,uint64_t (* get_loader_memory_size_callback)(void))2821 static void env_set_loader_memory_size(DB_ENV *env, uint64_t (*get_loader_memory_size_callback)(void)) {
2822     env->i->get_loader_memory_size_callback = get_loader_memory_size_callback;
2823 }
2824 
env_get_loader_memory_size(DB_ENV * env)2825 static uint64_t env_get_loader_memory_size(DB_ENV *env) {
2826     uint64_t memory_size = 0;
2827     if (env->i->get_loader_memory_size_callback)
2828         memory_size = env->i->get_loader_memory_size_callback();
2829     return memory_size;
2830 }
2831 
env_set_killed_callback(DB_ENV * env,uint64_t default_killed_time_msec,uint64_t (* get_killed_time_callback)(uint64_t default_killed_time_msec),int (* killed_callback)(void))2832 static void env_set_killed_callback(DB_ENV *env, uint64_t default_killed_time_msec, uint64_t (*get_killed_time_callback)(uint64_t default_killed_time_msec), int (*killed_callback)(void)) {
2833     env->i->default_killed_time_msec = default_killed_time_msec;
2834     env->i->get_killed_time_callback = get_killed_time_callback;
2835     env->i->killed_callback = killed_callback;
2836 }
2837 
env_kill_waiter(DB_ENV * env,void * extra)2838 static void env_kill_waiter(DB_ENV *env, void *extra) {
2839     env->i->ltm.kill_waiter(extra);
2840 }
2841 
env_do_backtrace(DB_ENV * env)2842 static void env_do_backtrace(DB_ENV *env) {
2843     if (env->i->errcall) {
2844         db_env_do_backtrace_errfunc((toku_env_err_func) toku_env_err, (const void *) env);
2845     }
2846     if (env->i->errfile) {
2847         db_env_do_backtrace((FILE *) env->i->errfile);
2848     } else {
2849         db_env_do_backtrace(stderr);
2850     }
2851 }
2852 
2853 static int
toku_env_create(DB_ENV ** envp,uint32_t flags)2854 toku_env_create(DB_ENV ** envp, uint32_t flags) {
2855     int r = ENOSYS;
2856     DB_ENV* result = NULL;
2857 
2858     if (flags!=0)    { r = EINVAL; goto cleanup; }
2859     MALLOC(result);
2860     if (result == 0) { r = ENOMEM; goto cleanup; }
2861     memset(result, 0, sizeof *result);
2862 
2863     // locked methods
2864     result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_env_err;
2865 #define SENV(name) result->name = locked_env_ ## name
2866     SENV(dbremove);
2867     SENV(dbrename);
2868     SENV(dirtool_attach);
2869     SENV(dirtool_detach);
2870     SENV(dirtool_move);
2871     //SENV(set_noticecall);
2872 #undef SENV
2873 #define USENV(name) result->name = env_ ## name
2874     // methods with locking done internally
2875     USENV(put_multiple);
2876     USENV(del_multiple);
2877     USENV(update_multiple);
2878     // unlocked methods
2879     USENV(open);
2880     USENV(close);
2881     USENV(set_default_bt_compare);
2882     USENV(set_update);
2883     USENV(set_generate_row_callback_for_put);
2884     USENV(set_generate_row_callback_for_del);
2885     USENV(set_lg_bsize);
2886     USENV(set_lg_dir);
2887     USENV(set_lg_max);
2888     USENV(get_lg_max);
2889     USENV(set_lk_max_memory);
2890     USENV(get_lk_max_memory);
2891     USENV(get_iname);
2892     USENV(set_errcall);
2893     USENV(set_errfile);
2894     USENV(set_errpfx);
2895     USENV(set_data_dir);
2896     USENV(checkpointing_set_period);
2897     USENV(checkpointing_get_period);
2898     USENV(cleaner_set_period);
2899     USENV(cleaner_get_period);
2900     USENV(cleaner_set_iterations);
2901     USENV(cleaner_get_iterations);
2902     USENV(evictor_set_enable_partial_eviction);
2903     USENV(evictor_get_enable_partial_eviction);
2904     USENV(set_cachesize);
2905     USENV(set_client_pool_threads);
2906     USENV(set_cachetable_pool_threads);
2907     USENV(set_checkpoint_pool_threads);
2908 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
2909     USENV(get_cachesize);
2910 #endif
2911 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
2912     USENV(set_lk_max);
2913 #endif
2914     USENV(set_lk_detect);
2915     USENV(set_flags);
2916     USENV(set_tmp_dir);
2917     USENV(set_verbose);
2918     USENV(txn_recover);
2919     USENV(txn_xa_recover);
2920     USENV(get_txn_from_xid);
2921     USENV(txn_stat);
2922     USENV(get_lock_timeout);
2923     USENV(set_lock_timeout);
2924     USENV(set_lock_timeout_callback);
2925     USENV(set_lock_wait_callback);
2926     USENV(set_redzone);
2927     USENV(log_flush);
2928     USENV(log_archive);
2929     USENV(create_loader);
2930     USENV(get_cursor_for_persistent_environment);
2931     USENV(get_cursor_for_directory);
2932     USENV(get_db_for_directory);
2933     USENV(iterate_pending_lock_requests);
2934     USENV(iterate_live_transactions);
2935     USENV(change_fsync_log_period);
2936     USENV(set_loader_memory_size);
2937     USENV(get_loader_memory_size);
2938     USENV(set_killed_callback);
2939     USENV(do_backtrace);
2940     USENV(set_check_thp);
2941     USENV(get_check_thp);
2942     USENV(set_dir_per_db);
2943     USENV(get_dir_per_db);
2944     USENV(get_data_dir);
2945     USENV(kill_waiter);
2946 #undef USENV
2947 
2948     // unlocked methods
2949     result->create_indexer = toku_indexer_create_indexer;
2950     result->txn_checkpoint = toku_env_txn_checkpoint;
2951     result->checkpointing_postpone = env_checkpointing_postpone;
2952     result->checkpointing_resume = env_checkpointing_resume;
2953     result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation;
2954     result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation;
2955     result->get_engine_status_num_rows = env_get_engine_status_num_rows;
2956     result->get_engine_status = env_get_engine_status;
2957     result->get_engine_status_text = env_get_engine_status_text;
2958     result->crash = env_crash;  // handlerton's call to fractal tree layer on failed assert
2959     result->txn_begin = toku_txn_begin;
2960 
2961     MALLOC(result->i);
2962     if (result->i == 0) { r = ENOMEM; goto cleanup; }
2963     memset(result->i, 0, sizeof *result->i);
2964     result->i->envdir_lockfd  = -1;
2965     result->i->datadir_lockfd = -1;
2966     result->i->logdir_lockfd  = -1;
2967     result->i->tmpdir_lockfd  = -1;
2968     env_fs_init(result);
2969     env_fsync_log_init(result);
2970 
2971     result->i->check_thp = true;
2972 
2973     result->i->bt_compare = toku_builtin_compare_fun;
2974 
2975     r = toku_logger_create(&result->i->logger);
2976     invariant_zero(r);
2977     invariant_notnull(result->i->logger);
2978 
2979     // Create the locktree manager, passing in the create/destroy/escalate callbacks.
2980     // The extra parameter for escalation is simply a pointer to this environment.
2981     // The escalate callback will need it to translate txnids to DB_TXNs
2982     result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result);
2983 
2984     XMALLOC(result->i->open_dbs_by_dname);
2985     result->i->open_dbs_by_dname->create();
2986     XMALLOC(result->i->open_dbs_by_dict_id);
2987     result->i->open_dbs_by_dict_id->create();
2988     toku_pthread_rwlock_init(
2989         *result_i_open_dbs_rwlock_key, &result->i->open_dbs_rwlock, nullptr);
2990 
2991     *envp = result;
2992     r = 0;
2993     toku_sync_fetch_and_add(&tokuft_num_envs, 1);
2994 cleanup:
2995     if (r!=0) {
2996         if (result) {
2997             toku_free(result->i);
2998             toku_free(result);
2999         }
3000     }
3001     return r;
3002 }
3003 
3004 int
DB_ENV_CREATE_FUN(DB_ENV ** envp,uint32_t flags)3005 DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) {
3006     int r = toku_env_create(envp, flags);
3007     return r;
3008 }
3009 
3010 // return 0 if v and dbv refer to same db (including same dname)
3011 // return <0 if v is earlier in omt than dbv
3012 // return >0 if v is later in omt than dbv
3013 static int
find_db_by_db_dname(DB * const & db,DB * const & dbfind)3014 find_db_by_db_dname(DB *const &db, DB *const &dbfind) {
3015     int cmp;
3016     const char *dname     = db->i->dname;
3017     const char *dnamefind = dbfind->i->dname;
3018     cmp = strcmp(dname, dnamefind);
3019     if (cmp != 0) return cmp;
3020     if (db < dbfind) return -1;
3021     if (db > dbfind) return  1;
3022     return 0;
3023 }
3024 
3025 static int
find_db_by_db_dict_id(DB * const & db,DB * const & dbfind)3026 find_db_by_db_dict_id(DB *const &db, DB *const &dbfind) {
3027     DICTIONARY_ID dict_id = db->i->dict_id;
3028     DICTIONARY_ID dict_id_find = dbfind->i->dict_id;
3029     if (dict_id.dictid < dict_id_find.dictid) {
3030         return -1;
3031     } else if (dict_id.dictid > dict_id_find.dictid) {
3032         return 1;
3033     } else if (db < dbfind) {
3034         return -1;
3035     } else if (db > dbfind) {
3036         return 1;
3037     } else {
3038         return 0;
3039     }
3040 }
3041 
3042 // Tell env that there is a new db handle (with non-unique dname in db->i-dname)
3043 void
env_note_db_opened(DB_ENV * env,DB * db)3044 env_note_db_opened(DB_ENV *env, DB *db) {
3045     toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3046     assert(db->i->dname); // internal (non-user) dictionary has no dname
3047 
3048     int r;
3049     uint32_t idx;
3050 
3051     r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3052     assert(r == DB_NOTFOUND);
3053     r = env->i->open_dbs_by_dname->insert_at(db, idx);
3054     assert_zero(r);
3055     r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3056     assert(r == DB_NOTFOUND);
3057     r = env->i->open_dbs_by_dict_id->insert_at(db, idx);
3058     assert_zero(r);
3059 
3060     STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3061     STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++;
3062     if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) {
3063         STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS);
3064     }
3065     toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3066 }
3067 
3068 // Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API.  The DB may still be in use by the fractal tree internals.
3069 void
env_note_db_closed(DB_ENV * env,DB * db)3070 env_note_db_closed(DB_ENV *env, DB *db) {
3071     toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3072     assert(db->i->dname); // internal (non-user) dictionary has no dname
3073     assert(env->i->open_dbs_by_dname->size() > 0);
3074     assert(env->i->open_dbs_by_dict_id->size() > 0);
3075 
3076     int r;
3077     uint32_t idx;
3078 
3079     r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3080     assert_zero(r);
3081     r = env->i->open_dbs_by_dname->delete_at(idx);
3082     assert_zero(r);
3083     r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3084     assert_zero(r);
3085     r = env->i->open_dbs_by_dict_id->delete_at(idx);
3086     assert_zero(r);
3087 
3088     STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++;
3089     STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3090     toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3091 }
3092 
3093 static int
find_open_db_by_dname(DB * const & db,const char * const & dnamefind)3094 find_open_db_by_dname(DB *const &db, const char *const &dnamefind) {
3095     return strcmp(db->i->dname, dnamefind);
3096 }
3097 
3098 // return true if there is any db open with the given dname
3099 static bool
env_is_db_with_dname_open(DB_ENV * env,const char * dname)3100 env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
3101     DB *db;
3102     toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
3103     int r = env->i->open_dbs_by_dname->find_zero<const char *, find_open_db_by_dname>(dname, &db, nullptr);
3104     if (r == 0) {
3105         invariant(strcmp(dname, db->i->dname) == 0);
3106     } else {
3107         invariant(r == DB_NOTFOUND);
3108     }
3109     toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
3110     return r == 0 ? true : false;
3111 }
3112 
3113 //We do not (yet?) support deleting subdbs by deleting the enclosing 'fname'
3114 static int
env_dbremove_subdb(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,int32_t flags)3115 env_dbremove_subdb(DB_ENV * env, DB_TXN * txn, const char *fname, const char *dbname, int32_t flags) {
3116     int r;
3117     if (!fname || !dbname) r = EINVAL;
3118     else {
3119         char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3120         int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3121         assert(bytes==(int)sizeof(subdb_full_name)-1);
3122         const char *null_subdbname = NULL;
3123         r = env_dbremove(env, txn, subdb_full_name, null_subdbname, flags);
3124     }
3125     return r;
3126 }
3127 
3128 // see if we can acquire a table lock for the given dname.
3129 // requires: write lock on dname in the directory. dictionary
3130 //          open, close, and begin checkpoint cannot occur.
3131 // returns: zero if we could open, lock, and close a dictionary
3132 //          with the given dname, errno otherwise.
3133 static int
can_acquire_table_lock(DB_ENV * env,DB_TXN * txn,const char * iname_in_env)3134 can_acquire_table_lock(DB_ENV *env, DB_TXN *txn, const char *iname_in_env) {
3135     int r;
3136     DB *db;
3137 
3138     r = toku_db_create(&db, env, 0);
3139     assert_zero(r);
3140     r = toku_db_open_iname(db, txn, iname_in_env, 0, 0);
3141     if(r) {
3142 	if (r == ENAMETOOLONG)
3143 	    toku_ydb_do_error(env, r, "File name too long!\n");
3144 	goto exit;
3145     }
3146     r = toku_db_pre_acquire_table_lock(db, txn);
3147     if (r) {
3148         r = DB_LOCK_NOTGRANTED;
3149     }
3150 exit:
3151     if(db) {
3152         int r2 = toku_db_close(db);
3153         assert_zero(r2);
3154     }
3155     return r;
3156 }
3157 
3158 static int
env_dbremove(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,uint32_t flags)3159 env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
3160     int r;
3161     HANDLE_PANICKED_ENV(env);
3162     if (!env_opened(env) || flags != 0) {
3163         return EINVAL;
3164     }
3165     HANDLE_READ_ONLY_TXN(txn);
3166     if (dbname != NULL) {
3167         // env_dbremove_subdb() converts (fname, dbname) to dname
3168         return env_dbremove_subdb(env, txn, fname, dbname, flags);
3169     }
3170 
3171     const char * dname = fname;
3172     assert(dbname == NULL);
3173 
3174     // We check for an open db here as a "fast path" to error.
3175     // We'll need to check again below to be sure.
3176     if (env_is_db_with_dname_open(env, dname)) {
3177         return toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
3178     }
3179 
3180     DBT dname_dbt;
3181     DBT iname_dbt;
3182     toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
3183     toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3184 
3185     // get iname
3186     r = toku_db_get(env->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
3187     char *iname = (char *) iname_dbt.data;
3188     DB *db = NULL;
3189     if (r != 0) {
3190         if (r == DB_NOTFOUND) {
3191             r = ENOENT;
3192         }
3193         goto exit;
3194     }
3195     // remove (dname,iname) from directory
3196     r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
3197     if (r != 0) {
3198         goto exit;
3199     }
3200     r = toku_db_create(&db, env, 0);
3201     lazy_assert_zero(r);
3202     r = toku_db_open_iname(db, txn, iname, 0, 0);
3203     if (txn && r) {
3204         if (r == EMFILE || r == ENFILE)
3205             r = toku_ydb_do_error(env, r, "toku dbremove failed because open file limit reached\n");
3206         else if (r != ENOENT)
3207             r = toku_ydb_do_error(env, r, "toku dbremove failed\n");
3208         else
3209             r = 0;
3210         goto exit;
3211     }
3212     if (txn) {
3213         // Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions)
3214         if (env_is_db_with_dname_open(env, dname)) {
3215             r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
3216             goto exit;
3217         }
3218         // we know a live db handle does not exist.
3219         //
3220         // use the internally opened db to try and get a table lock
3221         //
3222         // if we can't get it, then some txn needs the ft and we
3223         // should return lock not granted.
3224         //
3225         // otherwise, we're okay in marking this ft as remove on
3226         // commit. no new handles can open for this dictionary
3227         // because the txn has directory write locks on the dname
3228         r = toku_db_pre_acquire_table_lock(db, txn);
3229         if (r != 0) {
3230             r = DB_LOCK_NOTGRANTED;
3231             goto exit;
3232         }
3233         // The ft will be unlinked when the txn commits
3234         toku_ft_unlink_on_commit(db->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
3235     }
3236     else {
3237         // unlink the ft without a txn
3238         toku_ft_unlink(db->i->ft_handle);
3239     }
3240 
3241 exit:
3242     if (db) {
3243         int ret = toku_db_close(db);
3244         assert(ret == 0);
3245     }
3246     if (iname) {
3247         toku_free(iname);
3248     }
3249     return r;
3250 }
3251 
3252 static int
env_dbrename_subdb(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)3253 env_dbrename_subdb(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3254     int r;
3255     if (!fname || !dbname || !newname) r = EINVAL;
3256     else {
3257         char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3258         {
3259             int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3260             assert(bytes==(int)sizeof(subdb_full_name)-1);
3261         }
3262         char new_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3263         {
3264             int bytes = snprintf(new_full_name, sizeof(new_full_name), "%s/%s", fname, dbname);
3265             assert(bytes==(int)sizeof(new_full_name)-1);
3266         }
3267         const char *null_subdbname = NULL;
3268         r = env_dbrename(env, txn, subdb_full_name, null_subdbname, new_full_name, flags);
3269     }
3270     return r;
3271 }
3272 
3273 static int
env_dbrename(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)3274 env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3275     int r;
3276     HANDLE_PANICKED_ENV(env);
3277     if (!env_opened(env) || flags != 0) {
3278         return EINVAL;
3279     }
3280     HANDLE_READ_ONLY_TXN(txn);
3281     if (dbname != NULL) {
3282         // env_dbrename_subdb() converts (fname, dbname) to dname and (fname, newname) to newdname
3283         return env_dbrename_subdb(env, txn, fname, dbname, newname, flags);
3284     }
3285 
3286     const char * dname = fname;
3287     assert(dbname == NULL);
3288 
3289     // We check for open dnames for the old and new name as a "fast path" to error.
3290     // We will need to check these again later.
3291     if (env_is_db_with_dname_open(env, dname)) {
3292         return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3293     }
3294     if (env_is_db_with_dname_open(env, newname)) {
3295         return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3296     }
3297 
3298     DBT old_dname_dbt;
3299     DBT new_dname_dbt;
3300     DBT iname_dbt;
3301     toku_fill_dbt(&old_dname_dbt, dname, strlen(dname)+1);
3302     toku_fill_dbt(&new_dname_dbt, newname, strlen(newname)+1);
3303     toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3304 
3305     // get iname
3306     r = toku_db_get(env->i->directory, txn, &old_dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
3307     char *iname = (char *) iname_dbt.data;
3308     if (r == DB_NOTFOUND) {
3309         r = ENOENT;
3310     } else if (r == 0) {
3311         // verify that newname does not already exist
3312         r = db_getf_set(env->i->directory, txn, DB_SERIALIZABLE, &new_dname_dbt, ydb_getf_do_nothing, NULL);
3313         if (r == 0) {
3314             r = EEXIST;
3315         }
3316         else if (r == DB_NOTFOUND) {
3317             DBT new_iname_dbt;
3318             // Do not rename ft file if 'dir_per_db' option is not set
3319             auto new_iname =
3320                 env->get_dir_per_db(env)
3321                     ? generate_iname_for_rename_or_open(
3322                           env, txn, newname, false)
3323                     : std::unique_ptr<char[], decltype(&toku_free)>(
3324                           toku_strdup(iname), &toku_free);
3325             toku_fill_dbt(
3326                 &new_iname_dbt, new_iname.get(), strlen(new_iname.get()) + 1);
3327 
3328             // remove old (dname,iname) and insert (newname,iname) in directory
3329             r = toku_db_del(env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
3330             if (r != 0) { goto exit; }
3331 
3332             // Do not rename ft file if 'dir_per_db' option is not set
3333             if (env->get_dir_per_db(env))
3334                 r = toku_ft_rename_iname(txn,
3335                                          env->get_data_dir(env),
3336                                          iname,
3337                                          new_iname.get(),
3338                                          env->i->cachetable);
3339 
3340             r = toku_db_put(env->i->directory,
3341                             txn,
3342                             &new_dname_dbt,
3343                             &new_iname_dbt,
3344                             0,
3345                             true);
3346             if (r != 0) { goto exit; }
3347 
3348             //Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions)
3349             if (env_is_db_with_dname_open(env, dname)) {
3350                 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3351                 goto exit;
3352             }
3353             if (env_is_db_with_dname_open(env, newname)) {
3354                 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3355                 goto exit;
3356             }
3357 
3358             // we know a live db handle does not exist.
3359             //
3360             // use the internally opened db to try and get a table lock
3361             //
3362             // if we can't get it, then some txn needs the ft and we
3363             // should return lock not granted.
3364             //
3365             // otherwise, we're okay in marking this ft as remove on
3366             // commit. no new handles can open for this dictionary
3367             // because the txn has directory write locks on the dname
3368             if (txn) {
3369                 r = can_acquire_table_lock(env, txn, new_iname.get());
3370             }
3371             // We don't do anything at the ft or cachetable layer for rename.
3372             // We just update entries in the environment's directory.
3373         }
3374     }
3375 
3376 exit:
3377     if (iname) {
3378         toku_free(iname);
3379     }
3380     return r;
3381 }
3382 
3383 int
DB_CREATE_FUN(DB ** db,DB_ENV * env,uint32_t flags)3384 DB_CREATE_FUN (DB ** db, DB_ENV * env, uint32_t flags) {
3385     int r = toku_db_create(db, env, flags);
3386     return r;
3387 }
3388 
3389 /* need db_strerror_r for multiple threads */
3390 
3391 const char *
db_strerror(int error)3392 db_strerror(int error) {
3393     char *errorstr;
3394     if (error >= 0) {
3395         errorstr = strerror(error);
3396         if (errorstr)
3397             return errorstr;
3398     }
3399 
3400     switch (error) {
3401         case DB_BADFORMAT:
3402             return "Database Bad Format (probably a corrupted database)";
3403         case DB_NOTFOUND:
3404             return "Not found";
3405         case TOKUDB_OUT_OF_LOCKS:
3406             return "Out of locks";
3407         case TOKUDB_DICTIONARY_TOO_OLD:
3408             return "Dictionary too old for this version of PerconaFT";
3409         case TOKUDB_DICTIONARY_TOO_NEW:
3410             return "Dictionary too new for this version of PerconaFT";
3411         case TOKUDB_CANCELED:
3412             return "User cancelled operation";
3413         case TOKUDB_NO_DATA:
3414             return "Ran out of data (not EOF)";
3415         case TOKUDB_HUGE_PAGES_ENABLED:
3416             return "Transparent huge pages are enabled but PerconaFT's memory allocator will oversubscribe main memory with transparent huge pages.  This check can be disabled by setting the environment variable TOKU_HUGE_PAGES_OK.";
3417     }
3418 
3419     static char unknown_result[100];    // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of null-terminated string.
3420     errorstr = unknown_result;
3421     snprintf(errorstr, sizeof unknown_result, "Unknown error code: %d", error);
3422     return errorstr;
3423 }
3424 
3425 const char *
db_version(int * major,int * minor,int * patch)3426 db_version(int *major, int *minor, int *patch) {
3427     if (major)
3428         *major = DB_VERSION_MAJOR;
3429     if (minor)
3430         *minor = DB_VERSION_MINOR;
3431     if (patch)
3432         *patch = DB_VERSION_PATCH;
3433     return toku_product_name_strings.db_version;
3434 }
3435 
// HACK: force toku_pthread_yield to be included in the .so by referencing it.
// Alternatives considered:
// - non-static: would require a prototype in a header;
// - static (unused): the compiler warns about an unused function;
// - static + unused attribute: silences the warning but does not actually
//   get toku_pthread_yield into the .so;
// - static + __used__ (below): avoids the warnings and keeps the reference,
//   so toku_pthread_yield ends up in the .so.
static void __attribute__((__used__))
include_toku_pthread_yield (void) {
    toku_pthread_yield();
}
3445 
3446 // For test purposes only, translate dname to iname
3447 // YDB lock is NOT held when this function is called,
3448 // as it is called by user
3449 static int
env_get_iname(DB_ENV * env,DBT * dname_dbt,DBT * iname_dbt)3450 env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) {
3451     DB *directory = env->i->directory;
3452     int r = autotxn_db_get(directory, NULL, dname_dbt, iname_dbt, DB_SERIALIZABLE|DB_PRELOCKED); // allocates memory for iname
3453     return r;
3454 }
3455 
3456 // TODO 2216:  Patch out this (dangerous) function when loader is working and
3457 //             we don't need to test the low-level redirect anymore.
3458 // for use by test programs only, just a wrapper around ft call:
3459 int
toku_test_db_redirect_dictionary(DB * db,const char * dname_of_new_file,DB_TXN * dbtxn)3460 toku_test_db_redirect_dictionary(DB * db, const char * dname_of_new_file, DB_TXN *dbtxn) {
3461     int r;
3462     DBT dname_dbt;
3463     DBT iname_dbt;
3464     char * new_iname_in_env;
3465 
3466     FT_HANDLE ft_handle = db->i->ft_handle;
3467     TOKUTXN tokutxn = db_txn_struct_i(dbtxn)->tokutxn;
3468 
3469     toku_fill_dbt(&dname_dbt, dname_of_new_file, strlen(dname_of_new_file)+1);
3470     toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3471     r = toku_db_get(db->dbenv->i->directory, dbtxn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
3472     assert_zero(r);
3473     new_iname_in_env = (char *) iname_dbt.data;
3474 
3475     toku_multi_operation_client_lock(); //Must hold MO lock for dictionary_redirect.
3476     r = toku_dictionary_redirect(new_iname_in_env, ft_handle, tokutxn);
3477     toku_multi_operation_client_unlock();
3478 
3479     toku_free(new_iname_in_env);
3480     return r;
3481 }
3482 
3483 //Tets only function
3484 uint64_t
toku_test_get_latest_lsn(DB_ENV * env)3485 toku_test_get_latest_lsn(DB_ENV *env) {
3486     LSN rval = ZERO_LSN;
3487     if (env && env->i->logger) {
3488         rval = toku_logger_last_lsn(env->i->logger);
3489     }
3490     return rval.lsn;
3491 }
3492 
toku_set_test_txn_sync_callback(void (* cb)(pthread_t,void *),void * extra)3493 void toku_set_test_txn_sync_callback(void (* cb) (pthread_t, void *), void * extra) {
3494     set_test_txn_sync_callback(cb, extra);
3495 }
3496 
// Test hook: returns toku_cachetable_get_checkpointing_user_data_status()
// unchanged.
int
toku_test_get_checkpointing_user_data_status (void) {
    return toku_cachetable_get_checkpointing_user_data_status();
}
3501 
3502 #undef STATUS_VALUE
3503 #undef PERSISTENT_UPGRADE_STATUS_VALUE
3504 
3505 #include <toku_race_tools.h>
// Runs automatically at load time (constructor attribute) and tells
// Valgrind's race checker to stop checking accesses to ydb_layer_status,
// presumably because those status counters are updated without
// synchronization on purpose — NOTE(review): confirm.
void __attribute__((constructor)) toku_ydb_helgrind_ignore(void);
void
toku_ydb_helgrind_ignore(void) {
    TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_layer_status, sizeof ydb_layer_status);
}
3511