1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
3 #ident "$Id$"
4 /*======
5 This file is part of PerconaFT.
6 
7 
8 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
9 
10     PerconaFT is free software: you can redistribute it and/or modify
11     it under the terms of the GNU General Public License, version 2,
12     as published by the Free Software Foundation.
13 
14     PerconaFT is distributed in the hope that it will be useful,
15     but WITHOUT ANY WARRANTY; without even the implied warranty of
16     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17     GNU General Public License for more details.
18 
19     You should have received a copy of the GNU General Public License
20     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
21 
22 ----------------------------------------
23 
24     PerconaFT is free software: you can redistribute it and/or modify
25     it under the terms of the GNU Affero General Public License, version 3,
26     as published by the Free Software Foundation.
27 
28     PerconaFT is distributed in the hope that it will be useful,
29     but WITHOUT ANY WARRANTY; without even the implied warranty of
30     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31     GNU Affero General Public License for more details.
32 
33     You should have received a copy of the GNU Affero General Public License
34     along with PerconaFT.  If not, see <http://www.gnu.org/licenses/>.
35 ======= */
36 
37 #ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
38 
// Patent notice string; only declared here, its definition lives in another
// translation unit.
extern const char *toku_patent_string;
// Copyright banner, given external linkage so other translation units can
// reference it.
const char *toku_copyright_string = "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.";


// Flag defined in another translation unit; its meaning is not established
// by this part of the file.
extern int writing_rollback;
44 
45 #include <db.h>
46 #include <errno.h>
47 #include <string.h>
48 
49 #include "portability/memory.h"
50 #include "portability/toku_assert.h"
51 #include "portability/toku_portability.h"
52 #include "portability/toku_pthread.h"
53 #include "portability/toku_stdlib.h"
54 
55 #include "ft/ft-flusher.h"
56 #include "ft/cachetable/cachetable.h"
57 #include "ft/cachetable/checkpoint.h"
58 #include "ft/logger/log.h"
59 #include "ft/loader/loader.h"
60 #include "ft/log_header.h"
61 #include "ft/ft.h"
62 #include "ft/txn/txn_manager.h"
63 #include "src/ydb.h"
64 #include "src/ydb-internal.h"
65 #include "src/ydb_cursor.h"
66 #include "src/ydb_row_lock.h"
67 #include "src/ydb_env_func.h"
68 #include "src/ydb_db.h"
69 #include "src/ydb_write.h"
70 #include "src/ydb_txn.h"
71 #include "src/loader.h"
72 #include "src/indexer.h"
73 #include "util/status.h"
74 #include "util/context.h"
75 
76 #include <functional>
77 
78 // Include ydb_lib.cc here so that its constructor/destructor gets put into
79 // ydb.o, to make sure they don't get erased at link time (when linking to
80 // a static libtokufractaltree.a that was compiled with gcc).  See #5094.
81 #include "ydb_lib.cc"
82 
#if defined(TOKUTRACE)
// Tracing build: the public create entry points map to the *_toku10 variants.
#  define DB_ENV_CREATE_FUN db_env_create_toku10
#  define DB_CREATE_FUN     db_create_toku10
#else
// Regular build: plain entry point names, and the trace-file hooks are
// no-ops that always report success (0).
#  define DB_ENV_CREATE_FUN db_env_create
#  define DB_CREATE_FUN     db_create

int
toku_set_trace_file(const char *fname __attribute__((__unused__))) {
    return 0;
}

int
toku_close_trace_file(void) {
    return 0;
}
#endif
92 
// Recovery-override level defined elsewhere.  In this file, a value of 6
// disables file-system space polling and skips the fileops-directory check.
extern uint force_recovery;

// Set when env is panicked, never cleared.
// Holds the nonzero cause code recorded by env_panic(); toku_ydb_destroy()
// skips tearing down the lower (ft) layer while it is nonzero.
static int env_is_panicked = 0;
97 
98 void
env_panic(DB_ENV * env,int cause,const char * msg)99 env_panic(DB_ENV * env, int cause, const char * msg) {
100     if (cause == 0)
101         cause = -1;  // if unknown cause, at least guarantee panic
102     if (msg == NULL)
103         msg = "Unknown cause in env_panic\n";
104     env_is_panicked = cause;
105     env->i->is_panicked = cause;
106     env->i->panic_string = toku_strdup(msg);
107 }
108 
static int env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp);

/********************************************************************************
 * Status is intended for display to humans to help understand system behavior.
 * It does not need to be perfectly thread-safe.
 */

// Row indices into the ydb-layer status array.  Each row's key name, type and
// legend are filled in by ydb_layer_status_init().
typedef enum {
    YDB_LAYER_TIME_CREATION = 0,            /* timestamp of environment creation, read from persistent environment */
    YDB_LAYER_TIME_STARTUP,                 /* timestamp of system startup */
    YDB_LAYER_TIME_NOW,                     /* timestamp of engine status query */
    YDB_LAYER_NUM_DB_OPEN,                  /* legend: "db opens" */
    YDB_LAYER_NUM_DB_CLOSE,                 /* legend: "db closes" */
    YDB_LAYER_NUM_OPEN_DBS,                 /* legend: "num open dbs now" */
    YDB_LAYER_MAX_OPEN_DBS,                 /* legend: "max open dbs" */
    YDB_LAYER_FSYNC_LOG_PERIOD,             /* recovery-log fsync period in ms, refreshed by ydb_layer_get_status() */
#if 0
    // Disabled rows.  Equivalent values are tracked in persistent_upgrade_status.
    YDB_LAYER_ORIGINAL_ENV_VERSION,         /* version of original environment, read from persistent environment */
    YDB_LAYER_STARTUP_ENV_VERSION,          /* version of environment at this startup, read from persistent environment (curr_env_ver_key) */
    YDB_LAYER_LAST_LSN_OF_V13,              /* read from persistent environment */
    YDB_LAYER_UPGRADE_V14_TIME,             /* timestamp of upgrade to version 14, read from persistent environment */
    YDB_LAYER_UPGRADE_V14_FOOTPRINT,        /* footprint of upgrade to version 14, read from persistent environment */
#endif
    YDB_LAYER_STATUS_NUM_ROWS              /* number of rows in this status array */
} ydb_layer_status_entry;

// Status rows for this layer, plus a flag recording whether
// ydb_layer_status_init() has run (toku_ydb_destroy() checks it).
typedef struct {
    bool initialized;
    TOKU_ENGINE_STATUS_ROW_S status[YDB_LAYER_STATUS_NUM_ROWS];
} YDB_LAYER_STATUS_S, *YDB_LAYER_STATUS;

// The single file-scope ydb-layer status instance.
static YDB_LAYER_STATUS_S ydb_layer_status;
// Lvalue for the numeric value of status row x.
#define STATUS_VALUE(x) ydb_layer_status.status[x].value.num

// Shorthand for TOKUFT_STATUS_INIT on ydb_layer_status; #undef'd right after
// ydb_layer_status_init().
#define STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(ydb_layer_status, k, c, t, l, inc)
144 
// Describe every ydb-layer status row, stamp the startup time, and mark the
// status as initialized.
// Rows passed a column name (DB_OPENS, ...) are also included in
// TOKU_GLOBAL_STATUS.  Rows passed nullptr are only in TOKU_ENGINE_STATUS.
// NOTE(review): the arguments go straight to TOKUFT_STATUS_INIT (defined
// elsewhere), which may stringify them.  Confirm before renaming any argument.
static void
ydb_layer_status_init (void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.

    STATUS_INIT(YDB_LAYER_TIME_CREATION,              nullptr, UNIXTIME, "time of environment creation", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_TIME_STARTUP,               nullptr, UNIXTIME, "time of engine startup", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_TIME_NOW,                   nullptr, UNIXTIME, "time now", TOKU_ENGINE_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DB_OPEN,                DB_OPENS, UINT64,   "db opens", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_DB_CLOSE,               DB_CLOSES, UINT64,   "db closes", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_NUM_OPEN_DBS,               DB_OPEN_CURRENT, UINT64,   "num open dbs now", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_MAX_OPEN_DBS,               DB_OPEN_MAX, UINT64,   "max open dbs", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    STATUS_INIT(YDB_LAYER_FSYNC_LOG_PERIOD,           nullptr, UINT64,   "period, in ms, that recovery log is automatically fsynced", TOKU_ENGINE_STATUS);

    // The startup time is captured once, here; "time now" is refreshed per query.
    STATUS_VALUE(YDB_LAYER_TIME_STARTUP) = time(NULL);
    ydb_layer_status.initialized = true;
}
#undef STATUS_INIT
163 
164 static void
ydb_layer_get_status(DB_ENV * env,YDB_LAYER_STATUS statp)165 ydb_layer_get_status(DB_ENV* env, YDB_LAYER_STATUS statp) {
166     STATUS_VALUE(YDB_LAYER_TIME_NOW) = time(NULL);
167     STATUS_VALUE(YDB_LAYER_FSYNC_LOG_PERIOD) = toku_minicron_get_period_in_ms_unlocked(&env->i->fsync_log_cron);
168     *statp = ydb_layer_status;
169 }
170 
171 /********************************************************************************
172  * End of ydb_layer local status section.
173  */
174 
// Most recently opened env, used for engine status on crash.
// There are likely to be races on this if multiple threads create and close
// environments in parallel.  It is declared volatile only so the compiler does
// not optimize away code that reads it (e.g. a debugging loop spinning on
// most_recent_env).  volatile does NOT make these accesses thread-safe.
static DB_ENV * volatile most_recent_env;

static int env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt);
static int toku_maybe_get_engine_status_text (char* buff, int buffsize);  // for use by toku_assert
static int toku_maybe_err_engine_status (void);
static void toku_maybe_set_env_panic(int code, const char * msg);               // for use by toku_assert
181 
182 int
toku_ydb_init(void)183 toku_ydb_init(void) {
184     int r = 0;
185     //Lower level must be initialized first.
186     r = toku_ft_layer_init();
187     return r;
188 }
189 
190 // Do not clean up resources if env is panicked, just exit ugly
191 void
toku_ydb_destroy(void)192 toku_ydb_destroy(void) {
193     if (!ydb_layer_status.initialized)
194         return;
195     if (env_is_panicked == 0) {
196         toku_ft_layer_destroy();
197     }
198     ydb_layer_status.initialized = false;
199 }
200 
201 static int
ydb_getf_do_nothing(DBT const * UU (key),DBT const * UU (val),void * UU (extra))202 ydb_getf_do_nothing(DBT const* UU(key), DBT const* UU(val), void* UU(extra)) {
203     return 0;
204 }
205 
206 /* env methods */
207 
208 static void
env_fs_report_in_yellow(DB_ENV * UU (env))209 env_fs_report_in_yellow(DB_ENV *UU(env)) {
210     char tbuf[26];
211     time_t tnow = time(NULL);
212     fprintf(stderr, "%.24s PerconaFT file system space is low\n", ctime_r(&tnow, tbuf)); fflush(stderr);
213 }
214 
215 static void
env_fs_report_in_red(DB_ENV * UU (env))216 env_fs_report_in_red(DB_ENV *UU(env)) {
217     char tbuf[26];
218     time_t tnow = time(NULL);
219     fprintf(stderr, "%.24s PerconaFT file system space is really low and access is restricted\n", ctime_r(&tnow, tbuf)); fflush(stderr);
220 }
221 
222 static inline uint64_t
env_fs_redzone(DB_ENV * env,uint64_t total)223 env_fs_redzone(DB_ENV *env, uint64_t total) {
224     return total * env->i->redzone / 100;
225 }
226 
// Number of polling cycles that must pass after entering a zone before
// entering that zone again produces another report.  Reports are always
// issued during the first ZONEREPORTLIMIT polls.
#define ZONEREPORTLIMIT 12
// Check the available space in the file systems used by tokuft and erect barriers when available space gets low.
//
// Minicron callback; `arg` is the DB_ENV.  It measures free space in the home
// dir, in the data dir if that differs, and in the log dir if that differs
// from both.  It then moves env->i->fs_state between FS_GREEN, FS_YELLOW and
// FS_RED:
//   red    - some file system has less than env_fs_redzone() bytes free;
//   yellow - some file system has less than twice that.
// Moving into red (from yellow or green), or into yellow (from green), prints
// a rate-limited report to stderr.  Moving to a better state prints nothing.
// Does nothing when force_recovery == 6.  Asserts if a file system size query
// fails.  Always returns 0.
static int
env_fs_poller(void *arg) {
    if(force_recovery == 6) {
	    return 0;
    }
    DB_ENV *env = (DB_ENV *) arg;
    int r;

    // Both are counts of file systems found in the zone; nonzero means "in zone".
    int in_yellow; // set true to issue warning to user
    int in_red;    // set true to prevent certain operations (returning ENOSPC)

    // get the fs sizes for the home dir
    uint64_t avail_size = 0, total_size = 0;
    r = toku_get_filesystem_sizes(env->i->dir, &avail_size, NULL, &total_size);
    assert(r == 0);
    in_yellow = (avail_size < 2 * env_fs_redzone(env, total_size));
    in_red = (avail_size < env_fs_redzone(env, total_size));

    // get the fs sizes for the data dir if different than the home dir
    if (strcmp(env->i->dir, env->i->real_data_dir) != 0) {
        r = toku_get_filesystem_sizes(env->i->real_data_dir, &avail_size, NULL, &total_size);
        assert(r == 0);
        in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
        in_red += (avail_size < env_fs_redzone(env, total_size));
    }

    // get the fs sizes for the log dir if different than the home dir and data dir
    if (strcmp(env->i->dir, env->i->real_log_dir) != 0 && strcmp(env->i->real_data_dir, env->i->real_log_dir) != 0) {
        r = toku_get_filesystem_sizes(env->i->real_log_dir, &avail_size, NULL, &total_size);
        assert(r == 0);
        in_yellow += (avail_size < 2 * env_fs_redzone(env, total_size));
        in_red += (avail_size < env_fs_redzone(env, total_size));
    }

    env->i->fs_seq++;                    // how many times through this polling loop?
    uint64_t now = env->i->fs_seq;

    // Don't issue report if we have not been out of this fs_state for a while, unless we're at system startup
    // (i.e. report only if more than ZONEREPORTLIMIT polls have passed since
    // the zone was last entered, or while fewer than ZONEREPORTLIMIT polls have run).
    switch (env->i->fs_state) {
    case FS_RED:
        // Leaving red is silent; drop to yellow or green as appropriate.
        if (!in_red) {
            if (in_yellow) {
                env->i->fs_state = FS_YELLOW;
            } else {
                env->i->fs_state = FS_GREEN;
            }
        }
        break;
    case FS_YELLOW:
        if (in_red) {
            if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
                env_fs_report_in_red(env);
            env->i->fs_state = FS_RED;
            env->i->last_seq_entered_red = now;
        } else if (!in_yellow) {
            env->i->fs_state = FS_GREEN;
        }
        break;
    case FS_GREEN:
        if (in_red) {
            if ((now - env->i->last_seq_entered_red > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
                env_fs_report_in_red(env);
            env->i->fs_state = FS_RED;
            env->i->last_seq_entered_red = now;
        } else if (in_yellow) {
            if ((now - env->i->last_seq_entered_yellow > ZONEREPORTLIMIT) || (now < ZONEREPORTLIMIT))
                env_fs_report_in_yellow(env);
            env->i->fs_state = FS_YELLOW;
            env->i->last_seq_entered_yellow = now;
        }
        break;
    default:
        // fs_state must always be one of the three zones above.
        assert(0);
    }
    return 0;
}
#undef ZONEREPORTLIMIT
306 
307 static void
env_fs_init(DB_ENV * env)308 env_fs_init(DB_ENV *env) {
309     env->i->fs_state = FS_GREEN;
310     env->i->fs_poll_time = 5;  // seconds
311     env->i->redzone = 5;       // percent of total space
312     env->i->fs_poller_is_init = false;
313 }
314 
315 // Initialize the minicron that polls file system space
316 static int
env_fs_init_minicron(DB_ENV * env)317 env_fs_init_minicron(DB_ENV *env) {
318     if(force_recovery == 6) {
319         return 0;
320     }
321     int r = toku_minicron_setup(&env->i->fs_poller, env->i->fs_poll_time*1000, env_fs_poller, env);
322     if (r == 0)
323         env->i->fs_poller_is_init = true;
324     return r;
325 }
326 
327 // Destroy the file system space minicron
328 static void
env_fs_destroy(DB_ENV * env)329 env_fs_destroy(DB_ENV *env) {
330     if (env->i->fs_poller_is_init) {
331         int r = toku_minicron_shutdown(&env->i->fs_poller);
332         assert(r == 0);
333         env->i->fs_poller_is_init = false;
334     }
335 }
336 
337 static int
env_fsync_log_on_minicron(void * arg)338 env_fsync_log_on_minicron(void *arg) {
339     DB_ENV *env = (DB_ENV *) arg;
340     int r = env->log_flush(env, 0);
341     assert(r == 0);
342     return 0;
343 }
344 
345 static void
env_fsync_log_init(DB_ENV * env)346 env_fsync_log_init(DB_ENV *env) {
347     env->i->fsync_log_period_ms = 0;
348     env->i->fsync_log_cron_is_init = false;
349 }
350 
UU()351 static void UU()
352 env_change_fsync_log_period(DB_ENV* env, uint32_t period_ms) {
353     env->i->fsync_log_period_ms = period_ms;
354     if (env->i->fsync_log_cron_is_init) {
355         toku_minicron_change_period(&env->i->fsync_log_cron, period_ms);
356     }
357 }
358 
359 static int
env_fsync_log_cron_init(DB_ENV * env)360 env_fsync_log_cron_init(DB_ENV *env) {
361     int r = toku_minicron_setup(&env->i->fsync_log_cron, env->i->fsync_log_period_ms, env_fsync_log_on_minicron, env);
362     if (r == 0)
363         env->i->fsync_log_cron_is_init = true;
364     return r;
365 }
366 
367 static void
env_fsync_log_cron_destroy(DB_ENV * env)368 env_fsync_log_cron_destroy(DB_ENV *env) {
369     if (env->i->fsync_log_cron_is_init) {
370         int r = toku_minicron_shutdown(&env->i->fsync_log_cron);
371         assert(r == 0);
372         env->i->fsync_log_cron_is_init = false;
373     }
374 }
375 
376 static void
env_setup_real_dir(DB_ENV * env,char ** real_dir,const char * nominal_dir)377 env_setup_real_dir(DB_ENV *env, char **real_dir, const char *nominal_dir) {
378     toku_free(*real_dir);
379     *real_dir = NULL;
380 
381     assert(env->i->dir);
382     if (nominal_dir)
383         *real_dir = toku_construct_full_name(2, env->i->dir, nominal_dir);
384     else
385         *real_dir = toku_strdup(env->i->dir);
386 }
387 
388 static void
env_setup_real_data_dir(DB_ENV * env)389 env_setup_real_data_dir(DB_ENV *env) {
390     env_setup_real_dir(env, &env->i->real_data_dir, env->i->data_dir);
391 }
392 
393 static void
env_setup_real_log_dir(DB_ENV * env)394 env_setup_real_log_dir(DB_ENV *env) {
395     env_setup_real_dir(env, &env->i->real_log_dir, env->i->lg_dir);
396 }
397 
398 static void
env_setup_real_tmp_dir(DB_ENV * env)399 env_setup_real_tmp_dir(DB_ENV *env) {
400     env_setup_real_dir(env, &env->i->real_tmp_dir, env->i->tmp_dir);
401 }
402 
keep_cachetable_callback(DB_ENV * env,CACHETABLE cachetable)403 static void keep_cachetable_callback (DB_ENV *env, CACHETABLE cachetable)
404 {
405     env->i->cachetable = cachetable;
406 }
407 
408 static int
ydb_do_recovery(DB_ENV * env)409 ydb_do_recovery (DB_ENV *env) {
410     assert(env->i->real_log_dir);
411     int r = tokuft_recover(env,
412                            toku_keep_prepared_txn_callback,
413                            keep_cachetable_callback,
414                            env->i->logger,
415                            env->i->dir, env->i->real_log_dir, env->i->bt_compare,
416                            env->i->update_function,
417                            env->i->generate_row_for_put, env->i->generate_row_for_del,
418                            env->i->cachetable_size);
419     return r;
420 }
421 
422 static int
needs_recovery(DB_ENV * env)423 needs_recovery (DB_ENV *env) {
424     assert(env->i->real_log_dir);
425     int recovery_needed = tokuft_needs_recovery(env->i->real_log_dir, true);
426     return recovery_needed ? DB_RUNRECOVERY : 0;
427 }
428 
static int toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte, uint32_t min, uint32_t flags);

// Keys used in persistent environment dictionary:
// Following keys added in version 12
static const char * orig_env_ver_key = "original_version";   // version at environment creation; value is a 32-bit disk-order int
static const char * curr_env_ver_key = "current_version";    // version the env was last brought up to; value is a 32-bit disk-order int
// Following keys added in version 14, add more keys for future versions
static const char * creation_time_key         = "creation_time";   // environment creation timestamp; value is a disk-order time_t
437 
// Return the persistent-environment key under which the time of the upgrade
// to `version` is stored, e.g. "upgrade_v14_time".
// The result lives in a static buffer that the next call overwrites, so use
// or copy it before calling again.  The function is therefore not reentrant.
static char * get_upgrade_time_key(int version) {
    static char key_buf[sizeof("upgrade_v_time") + 12];
    const int written = snprintf(key_buf, sizeof key_buf, "upgrade_v%d_time", version);
    assert(written >= 0 && written < (int)sizeof key_buf);
    return key_buf;
}
447 
// Return the persistent-environment key under which the log-upgrade footprint
// for the upgrade to `version` is stored, e.g. "upgrade_v14_footprint".
// The result lives in a static buffer that the next call overwrites, so use
// or copy it before calling again.  The function is therefore not reentrant.
static char * get_upgrade_footprint_key(int version) {
    static char key_buf[sizeof("upgrade_v_footprint") + 12];
    const int written = snprintf(key_buf, sizeof key_buf, "upgrade_v%d_footprint", version);
    assert(written >= 0 && written < (int)sizeof key_buf);
    return key_buf;
}
457 
// Return the persistent-environment key under which the last clean-shutdown
// LSN preceding the upgrade to `version` is stored, e.g.
// "upgrade_v14_last_lsn".
// The result lives in a static buffer that the next call overwrites, so use
// or copy it before calling again.  The function is therefore not reentrant.
static char * get_upgrade_last_lsn_key(int version) {
    static char key_buf[sizeof("upgrade_v_last_lsn") + 12];
    const int written = snprintf(key_buf, sizeof key_buf, "upgrade_v%d_last_lsn", version);
    assert(written >= 0 && written < (int)sizeof key_buf);
    return key_buf;
}
467 
// Values read from (or written into) persistent environment,
// kept here for read-only access from engine status.
// Note, persistent_upgrade_status info is separate in part to simplify its exclusion from engine status until relevant.
// The V13/V14 names are historical.  capture_persistent_env_contents() fills
// the last three rows from the upgrade keys of the current layout version.
typedef enum {
    PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION = 0,         // read from orig_env_ver_key
    PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP,    // read from curr_env_ver_key, prev version as of this startup
    PERSISTENT_UPGRADE_LAST_LSN_OF_V13,                  // read from the "upgrade_v<N>_last_lsn" key
    PERSISTENT_UPGRADE_V14_TIME,                         // read from the "upgrade_v<N>_time" key
    PERSISTENT_UPGRADE_V14_FOOTPRINT,                    // read from the "upgrade_v<N>_footprint" key
    PERSISTENT_UPGRADE_STATUS_NUM_ROWS                   // number of rows in this status array
} persistent_upgrade_status_entry;

// Status rows, plus a flag recording whether persistent_upgrade_status_init() has run.
typedef struct {
    bool initialized;
    TOKU_ENGINE_STATUS_ROW_S status[PERSISTENT_UPGRADE_STATUS_NUM_ROWS];
} PERSISTENT_UPGRADE_STATUS_S, *PERSISTENT_UPGRADE_STATUS;

static PERSISTENT_UPGRADE_STATUS_S persistent_upgrade_status;

// Like STATUS_INIT, but targets persistent_upgrade_status and prefixes every
// legend with "upgrade: ".  The prefix uses adjacent string-literal
// concatenation, so `l` must be a string literal.
#define PERSISTENT_UPGRADE_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(persistent_upgrade_status, k, c, t, "upgrade: " l, inc)
488 
// Describe every persistent-upgrade status row and mark the rows initialized.
// The values are filled in later from the persistent environment dictionary.
static void
persistent_upgrade_status_init (void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.

    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION,           nullptr, UINT64,   "original version (at time of environment creation)", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP,  nullptr, UINT64,   "version at time of startup", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_LAST_LSN_OF_V13,                nullptr, UINT64,   "last LSN of version 13", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_TIME,                       nullptr, UNIXTIME, "time of upgrade to version 14", TOKU_ENGINE_STATUS);
    PERSISTENT_UPGRADE_STATUS_INIT(PERSISTENT_UPGRADE_V14_FOOTPRINT,                  nullptr, UINT64,   "footprint from version 13 to 14", TOKU_ENGINE_STATUS);
    persistent_upgrade_status.initialized = true;
}

// Lvalue for the numeric value of persistent-upgrade status row x.
#define PERSISTENT_UPGRADE_STATUS_VALUE(x) persistent_upgrade_status.status[x].value.num
503 
// Requires: persistent environment dictionary is already open.
// Input arg is lsn of clean shutdown of previous version,
// or ZERO_LSN if no upgrade or if crash between log upgrade and here.
// NOTE: To maintain compatibility with previous versions, do not change the
//       format of any information stored in the persistent environment dictionary.
//       For example, some values are stored as 32 bits, even though they are immediately
//       converted to 64 bits when read.  Do not change them to be stored as 64 bits.
//
// Reads the stored current_version and records it in engine status.
// Returns TOKUDB_DICTIONARY_TOO_NEW if it is above FT_LAYOUT_VERSION, and
// TOKUDB_DICTIONARY_TOO_OLD if it is below FT_LAYOUT_MIN_SUPPORTED_VERSION.
// If it is older than FT_LAYOUT_VERSION, the function rewrites current_version.
// It then writes the upgrade time, log-upgrade footprint and last clean-shutdown
// LSN under the per-version keys, for every version after the stored one up to
// FT_LAYOUT_VERSION.  Returns 0 when no upgrade is needed and after a successful
// upgrade.  All reads and writes go through `txn`.  Failed gets/puts trip
// assertions.
//
static int
maybe_upgrade_persistent_environment_dictionary(DB_ENV * env, DB_TXN * txn, LSN last_lsn_of_clean_shutdown_read_from_log) {
    int r;
    DBT key, val;
    DB *persistent_environment = env->i->persistent_environment;

    if (!persistent_upgrade_status.initialized)
        persistent_upgrade_status_init();

    // Read the version the env was last brought up to (32 bits, disk byte order).
    // NOTE(review): assumes val.size >= sizeof(uint32_t) and that val.data is
    // suitably aligned; neither is checked here.
    toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
    toku_init_dbt(&val);
    r = toku_db_get(persistent_environment, txn, &key, &val, 0);
    assert(r == 0);
    uint32_t stored_env_version = toku_dtoh32(*(uint32_t*)val.data);
    PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_STORED_ENV_VERSION_AT_STARTUP) = stored_env_version;
    if (stored_env_version > FT_LAYOUT_VERSION)
        r = TOKUDB_DICTIONARY_TOO_NEW;
    else if (stored_env_version < FT_LAYOUT_MIN_SUPPORTED_VERSION)
        r = TOKUDB_DICTIONARY_TOO_OLD;
    else if (stored_env_version < FT_LAYOUT_VERSION) {
        // Record that the env is now at the current layout version.
        const uint32_t curr_env_ver_d = toku_htod32(FT_LAYOUT_VERSION);
        toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
        toku_fill_dbt(&val, &curr_env_ver_d, sizeof(curr_env_ver_d));
        r = toku_db_put(persistent_environment, txn, &key, &val, 0, false);
        assert_zero(r);

        // The same three values (converted to disk byte order) are stored for
        // every version stepped over by this upgrade.
        time_t upgrade_time_d = toku_htod64(time(NULL));
        uint64_t upgrade_footprint_d = toku_htod64(toku_log_upgrade_get_footprint());
        uint64_t upgrade_last_lsn_d = toku_htod64(last_lsn_of_clean_shutdown_read_from_log.lsn);
        for (int version = stored_env_version+1; version <= FT_LAYOUT_VERSION; version++) {
            uint32_t put_flag = DB_NOOVERWRITE;
            if (version <= FT_LAYOUT_VERSION_19) {
                // See #5902.
                // To prevent a crash (and any higher complexity code) we'll simply
                // silently not overwrite anything if it exists.
                // The keys existing for version <= 19 is not necessarily an error.
                // If this happens for versions > 19 it IS an error and we'll use DB_NOOVERWRITE.
                put_flag = DB_NOOVERWRITE_NO_ERROR;
            }


            // Each get_upgrade_*_key() result is a static buffer, so it is
            // used immediately before the next key is built.
            char* upgrade_time_key = get_upgrade_time_key(version);
            toku_fill_dbt(&key, upgrade_time_key, strlen(upgrade_time_key));
            toku_fill_dbt(&val, &upgrade_time_d, sizeof(upgrade_time_d));
            r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
            assert_zero(r);

            char* upgrade_footprint_key = get_upgrade_footprint_key(version);
            toku_fill_dbt(&key, upgrade_footprint_key, strlen(upgrade_footprint_key));
            toku_fill_dbt(&val, &upgrade_footprint_d, sizeof(upgrade_footprint_d));
            r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
            assert_zero(r);

            char* upgrade_last_lsn_key = get_upgrade_last_lsn_key(version);
            toku_fill_dbt(&key, upgrade_last_lsn_key, strlen(upgrade_last_lsn_key));
            toku_fill_dbt(&val, &upgrade_last_lsn_d, sizeof(upgrade_last_lsn_d));
            r = toku_db_put(persistent_environment, txn, &key, &val, put_flag, false);
            assert_zero(r);
        }

    }
    return r;
}
575 
// Capture contents of persistent_environment dictionary so that it can be read by engine status
// Asserts that the stored current version already equals FT_LAYOUT_VERSION,
// so this presumably runs after maybe_upgrade_persistent_environment_dictionary().
// NOTE(review): writes persistent_upgrade_status values but never calls
// persistent_upgrade_status_init() itself.  Confirm the caller guarantees that.
static void
capture_persistent_env_contents (DB_ENV * env, DB_TXN * txn) {
    int r;
    DBT key, val;
    DB *persistent_environment = env->i->persistent_environment;

    toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
    toku_init_dbt(&val);
    r = toku_db_get(persistent_environment, txn, &key, &val, 0);
    assert_zero(r);
    uint32_t curr_env_version = toku_dtoh32(*(uint32_t*)val.data);
    assert(curr_env_version == FT_LAYOUT_VERSION);

    // Original version is stored as 32 bits and widened to 64 here.
    toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
    toku_init_dbt(&val);
    r = toku_db_get(persistent_environment, txn, &key, &val, 0);
    assert_zero(r);
    uint64_t persistent_original_env_version = toku_dtoh32(*(uint32_t*)val.data);
    PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_ORIGINAL_ENV_VERSION) = persistent_original_env_version;
    assert(persistent_original_env_version <= curr_env_version);

    // make no assertions about timestamps, clock may have been reset
    // creation_time only exists for environments created at version 14 or later.
    if (persistent_original_env_version >= FT_LAYOUT_VERSION_14) {
        toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
        toku_init_dbt(&val);
        r = toku_db_get(persistent_environment, txn, &key, &val, 0);
        assert_zero(r);
        STATUS_VALUE(YDB_LAYER_TIME_CREATION) = toku_dtoh64((*(time_t*)val.data));
    }

    if (persistent_original_env_version != curr_env_version) {
        // an upgrade was performed at some time, capture info about the upgrade
        // The records read are those for the current layout version, despite
        // the historical V13/V14 names of the status rows they land in.

        char * last_lsn_key = get_upgrade_last_lsn_key(curr_env_version);
        toku_fill_dbt(&key, last_lsn_key, strlen(last_lsn_key));
        toku_init_dbt(&val);
        r = toku_db_get(persistent_environment, txn, &key, &val, 0);
        assert_zero(r);
        PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_LAST_LSN_OF_V13) = toku_dtoh64(*(uint64_t*)val.data);

        char * time_key = get_upgrade_time_key(curr_env_version);
        toku_fill_dbt(&key, time_key, strlen(time_key));
        toku_init_dbt(&val);
        r = toku_db_get(persistent_environment, txn, &key, &val, 0);
        assert_zero(r);
        PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_TIME) = toku_dtoh64(*(time_t*)val.data);

        char * footprint_key = get_upgrade_footprint_key(curr_env_version);
        toku_fill_dbt(&key, footprint_key, strlen(footprint_key));
        toku_init_dbt(&val);
        r = toku_db_get(persistent_environment, txn, &key, &val, 0);
        assert_zero(r);
        PERSISTENT_UPGRADE_STATUS_VALUE(PERSISTENT_UPGRADE_V14_FOOTPRINT) = toku_dtoh64(*(uint64_t*)val.data);
    }

}
633 
634 // return 0 if log exists or ENOENT if log does not exist
635 static int
ydb_recover_log_exists(DB_ENV * env)636 ydb_recover_log_exists(DB_ENV *env) {
637     int r = tokuft_recover_log_exists(env->i->real_log_dir);
638     return r;
639 }
640 
641 // Validate that all required files are present, no side effects.
642 // Return 0 if all is well, ENOENT if some files are present but at least one is
643 // missing,
644 // other non-zero value if some other error occurs.
645 // Set *valid_newenv if creating a new environment (all files missing).
646 // (Note, if special dictionaries exist, then they were created transactionally
647 // and log should exist.)
validate_env(DB_ENV * env,bool * valid_newenv,bool need_rollback_cachefile)648 static int validate_env(DB_ENV *env,
649                         bool *valid_newenv,
650                         bool need_rollback_cachefile) {
651     int r;
652     bool expect_newenv = false;  // set true if we expect to create a new env
653     toku_struct_stat buf;
654     char *path = NULL;
655 
656     // Test for persistent environment
657     path = toku_construct_full_name(
658         2, env->i->dir, toku_product_name_strings.environmentdictionary);
659     assert(path);
660     r = toku_stat(path, &buf, toku_uninstrumented);
661     if (r == 0) {
662         expect_newenv = false;  // persistent info exists
663     } else {
664         int stat_errno = get_error_errno();
665         if (stat_errno == ENOENT) {
666             expect_newenv = true;
667             r = 0;
668         } else {
669             r = toku_ydb_do_error(
670                 env,
671                 stat_errno,
672                 "Unable to access persistent environment [%s] in [%s]\n",
673                 toku_product_name_strings.environmentdictionary,
674                 env->i->dir);
675             assert(r);
676         }
677     }
678     toku_free(path);
679 
680     // Test for existence of rollback cachefile if it is expected to exist
681     if (r == 0 && need_rollback_cachefile) {
682         path = toku_construct_full_name(
683             2, env->i->dir, toku_product_name_strings.rollback_cachefile);
684         assert(path);
685         r = toku_stat(path, &buf, toku_uninstrumented);
686         if (r == 0) {
687             if (expect_newenv)  // rollback cachefile exists, but persistent env
688                                 // is missing
689                 r = toku_ydb_do_error(
690                     env,
691                     ENOENT,
692                     "Persistent environment is missing while looking for "
693                     "rollback cachefile [%s] in [%s]\n",
694                     toku_product_name_strings.rollback_cachefile, env->i->dir);
695         } else {
696             int stat_errno = get_error_errno();
697             if (stat_errno == ENOENT) {
698                 if (!expect_newenv)  // rollback cachefile is missing but
699                                      // persistent env exists
700                     r = toku_ydb_do_error(
701                         env,
702                         ENOENT,
703                         "rollback cachefile [%s] is missing from [%s]\n",
704                         toku_product_name_strings.rollback_cachefile,
705                         env->i->dir);
706                 else
707                     r = 0;  // both rollback cachefile and persistent env are
708                             // missing
709             } else {
710                 r = toku_ydb_do_error(
711                     env,
712                     stat_errno,
713                     "Unable to access rollback cachefile [%s] in [%s]\n",
714                     toku_product_name_strings.rollback_cachefile,
715                     env->i->dir);
716                 assert(r);
717             }
718         }
719         toku_free(path);
720     }
721 
722     // Test for fileops directory
723     if (r == 0 && force_recovery != 6) {
724         path = toku_construct_full_name(
725             2, env->i->dir, toku_product_name_strings.fileopsdirectory);
726         assert(path);
727         r = toku_stat(path, &buf, toku_uninstrumented);
728         if (r == 0) {
729             if (expect_newenv)  // fileops directory exists, but persistent env
730                                 // is missing
731                 r = toku_ydb_do_error(
732                     env,
733                     ENOENT,
734                     "Persistent environment is missing while looking for "
735                     "fileops directory [%s] in [%s]\n",
736                     toku_product_name_strings.fileopsdirectory,
737                     env->i->dir);
738         } else {
739             int stat_errno = get_error_errno();
740             if (stat_errno == ENOENT) {
741                 if (!expect_newenv)  // fileops directory is missing but
742                                      // persistent env exists
743                     r = toku_ydb_do_error(
744                         env,
745                         ENOENT,
746                         "Fileops directory [%s] is missing from [%s]\n",
747                         toku_product_name_strings.fileopsdirectory,
748                         env->i->dir);
749                 else
750                     r = 0;  // both fileops directory and persistent env are
751                             // missing
752             } else {
753                 r = toku_ydb_do_error(
754                     env,
755                     stat_errno,
756                     "Unable to access fileops directory [%s] in [%s]\n",
757                     toku_product_name_strings.fileopsdirectory,
758                     env->i->dir);
759                 assert(r);
760             }
761         }
762         toku_free(path);
763     }
764 
765     // Test for recovery log
766     if ((r == 0) && (env->i->open_flags & DB_INIT_LOG) && force_recovery != 6) {
767         // if using transactions, test for existence of log
768         r = ydb_recover_log_exists(env);  // return 0 or ENOENT
769         if (expect_newenv && (r != ENOENT))
770             r = toku_ydb_do_error(env,
771                                   ENOENT,
772                                   "Persistent environment information is "
773                                   "missing (but log exists) while looking for "
774                                   "recovery log files in [%s]\n",
775                                   env->i->real_log_dir);
776         else if (!expect_newenv && r == ENOENT)
777             r = toku_ydb_do_error(env,
778                                   ENOENT,
779                                   "Recovery log is missing (persistent "
780                                   "environment information is present) while "
781                                   "looking for recovery log files in [%s]\n",
782                                   env->i->real_log_dir);
783         else
784             r = 0;
785     }
786 
787     if (r == 0)
788         *valid_newenv = expect_newenv;
789     else
790         *valid_newenv = false;
791     return r;
792 }
793 
794 // The version of the environment (on disk) is the version of the recovery log.
795 // If the recovery log is of the current version, then there is no upgrade to be done.
796 // If the recovery log is of an old version, then replacing it with a new recovery log
797 // of the current version is how the upgrade is done.
798 // Note, the upgrade procedure takes a checkpoint, so we must release the ydb lock.
799 static int
ydb_maybe_upgrade_env(DB_ENV * env,LSN * last_lsn_of_clean_shutdown_read_from_log,bool * upgrade_in_progress)800 ydb_maybe_upgrade_env (DB_ENV *env, LSN * last_lsn_of_clean_shutdown_read_from_log, bool * upgrade_in_progress) {
801     int r = 0;
802     if (env->i->open_flags & DB_INIT_TXN && env->i->open_flags & DB_INIT_LOG) {
803         r = toku_maybe_upgrade_log(env->i->dir, env->i->real_log_dir, last_lsn_of_clean_shutdown_read_from_log, upgrade_in_progress);
804     }
805     return r;
806 }
807 
808 static void
unlock_single_process(DB_ENV * env)809 unlock_single_process(DB_ENV *env) {
810     int r;
811     r = toku_single_process_unlock(&env->i->envdir_lockfd);
812     lazy_assert_zero(r);
813     r = toku_single_process_unlock(&env->i->datadir_lockfd);
814     lazy_assert_zero(r);
815     r = toku_single_process_unlock(&env->i->logdir_lockfd);
816     lazy_assert_zero(r);
817     r = toku_single_process_unlock(&env->i->tmpdir_lockfd);
818     lazy_assert_zero(r);
819 }
820 
821 // Open the environment.
822 // If this is a new environment, then create the necessary files.
823 // Return 0 on success, ENOENT if any of the expected necessary files are missing.
824 // (The set of necessary files is defined in the function validate_env() above.)
825 static int
env_open(DB_ENV * env,const char * home,uint32_t flags,int mode)826 env_open(DB_ENV * env, const char *home, uint32_t flags, int mode) {
827 
828     if(force_recovery == 6) {
829         {
830             const int len = strlen(toku_product_name_strings.rollback_cachefile);
831             toku_product_name_strings.rollback_cachefile[len] = '2';
832             toku_product_name_strings.rollback_cachefile[len+1] = 0;
833         }
834 
835         {
836             const int len = strlen(toku_product_name_strings.single_process_lock);
837             toku_product_name_strings.single_process_lock[len] = '2';
838             toku_product_name_strings.single_process_lock[len+1] = 0;
839         }
840 
841         {
842             const int len = strlen(toku_product_name_strings.environmentdictionary);
843             toku_product_name_strings.environmentdictionary[len] = '2';
844             toku_product_name_strings.environmentdictionary[len+1] = 0;
845         }
846     }
847 
848     HANDLE_PANICKED_ENV(env);
849     int r;
850     bool newenv;  // true iff creating a new environment
851     uint32_t unused_flags=flags;
852     CHECKPOINTER cp;
853     DB_TXN *txn = NULL;
854 
855     if (env_opened(env)) {
856         r = toku_ydb_do_error(env, EINVAL, "The environment is already open\n");
857         goto cleanup;
858     }
859 
860     if (env->get_check_thp(env) && toku_os_huge_pages_enabled()) {
861         r = toku_ydb_do_error(env, TOKUDB_HUGE_PAGES_ENABLED,
862                               "Huge pages are enabled, disable them before continuing\n");
863         goto cleanup;
864     }
865 
866     most_recent_env = NULL;
867 
868     assert(sizeof(time_t) == sizeof(uint64_t));
869 
870     HANDLE_EXTRA_FLAGS(env, flags,
871                        DB_CREATE|DB_PRIVATE|DB_INIT_LOG|DB_INIT_TXN|DB_RECOVER|DB_INIT_MPOOL|DB_INIT_LOCK|DB_THREAD);
872 
873     // DB_CREATE means create if env does not exist, and PerconaFT requires it because
874     // PerconaFT requries DB_PRIVATE.
875     if ((flags & DB_PRIVATE) && !(flags & DB_CREATE)) {
876         r = toku_ydb_do_error(env, ENOENT, "DB_PRIVATE requires DB_CREATE (seems gratuitous to us, but that's BDB's behavior\n");
877         goto cleanup;
878     }
879 
880     if (!(flags & DB_PRIVATE)) {
881         r = toku_ydb_do_error(env, ENOENT, "PerconaFT requires DB_PRIVATE\n");
882         goto cleanup;
883     }
884 
885     if ((flags & DB_INIT_LOG) && !(flags & DB_INIT_TXN)) {
886         r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires transactions for logging\n");
887         goto cleanup;
888     }
889 
890     if (!home) home = ".";
891 
892     // Verify that the home exists.
893     toku_struct_stat buf;
894     r = toku_stat(home, &buf, toku_uninstrumented);
895     if (r != 0) {
896         int e = get_error_errno();
897         r = toku_ydb_do_error(
898             env, e, "Error from toku_stat(\"%s\",...)\n", home);
899         goto cleanup;
900     }
901     unused_flags &= ~DB_PRIVATE;
902 
903     if (env->i->dir) {
904         toku_free(env->i->dir);
905     }
906     env->i->dir = toku_strdup(home);
907     if (env->i->dir == 0) {
908         r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
909         goto cleanup;
910     }
911     env->i->open_flags = flags;
912     env->i->open_mode = mode;
913 
914     // Instrumentation probe start
915     TOKU_PROBE_START(toku_instr_probe_1);
916 
917     env_setup_real_data_dir(env);
918     env_setup_real_log_dir(env);
919     env_setup_real_tmp_dir(env);
920 
921     // Instrumentation probe stop
922     toku_instr_probe_1->stop();
923 
924     r = toku_single_process_lock(
925         env->i->dir, "environment", &env->i->envdir_lockfd);
926     if (r != 0)
927         goto cleanup;
928     r = toku_single_process_lock(
929         env->i->real_data_dir, "data", &env->i->datadir_lockfd);
930     if (r!=0) goto cleanup;
931     r = toku_single_process_lock(env->i->real_log_dir, "logs", &env->i->logdir_lockfd);
932     if (r!=0) goto cleanup;
933     r = toku_single_process_lock(env->i->real_tmp_dir, "temp", &env->i->tmpdir_lockfd);
934     if (r!=0) goto cleanup;
935 
936     bool need_rollback_cachefile;
937     need_rollback_cachefile = false;
938     if (flags & (DB_INIT_TXN | DB_INIT_LOG) && force_recovery != 6) {
939         need_rollback_cachefile = true;
940     }
941 
942     ydb_layer_status_init();  // do this before possibly upgrading, so upgrade work is counted in status counters
943 
944     LSN last_lsn_of_clean_shutdown_read_from_log;
945     last_lsn_of_clean_shutdown_read_from_log = ZERO_LSN;
946     bool upgrade_in_progress;
947     upgrade_in_progress = false;
948     r = ydb_maybe_upgrade_env(env, &last_lsn_of_clean_shutdown_read_from_log, &upgrade_in_progress);
949     if (r!=0) goto cleanup;
950 
951     if (upgrade_in_progress || force_recovery == 6) {
952         // Delete old rollback file.  There was a clean shutdown, so it has nothing useful,
953         // and there is no value in upgrading it.  It is simpler to just create a new one.
954         char* rollback_filename = toku_construct_full_name(2, env->i->dir, toku_product_name_strings.rollback_cachefile);
955         assert(rollback_filename);
956         r = unlink(rollback_filename);
957         if (r != 0) {
958             assert(get_error_errno() == ENOENT);
959         }
960         toku_free(rollback_filename);
961         need_rollback_cachefile = false;  // we're not expecting it to exist now
962     }
963 
964     r = validate_env(env, &newenv, need_rollback_cachefile);  // make sure that environment is either new or complete
965     if (r != 0) goto cleanup;
966 
967     unused_flags &= ~DB_INIT_TXN & ~DB_INIT_LOG;
968 
969     if(force_recovery == 6) {
970         flags |= DB_INIT_LOG | DB_INIT_TXN;
971     }
972 
973     // do recovery only if there exists a log and recovery is requested
974     // otherwise, a log is created when the logger is opened later
975     if (!newenv && force_recovery == 0) {
976         if (flags & DB_INIT_LOG) {
977             // the log does exist
978             if (flags & DB_RECOVER) {
979                 r = ydb_do_recovery(env);
980                 if (r != 0) goto cleanup;
981             } else {
982                 // the log is required to have clean shutdown if recovery is not requested
983                 r = needs_recovery(env);
984                 if (r != 0) goto cleanup;
985             }
986         }
987     }
988 
989     toku_loader_cleanup_temp_files(env);
990 
991     if (flags & (DB_INIT_TXN | DB_INIT_LOG)) {
992         assert(env->i->logger);
993         toku_logger_write_log_files(env->i->logger, (bool)((flags & DB_INIT_LOG) != 0));
994         if (!toku_logger_is_open(env->i->logger)) {
995             r = toku_logger_open(env->i->real_log_dir, env->i->logger);
996             if (r!=0) {
997                 toku_ydb_do_error(env, r, "Could not open logger\n");
998             }
999         }
1000     } else {
1001         r = toku_logger_close(&env->i->logger); // if no logging system, then kill the logger
1002         assert_zero(r);
1003     }
1004 
1005     unused_flags &= ~DB_INIT_MPOOL; // we always init an mpool.
1006     unused_flags &= ~DB_CREATE;     // we always do DB_CREATE
1007     unused_flags &= ~DB_INIT_LOCK;  // we check this later (e.g. in db->open)
1008     unused_flags &= ~DB_RECOVER;
1009 
1010 // This is probably correct, but it will be pain...
1011 //    if ((flags & DB_THREAD)==0) {
1012 //        r = toku_ydb_do_error(env, EINVAL, "PerconaFT requires DB_THREAD");
1013 //        goto cleanup;
1014 //    }
1015     unused_flags &= ~DB_THREAD;
1016 
1017     if (unused_flags!=0) {
1018         r = toku_ydb_do_error(env, EINVAL, "Extra flags not understood by tokuft: %u\n", unused_flags);
1019         goto cleanup;
1020     }
1021 
1022     if (env->i->cachetable==NULL) {
1023         // If we ran recovery then the cachetable should be set here.
1024         r = toku_cachetable_create_ex(&env->i->cachetable, env->i->cachetable_size,
1025                                    env->i->client_pool_threads,
1026                                    env->i->cachetable_pool_threads,
1027                                    env->i->checkpoint_pool_threads,
1028                                    ZERO_LSN, env->i->logger);
1029         if (r != 0) {
1030             r = toku_ydb_do_error(env, r, "Cant create a cachetable\n");
1031             goto cleanup;
1032         }
1033     }
1034 
1035     toku_cachetable_set_env_dir(env->i->cachetable, env->i->dir);
1036 
1037     int using_txns;
1038     using_txns = env->i->open_flags & DB_INIT_TXN;
1039     if (env->i->logger) {
1040         // if this is a newborn env or if this is an upgrade, then create a brand new rollback file
1041         assert (using_txns);
1042         toku_logger_set_cachetable(env->i->logger, env->i->cachetable);
1043         if (!toku_logger_rollback_is_open(env->i->logger)) {
1044             bool create_new_rollback_file = newenv | upgrade_in_progress | (force_recovery == 6);
1045             r = toku_logger_open_rollback(env->i->logger, env->i->cachetable, create_new_rollback_file);
1046             if (r != 0) {
1047                 r = toku_ydb_do_error(env, r, "Cant open rollback\n");
1048                 goto cleanup;
1049             }
1050         }
1051     }
1052 
1053     if (using_txns) {
1054         r = toku_txn_begin(env, 0, &txn, 0);
1055         assert_zero(r);
1056     }
1057 
1058     {
1059         r = toku_db_create(&env->i->persistent_environment, env, 0);
1060         assert_zero(r);
1061         r = toku_db_use_builtin_key_cmp(env->i->persistent_environment);
1062         assert_zero(r);
1063 	writing_rollback++;
1064         r = toku_db_open_iname(env->i->persistent_environment, txn, toku_product_name_strings.environmentdictionary, DB_CREATE, mode);
1065         if (r != 0) {
1066             r = toku_ydb_do_error(env, r, "Cant open persistent env\n");
1067             goto cleanup;
1068         }
1069         if (newenv) {
1070             // create new persistent_environment
1071             DBT key, val;
1072             uint32_t persistent_original_env_version = FT_LAYOUT_VERSION;
1073             const uint32_t environment_version = toku_htod32(persistent_original_env_version);
1074 
1075             toku_fill_dbt(&key, orig_env_ver_key, strlen(orig_env_ver_key));
1076             toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
1077             r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1078             assert_zero(r);
1079 
1080             toku_fill_dbt(&key, curr_env_ver_key, strlen(curr_env_ver_key));
1081             toku_fill_dbt(&val, &environment_version, sizeof(environment_version));
1082             r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1083             assert_zero(r);
1084 
1085             time_t creation_time_d = toku_htod64(time(NULL));
1086             toku_fill_dbt(&key, creation_time_key, strlen(creation_time_key));
1087             toku_fill_dbt(&val, &creation_time_d, sizeof(creation_time_d));
1088             r = toku_db_put(env->i->persistent_environment, txn, &key, &val, 0, false);
1089             assert_zero(r);
1090         }
1091         else {
1092             r = maybe_upgrade_persistent_environment_dictionary(env, txn, last_lsn_of_clean_shutdown_read_from_log);
1093             assert_zero(r);
1094         }
1095         capture_persistent_env_contents(env, txn);
1096 	writing_rollback--;
1097     }
1098     {
1099         r = toku_db_create(&env->i->directory, env, 0);
1100         assert_zero(r);
1101         r = toku_db_use_builtin_key_cmp(env->i->directory);
1102         assert_zero(r);
1103         r = toku_db_open_iname(env->i->directory, txn, toku_product_name_strings.fileopsdirectory, DB_CREATE, mode);
1104         if (r != 0) {
1105             r = toku_ydb_do_error(env, r, "Cant open %s\n", toku_product_name_strings.fileopsdirectory);
1106             goto cleanup;
1107         }
1108     }
1109     if (using_txns) {
1110         r = locked_txn_commit(txn, 0);
1111         assert_zero(r);
1112         txn = NULL;
1113     }
1114     cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1115     if (!force_recovery) {
1116         r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, STARTUP_CHECKPOINT);
1117     }
1118     writing_rollback--;
1119     env_fs_poller(env);          // get the file system state at startup
1120     r = env_fs_init_minicron(env);
1121     if (r != 0) {
1122         r = toku_ydb_do_error(env, r, "Cant create fs minicron\n");
1123         goto cleanup;
1124     }
1125     r = env_fsync_log_cron_init(env);
1126     if (r != 0) {
1127         r = toku_ydb_do_error(env, r, "Cant create fsync log minicron\n");
1128         goto cleanup;
1129     }
1130 cleanup:
1131     if (r!=0) {
1132         if (txn) {
1133             locked_txn_abort(txn);
1134         }
1135         if (env && env->i) {
1136             unlock_single_process(env);
1137         }
1138     }
1139     if (r == 0) {
1140         set_errno(0); // tabula rasa.   If there's a crash after env was successfully opened, no misleading errno will have been left around by this code.
1141         most_recent_env = env;
1142         uint64_t num_rows;
1143         env_get_engine_status_num_rows(env, &num_rows);
1144         toku_assert_set_fpointers(toku_maybe_get_engine_status_text, toku_maybe_err_engine_status, toku_maybe_set_env_panic, num_rows);
1145     }
1146     return r;
1147 }
1148 
// Close the environment and free the DB_ENV together with its internal state.
//
// flags: TOKUFT_DIRTY_SHUTDOWN skips both shutdown checkpoints and the logger
// shutdown. Any other bit makes the call return EINVAL, but only after the
// environment has already been fully closed and freed.
//
// The env is not closed when it is already panicked, still has live
// transactions, or still has open DBs. Those cases, and any failure while
// closing the internal dictionaries, checkpointing or closing the logger, go
// to panic_and_quit_early. That path releases the directory locks, either
// reports the earlier panic or panics the env with the new error, and returns
// without freeing env or env->i.
static int
env_close(DB_ENV * env, uint32_t flags) {
    int r = 0;
    const char * err_msg = NULL;
    bool clean_shutdown = true;

    if (flags & TOKUFT_DIRTY_SHUTDOWN) {
        clean_shutdown = false;
        flags &= ~TOKUFT_DIRTY_SHUTDOWN;
    }

    most_recent_env = NULL; // Set most_recent_env to NULL so that we don't have a dangling pointer (and if there's an error, the toku assert code would try to look at the env.)

    // if panicked, or if any open transactions, or any open dbs, then do nothing.

    if (toku_env_is_panicked(env)) {
        goto panic_and_quit_early;
    }
    if (env->i->logger && toku_logger_txns_exist(env->i->logger)) {
        err_msg = "Cannot close environment due to open transactions\n";
        r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
        goto panic_and_quit_early;
    }
    if (env->i->open_dbs_by_dname) { //Verify that there are no open dbs.
        if (env->i->open_dbs_by_dname->size() > 0) {
            err_msg = "Cannot close environment due to open DBs\n";
            r = toku_ydb_do_error(env, EINVAL, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    // Close the two internal dictionaries opened by env_open. r already holds
    // the close error, so the toku_ydb_do_error() result is not needed.
    if (env->i->persistent_environment) {
        r = toku_db_close(env->i->persistent_environment);
        if (r) {
            err_msg = "Cannot close persistent environment dictionary (DB->close error)\n";
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    if (env->i->directory) {
        r = toku_db_close(env->i->directory);
        if (r) {
            err_msg = "Cannot close Directory dictionary (DB->close error)\n";
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    env_fsync_log_cron_destroy(env);
    if (env->i->cachetable) {
        toku_cachetable_prepare_close(env->i->cachetable);
        toku_cachetable_minicron_shutdown(env->i->cachetable);
        if (env->i->logger) {
            // On a clean shutdown: checkpoint, close the rollback cachefile,
            // checkpoint again, then shut the logger down. A dirty shutdown
            // only closes the rollback cachefile.
            CHECKPOINTER cp = nullptr;
            if (clean_shutdown) {
                cp = toku_cachetable_get_checkpointer(env->i->cachetable);
                r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
                if (r) {
                    err_msg = "Cannot close environment (error during checkpoint)\n";
                    toku_ydb_do_error(env, r, "%s", err_msg);
                    goto panic_and_quit_early;
                }
            }
            toku_logger_close_rollback_check_empty(env->i->logger, clean_shutdown);
            if (clean_shutdown) {
                //Do a second checkpoint now that the rollback cachefile is closed.
                r = toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, SHUTDOWN_CHECKPOINT);
                if (r) {
                    err_msg = "Cannot close environment (error during checkpoint)\n";
                    toku_ydb_do_error(env, r, "%s", err_msg);
                    goto panic_and_quit_early;
                }
                toku_logger_shutdown(env->i->logger);
            }
        }
        toku_cachetable_close(&env->i->cachetable);
    }
    if (env->i->logger) {
        r = toku_logger_close(&env->i->logger);
        if (r) {
            err_msg = "Cannot close environment (logger close error)\n";
            // Drop the reference explicitly after a failed close.
            env->i->logger = NULL;
            toku_ydb_do_error(env, r, "%s", err_msg);
            goto panic_and_quit_early;
        }
    }
    // Even if nothing else went wrong, but we were panicked, then raise an error.
    // But if something else went wrong then raise that error (above)
    if (toku_env_is_panicked(env)) {
        goto panic_and_quit_early;
    } else {
        assert(env->i->panic_string == 0);
    }

    // Everything closed: free the env's remaining owned resources.
    env_fs_destroy(env);
    env->i->ltm.destroy();
    if (env->i->data_dir)
        toku_free(env->i->data_dir);
    if (env->i->lg_dir)
        toku_free(env->i->lg_dir);
    if (env->i->tmp_dir)
        toku_free(env->i->tmp_dir);
    if (env->i->real_data_dir)
        toku_free(env->i->real_data_dir);
    if (env->i->real_log_dir)
        toku_free(env->i->real_log_dir);
    if (env->i->real_tmp_dir)
        toku_free(env->i->real_tmp_dir);
    if (env->i->open_dbs_by_dname) {
        env->i->open_dbs_by_dname->destroy();
        toku_free(env->i->open_dbs_by_dname);
    }
    if (env->i->open_dbs_by_dict_id) {
        env->i->open_dbs_by_dict_id->destroy();
        toku_free(env->i->open_dbs_by_dict_id);
    }
    if (env->i->dir)
        toku_free(env->i->dir);
    toku_pthread_rwlock_destroy(&env->i->open_dbs_rwlock);

    // Immediately before freeing internal environment unlock the directories
    unlock_single_process(env);
    toku_free(env->i);
    toku_free(env);
    toku_sync_fetch_and_add(&tokuft_num_envs, -1);
    // Unknown flags are reported only after the close has completed.
    if (flags != 0) {
        r = EINVAL;
    }
    return r;

panic_and_quit_early:
    //release lock files.
    unlock_single_process(env);
    //r is the panic error
    if (toku_env_is_panicked(env)) {
        char *panic_string = env->i->panic_string;
        r = toku_ydb_do_error(env, toku_env_is_panicked(env), "Cannot close environment due to previous error: %s\n", panic_string);
    }
    else {
        env_panic(env, r, err_msg);
    }
    return r;
}
1290 
1291 static int
env_log_archive(DB_ENV * env,char ** list[],uint32_t flags)1292 env_log_archive(DB_ENV * env, char **list[], uint32_t flags) {
1293     return toku_logger_log_archive(env->i->logger, list, flags);
1294 }
1295 
1296 static int
env_log_flush(DB_ENV * env,const DB_LSN * lsn)1297 env_log_flush(DB_ENV * env, const DB_LSN * lsn __attribute__((__unused__))) {
1298     HANDLE_PANICKED_ENV(env);
1299     // do nothing if no logger
1300     if (env->i->logger) {
1301         // We just flush everything. MySQL uses lsn == 0 which means flush everything.
1302         // For anyone else using the log, it is correct to flush too much, so we are OK.
1303         toku_logger_fsync(env->i->logger);
1304     }
1305     return 0;
1306 }
1307 
1308 static int
env_set_cachesize(DB_ENV * env,uint32_t gbytes,uint32_t bytes,int ncache)1309 env_set_cachesize(DB_ENV * env, uint32_t gbytes, uint32_t bytes, int ncache) {
1310     HANDLE_PANICKED_ENV(env);
1311     if (ncache != 1) {
1312         return EINVAL;
1313     }
1314     uint64_t cs64 = ((uint64_t) gbytes << 30) + bytes;
1315     unsigned long cs = cs64;
1316     if (cs64 > cs) {
1317         return EINVAL;
1318     }
1319     env->i->cachetable_size = cs;
1320     return 0;
1321 }
1322 
1323 static int
env_set_client_pool_threads(DB_ENV * env,uint32_t threads)1324 env_set_client_pool_threads(DB_ENV * env, uint32_t threads) {
1325     HANDLE_PANICKED_ENV(env);
1326     env->i->client_pool_threads = threads;
1327     return 0;
1328 }
1329 
1330 static int
env_set_cachetable_pool_threads(DB_ENV * env,uint32_t threads)1331 env_set_cachetable_pool_threads(DB_ENV * env, uint32_t threads) {
1332     HANDLE_PANICKED_ENV(env);
1333     env->i->cachetable_pool_threads = threads;
1334     return 0;
1335 }
1336 
1337 static int
env_set_checkpoint_pool_threads(DB_ENV * env,uint32_t threads)1338 env_set_checkpoint_pool_threads(DB_ENV * env, uint32_t threads) {
1339     HANDLE_PANICKED_ENV(env);
1340     env->i->checkpoint_pool_threads = threads;
1341     return 0;
1342 }
1343 
1344 static void
env_set_check_thp(DB_ENV * env,bool new_val)1345 env_set_check_thp(DB_ENV * env, bool new_val) {
1346     assert(env);
1347     env->i->check_thp = new_val;
1348 }
1349 
1350 static bool
env_get_check_thp(DB_ENV * env)1351 env_get_check_thp(DB_ENV * env) {
1352     assert(env);
1353     return env->i->check_thp;
1354 }
1355 
env_set_dir_per_db(DB_ENV * env,bool new_val)1356 static bool env_set_dir_per_db(DB_ENV *env, bool new_val) {
1357     HANDLE_PANICKED_ENV(env);
1358     bool r = env->i->dir_per_db;
1359     env->i->dir_per_db = new_val;
1360     return r;
1361 }
1362 
env_get_dir_per_db(DB_ENV * env)1363 static bool env_get_dir_per_db(DB_ENV *env) {
1364     HANDLE_PANICKED_ENV(env);
1365     return env->i->dir_per_db;
1366 }
1367 
env_get_data_dir(DB_ENV * env)1368 static const char *env_get_data_dir(DB_ENV *env) {
1369     return env->i->real_data_dir;
1370 }
1371 
env_dirtool_attach(DB_ENV * env,DB_TXN * txn,const char * dname,const char * iname)1372 static int env_dirtool_attach(DB_ENV *env,
1373                               DB_TXN *txn,
1374                               const char *dname,
1375                               const char *iname) {
1376     int r;
1377     DBT dname_dbt;
1378     DBT iname_dbt;
1379 
1380     HANDLE_PANICKED_ENV(env);
1381     if (!env_opened(env)) {
1382         return EINVAL;
1383     }
1384     HANDLE_READ_ONLY_TXN(txn);
1385     toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1386     toku_fill_dbt(&iname_dbt, iname, strlen(iname) + 1);
1387 
1388     r = toku_db_put(env->i->directory,
1389                     txn,
1390                     &dname_dbt,
1391                     &iname_dbt,
1392                     0,
1393                     true);
1394         return r;
1395 }
1396 
env_dirtool_detach(DB_ENV * env,DB_TXN * txn,const char * dname)1397 static int env_dirtool_detach(DB_ENV *env,
1398                               DB_TXN *txn,
1399                               const char *dname) {
1400     int r;
1401     DBT dname_dbt;
1402     DBT old_iname_dbt;
1403 
1404     HANDLE_PANICKED_ENV(env);
1405     if (!env_opened(env)) {
1406         return EINVAL;
1407     }
1408     HANDLE_READ_ONLY_TXN(txn);
1409 
1410     toku_fill_dbt(&dname_dbt, dname, strlen(dname) + 1);
1411     toku_init_dbt_flags(&old_iname_dbt, DB_DBT_REALLOC);
1412 
1413     r = toku_db_get(env->i->directory,
1414                     txn,
1415                     &dname_dbt,
1416                     &old_iname_dbt,
1417                     DB_SERIALIZABLE);  // allocates memory for iname
1418     if (r == DB_NOTFOUND)
1419         return EEXIST;
1420     toku_free(old_iname_dbt.data);
1421 
1422     r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
1423 
1424     return r;
1425 }
1426 
env_dirtool_move(DB_ENV * env,DB_TXN * txn,const char * old_dname,const char * new_dname)1427 static int env_dirtool_move(DB_ENV *env,
1428                             DB_TXN *txn,
1429                             const char *old_dname,
1430                             const char *new_dname) {
1431     int r;
1432     DBT old_dname_dbt;
1433     DBT new_dname_dbt;
1434     DBT iname_dbt;
1435 
1436     HANDLE_PANICKED_ENV(env);
1437     if (!env_opened(env)) {
1438         return EINVAL;
1439     }
1440     HANDLE_READ_ONLY_TXN(txn);
1441 
1442     toku_fill_dbt(&old_dname_dbt, old_dname, strlen(old_dname) + 1);
1443     toku_fill_dbt(&new_dname_dbt, new_dname, strlen(new_dname) + 1);
1444     toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
1445 
1446     r = toku_db_get(env->i->directory,
1447                     txn,
1448                     &old_dname_dbt,
1449                     &iname_dbt,
1450                     DB_SERIALIZABLE);  // allocates memory for iname
1451     if (r == DB_NOTFOUND)
1452         return EEXIST;
1453 
1454     r = toku_db_del(
1455         env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
1456     if (r != 0)
1457         goto exit;
1458 
1459     r = toku_db_put(
1460         env->i->directory, txn, &new_dname_dbt, &iname_dbt, 0, true);
1461 
1462 exit:
1463     toku_free(iname_dbt.data);
1464     return r;
1465 }
1466 
locked_env_op(DB_ENV * env,DB_TXN * txn,std::function<int (DB_TXN *)> f)1467 static int locked_env_op(DB_ENV *env,
1468                          DB_TXN *txn,
1469                          std::function<int(DB_TXN *)> f) {
1470     int ret, r;
1471     HANDLE_READ_ONLY_TXN(txn);
1472     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1473 
1474     DB_TXN *child_txn = NULL;
1475     int using_txns = env->i->open_flags & DB_INIT_TXN;
1476     if (using_txns) {
1477         ret = toku_txn_begin(env, txn, &child_txn, 0);
1478         lazy_assert_zero(ret);
1479     }
1480 
1481     // cannot begin a checkpoint
1482     toku_multi_operation_client_lock();
1483     r = f(child_txn);
1484     toku_multi_operation_client_unlock();
1485 
1486     if (using_txns) {
1487         if (r == 0) {
1488             ret = locked_txn_commit(child_txn, 0);
1489             lazy_assert_zero(ret);
1490         } else {
1491             ret = locked_txn_abort(child_txn);
1492             lazy_assert_zero(ret);
1493         }
1494     }
1495     return r;
1496 
1497 }
1498 
locked_env_dirtool_attach(DB_ENV * env,DB_TXN * txn,const char * dname,const char * iname)1499 static int locked_env_dirtool_attach(DB_ENV *env,
1500                                      DB_TXN *txn,
1501                                      const char *dname,
1502                                      const char *iname) {
1503     auto f = std::bind(
1504         env_dirtool_attach, env, std::placeholders::_1, dname, iname);
1505     return locked_env_op(env, txn, f);
1506 }
1507 
locked_env_dirtool_detach(DB_ENV * env,DB_TXN * txn,const char * dname)1508 static int locked_env_dirtool_detach(DB_ENV *env,
1509                                      DB_TXN *txn,
1510                                      const char *dname) {
1511     auto f = std::bind(
1512         env_dirtool_detach, env, std::placeholders::_1, dname);
1513     return locked_env_op(env, txn, f);
1514 }
1515 
locked_env_dirtool_move(DB_ENV * env,DB_TXN * txn,const char * old_dname,const char * new_dname)1516 static int locked_env_dirtool_move(DB_ENV *env,
1517                                    DB_TXN *txn,
1518                                    const char *old_dname,
1519                                    const char *new_dname) {
1520     auto f = std::bind(
1521         env_dirtool_move, env, std::placeholders::_1, old_dname, new_dname);
1522     return locked_env_op(env, txn, f);
1523 }
1524 
1525 static int env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags);
1526 
1527 static int
locked_env_dbremove(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,uint32_t flags)1528 locked_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
1529     int ret, r;
1530     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1531     HANDLE_READ_ONLY_TXN(txn);
1532 
1533     DB_TXN *child_txn = NULL;
1534     int using_txns = env->i->open_flags & DB_INIT_TXN;
1535     if (using_txns) {
1536         ret = toku_txn_begin(env, txn, &child_txn, 0);
1537         lazy_assert_zero(ret);
1538     }
1539 
1540     // cannot begin a checkpoint
1541     toku_multi_operation_client_lock();
1542     r = env_dbremove(env, child_txn, fname, dbname, flags);
1543     toku_multi_operation_client_unlock();
1544 
1545     if (using_txns) {
1546         if (r == 0) {
1547             ret = locked_txn_commit(child_txn, 0);
1548             lazy_assert_zero(ret);
1549         } else {
1550             ret = locked_txn_abort(child_txn);
1551             lazy_assert_zero(ret);
1552         }
1553     }
1554     return r;
1555 }
1556 
1557 static int env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags);
1558 
1559 static int
locked_env_dbrename(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)1560 locked_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
1561     int ret, r;
1562     HANDLE_READ_ONLY_TXN(txn);
1563     HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn);
1564 
1565     DB_TXN *child_txn = NULL;
1566     int using_txns = env->i->open_flags & DB_INIT_TXN;
1567     if (using_txns) {
1568         ret = toku_txn_begin(env, txn, &child_txn, 0);
1569         lazy_assert_zero(ret);
1570     }
1571 
1572     // cannot begin a checkpoint
1573     toku_multi_operation_client_lock();
1574     r = env_dbrename(env, child_txn, fname, dbname, newname, flags);
1575     toku_multi_operation_client_unlock();
1576 
1577     if (using_txns) {
1578         if (r == 0) {
1579             ret = locked_txn_commit(child_txn, 0);
1580             lazy_assert_zero(ret);
1581         } else {
1582             ret = locked_txn_abort(child_txn);
1583             lazy_assert_zero(ret);
1584         }
1585     }
1586     return r;
1587 }
1588 
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3

// Reports env->i->cachetable_size split into whole units of 2^30 bytes
// (*gbytes) and the remainder (*bytes). *ncache is always 1.
static int
env_get_cachesize(DB_ENV * env, uint32_t *gbytes, uint32_t *bytes, int *ncache) {
    HANDLE_PANICKED_ENV(env);
    const auto size = env->i->cachetable_size;
    const int gb_shift = 30;
    *gbytes = size >> gb_shift;
    *bytes = size & ((1 << gb_shift) - 1);
    *ncache = 1;
    return 0;
}

#endif
1601 
1602 static int
env_set_data_dir(DB_ENV * env,const char * dir)1603 env_set_data_dir(DB_ENV * env, const char *dir) {
1604     HANDLE_PANICKED_ENV(env);
1605     int r;
1606 
1607     if (env_opened(env) || !dir) {
1608         r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir after opening the env\n");
1609     }
1610     else if (env->i->data_dir)
1611         r = toku_ydb_do_error(env, EINVAL, "You cannot set the data dir more than once.\n");
1612     else {
1613         env->i->data_dir = toku_strdup(dir);
1614         if (env->i->data_dir==NULL) {
1615             assert(get_error_errno() == ENOMEM);
1616             r = toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1617         }
1618         else r = 0;
1619     }
1620     return r;
1621 }
1622 
1623 static void
env_set_errcall(DB_ENV * env,toku_env_errcall_t errcall)1624 env_set_errcall(DB_ENV * env, toku_env_errcall_t errcall) {
1625     env->i->errcall = errcall;
1626 }
1627 
1628 static void
env_set_errfile(DB_ENV * env,FILE * errfile)1629 env_set_errfile(DB_ENV*env, FILE*errfile) {
1630     env->i->errfile = errfile;
1631 }
1632 
1633 static void
env_set_errpfx(DB_ENV * env,const char * errpfx)1634 env_set_errpfx(DB_ENV * env, const char *errpfx) {
1635     env->i->errpfx = errpfx;
1636 }
1637 
1638 static int
env_set_flags(DB_ENV * env,uint32_t flags,int onoff)1639 env_set_flags(DB_ENV * env, uint32_t flags, int onoff) {
1640     HANDLE_PANICKED_ENV(env);
1641 
1642     uint32_t change = 0;
1643     if (flags & DB_AUTO_COMMIT) {
1644         change |=  DB_AUTO_COMMIT;
1645         flags  &= ~DB_AUTO_COMMIT;
1646     }
1647     if (flags != 0 && onoff) {
1648         return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support any nonzero ENV flags other than DB_AUTO_COMMIT\n");
1649     }
1650     if   (onoff) env->i->open_flags |=  change;
1651     else         env->i->open_flags &= ~change;
1652     return 0;
1653 }
1654 
1655 static int
env_set_lg_bsize(DB_ENV * env,uint32_t bsize)1656 env_set_lg_bsize(DB_ENV * env, uint32_t bsize) {
1657     HANDLE_PANICKED_ENV(env);
1658     return toku_logger_set_lg_bsize(env->i->logger, bsize);
1659 }
1660 
1661 static int
env_set_lg_dir(DB_ENV * env,const char * dir)1662 env_set_lg_dir(DB_ENV * env, const char *dir) {
1663     HANDLE_PANICKED_ENV(env);
1664     if (env_opened(env)) {
1665         return toku_ydb_do_error(env, EINVAL, "Cannot set log dir after opening the env\n");
1666     }
1667 
1668     if (env->i->lg_dir) toku_free(env->i->lg_dir);
1669     if (dir) {
1670         env->i->lg_dir = toku_strdup(dir);
1671         if (!env->i->lg_dir) {
1672             return toku_ydb_do_error(env, ENOMEM, "Out of memory\n");
1673         }
1674     }
1675     else env->i->lg_dir = NULL;
1676     return 0;
1677 }
1678 
1679 static int
env_set_lg_max(DB_ENV * env,uint32_t lg_max)1680 env_set_lg_max(DB_ENV * env, uint32_t lg_max) {
1681     HANDLE_PANICKED_ENV(env);
1682     return toku_logger_set_lg_max(env->i->logger, lg_max);
1683 }
1684 
1685 static int
env_get_lg_max(DB_ENV * env,uint32_t * lg_maxp)1686 env_get_lg_max(DB_ENV * env, uint32_t *lg_maxp) {
1687     HANDLE_PANICKED_ENV(env);
1688     return toku_logger_get_lg_max(env->i->logger, lg_maxp);
1689 }
1690 
1691 static int
env_set_lk_detect(DB_ENV * env,uint32_t UU (detect))1692 env_set_lk_detect(DB_ENV * env, uint32_t UU(detect)) {
1693     HANDLE_PANICKED_ENV(env);
1694     return toku_ydb_do_error(env, EINVAL, "PerconaFT does not (yet) support set_lk_detect\n");
1695 }
1696 
1697 static int
env_set_lk_max_memory(DB_ENV * env,uint64_t lock_memory_limit)1698 env_set_lk_max_memory(DB_ENV *env, uint64_t lock_memory_limit) {
1699     HANDLE_PANICKED_ENV(env);
1700     int r = 0;
1701     if (env_opened(env)) {
1702         r = EINVAL;
1703     } else {
1704         r = env->i->ltm.set_max_lock_memory(lock_memory_limit);
1705     }
1706     return r;
1707 }
1708 
1709 static int
env_get_lk_max_memory(DB_ENV * env,uint64_t * lk_maxp)1710 env_get_lk_max_memory(DB_ENV *env, uint64_t *lk_maxp) {
1711     HANDLE_PANICKED_ENV(env);
1712     uint32_t max_lock_memory = env->i->ltm.get_max_lock_memory();
1713     *lk_maxp = max_lock_memory;
1714     return 0;
1715 }
1716 
1717 //void toku__env_set_noticecall (DB_ENV *env, void (*noticecall)(DB_ENV *, db_notices)) {
1718 //    env->i->noticecall = noticecall;
1719 //}
1720 
1721 static int
env_set_tmp_dir(DB_ENV * env,const char * tmp_dir)1722 env_set_tmp_dir(DB_ENV * env, const char *tmp_dir) {
1723     HANDLE_PANICKED_ENV(env);
1724     if (env_opened(env)) {
1725         return toku_ydb_do_error(env, EINVAL, "Cannot set the tmp dir after opening an env\n");
1726     }
1727     if (!tmp_dir) {
1728         return toku_ydb_do_error(env, EINVAL, "Tmp dir bust be non-null\n");
1729     }
1730     if (env->i->tmp_dir)
1731         toku_free(env->i->tmp_dir);
1732     env->i->tmp_dir = toku_strdup(tmp_dir);
1733     return env->i->tmp_dir ? 0 : ENOMEM;
1734 }
1735 
1736 static int
env_set_verbose(DB_ENV * env,uint32_t UU (which),int UU (onoff))1737 env_set_verbose(DB_ENV * env, uint32_t UU(which), int UU(onoff)) {
1738     HANDLE_PANICKED_ENV(env);
1739     return 1;
1740 }
1741 
1742 static int
toku_env_txn_checkpoint(DB_ENV * env,uint32_t kbyte,uint32_t min,uint32_t flags)1743 toku_env_txn_checkpoint(DB_ENV * env, uint32_t kbyte __attribute__((__unused__)), uint32_t min __attribute__((__unused__)), uint32_t flags __attribute__((__unused__))) {
1744     CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable);
1745     int r = toku_checkpoint(cp, env->i->logger,
1746                             checkpoint_callback_f,  checkpoint_callback_extra,
1747                             checkpoint_callback2_f, checkpoint_callback2_extra,
1748                             CLIENT_CHECKPOINT);
1749     if (r) {
1750         // Panicking the whole environment may be overkill, but I'm not sure what else to do.
1751         env_panic(env, r, "checkpoint error\n");
1752         toku_ydb_do_error(env, r, "Checkpoint\n");
1753     }
1754     return r;
1755 }
1756 
1757 static int
env_txn_stat(DB_ENV * env,DB_TXN_STAT ** UU (statp),uint32_t UU (flags))1758 env_txn_stat(DB_ENV * env, DB_TXN_STAT ** UU(statp), uint32_t UU(flags)) {
1759     HANDLE_PANICKED_ENV(env);
1760     return 1;
1761 }
1762 
1763 //
1764 // We can assume the client calls this function right after recovery
1765 // to return a list of prepared transactions to the user. When called,
1766 // we can assume that no other work is being done in the system,
1767 // as we are in the state of being after recovery,
1768 // but before client operations should commence
1769 //
1770 static int
env_txn_xa_recover(DB_ENV * env,TOKU_XA_XID xids[],long count,long * retp,uint32_t flags)1771 env_txn_xa_recover (DB_ENV *env, TOKU_XA_XID xids[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1772     struct tokulogger_preplist *MALLOC_N(count,preps);
1773     int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1774     if (r==0) {
1775         assert(*retp<=count);
1776         for (int i=0; i<*retp; i++) {
1777             xids[i] = preps[i].xid;
1778         }
1779     }
1780     toku_free(preps);
1781     return r;
1782 }
1783 
1784 //
1785 // We can assume the client calls this function right after recovery
1786 // to return a list of prepared transactions to the user. When called,
1787 // we can assume that no other work is being done in the system,
1788 // as we are in the state of being after recovery,
1789 // but before client operations should commence
1790 //
1791 static int
env_txn_recover(DB_ENV * env,DB_PREPLIST preplist[],long count,long * retp,uint32_t flags)1792 env_txn_recover (DB_ENV *env, DB_PREPLIST preplist[/*count*/], long count, /*out*/ long *retp, uint32_t flags) {
1793     struct tokulogger_preplist *MALLOC_N(count,preps);
1794     int r = toku_logger_recover_txn(env->i->logger, preps, count, retp, flags);
1795     if (r==0) {
1796         assert(*retp<=count);
1797         for (int i=0; i<*retp; i++) {
1798             preplist[i].txn = preps[i].txn;
1799             memcpy(preplist[i].gid, preps[i].xid.data, preps[i].xid.gtrid_length + preps[i].xid.bqual_length);
1800         }
1801     }
1802     toku_free(preps);
1803     return r;
1804 }
1805 
1806 static int
env_get_txn_from_xid(DB_ENV * env,TOKU_XA_XID * xid,DB_TXN ** txnp)1807 env_get_txn_from_xid (DB_ENV *env, /*in*/ TOKU_XA_XID *xid, /*out*/ DB_TXN **txnp) {
1808     return toku_txn_manager_get_root_txn_from_xid(toku_logger_get_txn_manager(env->i->logger), xid, txnp);
1809 }
1810 
1811 static int
env_checkpointing_set_period(DB_ENV * env,uint32_t seconds)1812 env_checkpointing_set_period(DB_ENV * env, uint32_t seconds) {
1813     HANDLE_PANICKED_ENV(env);
1814     int r = 0;
1815     if (!env_opened(env)) {
1816         r = EINVAL;
1817     } else {
1818         toku_set_checkpoint_period(env->i->cachetable, seconds);
1819     }
1820     return r;
1821 }
1822 
1823 static int
env_cleaner_set_period(DB_ENV * env,uint32_t seconds)1824 env_cleaner_set_period(DB_ENV * env, uint32_t seconds) {
1825     HANDLE_PANICKED_ENV(env);
1826     int r = 0;
1827     if (!env_opened(env)) {
1828         r = EINVAL;
1829     } else {
1830         toku_set_cleaner_period(env->i->cachetable, seconds);
1831     }
1832     return r;
1833 }
1834 
1835 static int
env_cleaner_set_iterations(DB_ENV * env,uint32_t iterations)1836 env_cleaner_set_iterations(DB_ENV * env, uint32_t iterations) {
1837     HANDLE_PANICKED_ENV(env);
1838     int r = 0;
1839     if (!env_opened(env)) {
1840         r = EINVAL;
1841     } else {
1842         toku_set_cleaner_iterations(env->i->cachetable, iterations);
1843     }
1844     return r;
1845 }
1846 
1847 static int
env_create_loader(DB_ENV * env,DB_TXN * txn,DB_LOADER ** blp,DB * src_db,int N,DB * dbs[],uint32_t db_flags[],uint32_t dbt_flags[],uint32_t loader_flags)1848 env_create_loader(DB_ENV *env,
1849                   DB_TXN *txn,
1850                   DB_LOADER **blp,
1851                   DB *src_db,
1852                   int N,
1853                   DB *dbs[],
1854                   uint32_t db_flags[/*N*/],
1855                   uint32_t dbt_flags[/*N*/],
1856                   uint32_t loader_flags) {
1857     int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags, true);
1858     return r;
1859 }
1860 
1861 static int
env_checkpointing_get_period(DB_ENV * env,uint32_t * seconds)1862 env_checkpointing_get_period(DB_ENV * env, uint32_t *seconds) {
1863     HANDLE_PANICKED_ENV(env);
1864     int r = 0;
1865     if (!env_opened(env)) r = EINVAL;
1866     else
1867         *seconds = toku_get_checkpoint_period_unlocked(env->i->cachetable);
1868     return r;
1869 }
1870 
1871 static int
env_cleaner_get_period(DB_ENV * env,uint32_t * seconds)1872 env_cleaner_get_period(DB_ENV * env, uint32_t *seconds) {
1873     HANDLE_PANICKED_ENV(env);
1874     int r = 0;
1875     if (!env_opened(env)) r = EINVAL;
1876     else
1877         *seconds = toku_get_cleaner_period_unlocked(env->i->cachetable);
1878     return r;
1879 }
1880 
1881 static int
env_cleaner_get_iterations(DB_ENV * env,uint32_t * iterations)1882 env_cleaner_get_iterations(DB_ENV * env, uint32_t *iterations) {
1883     HANDLE_PANICKED_ENV(env);
1884     int r = 0;
1885     if (!env_opened(env)) r = EINVAL;
1886     else
1887         *iterations = toku_get_cleaner_iterations(env->i->cachetable);
1888     return r;
1889 }
1890 
1891 static int
env_evictor_set_enable_partial_eviction(DB_ENV * env,bool enabled)1892 env_evictor_set_enable_partial_eviction(DB_ENV* env, bool enabled) {
1893     HANDLE_PANICKED_ENV(env);
1894     int r = 0;
1895     if (!env_opened(env)) r = EINVAL;
1896     else toku_set_enable_partial_eviction(env->i->cachetable, enabled);
1897     return r;
1898 }
1899 
1900 static int
env_evictor_get_enable_partial_eviction(DB_ENV * env,bool * enabled)1901 env_evictor_get_enable_partial_eviction(DB_ENV* env, bool *enabled) {
1902     HANDLE_PANICKED_ENV(env);
1903     int r = 0;
1904     if (!env_opened(env)) r = EINVAL;
1905     else *enabled = toku_get_enable_partial_eviction(env->i->cachetable);
1906     return r;
1907 }
1908 
1909 static int
env_checkpointing_postpone(DB_ENV * env)1910 env_checkpointing_postpone(DB_ENV * env) {
1911     HANDLE_PANICKED_ENV(env);
1912     int r = 0;
1913     if (!env_opened(env)) r = EINVAL;
1914     else toku_checkpoint_safe_client_lock();
1915     return r;
1916 }
1917 
1918 static int
env_checkpointing_resume(DB_ENV * env)1919 env_checkpointing_resume(DB_ENV * env) {
1920     HANDLE_PANICKED_ENV(env);
1921     int r = 0;
1922     if (!env_opened(env)) r = EINVAL;
1923     else toku_checkpoint_safe_client_unlock();
1924     return r;
1925 }
1926 
1927 static int
env_checkpointing_begin_atomic_operation(DB_ENV * env)1928 env_checkpointing_begin_atomic_operation(DB_ENV * env) {
1929     HANDLE_PANICKED_ENV(env);
1930     int r = 0;
1931     if (!env_opened(env)) r = EINVAL;
1932     else toku_multi_operation_client_lock();
1933     return r;
1934 }
1935 
1936 static int
env_checkpointing_end_atomic_operation(DB_ENV * env)1937 env_checkpointing_end_atomic_operation(DB_ENV * env) {
1938     HANDLE_PANICKED_ENV(env);
1939     int r = 0;
1940     if (!env_opened(env)) r = EINVAL;
1941     else toku_multi_operation_client_unlock();
1942     return r;
1943 }
1944 
1945 static int
env_set_default_bt_compare(DB_ENV * env,int (* bt_compare)(DB *,const DBT *,const DBT *))1946 env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const DBT *, const DBT *)) {
1947     HANDLE_PANICKED_ENV(env);
1948     int r = 0;
1949     if (env_opened(env)) r = EINVAL;
1950     else {
1951         env->i->bt_compare = bt_compare;
1952     }
1953     return r;
1954 }
1955 
1956 static void
env_set_update(DB_ENV * env,int (* update_function)(DB *,const DBT * key,const DBT * old_val,const DBT * extra,void (* set_val)(const DBT * new_val,void * set_extra),void * set_extra))1957 env_set_update (DB_ENV *env, int (*update_function)(DB *, const DBT *key, const DBT *old_val, const DBT *extra, void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra)) {
1958     env->i->update_function = update_function;
1959 }
1960 
1961 static int
env_set_generate_row_callback_for_put(DB_ENV * env,generate_row_for_put_func generate_row_for_put)1962 env_set_generate_row_callback_for_put(DB_ENV *env, generate_row_for_put_func generate_row_for_put) {
1963     HANDLE_PANICKED_ENV(env);
1964     int r = 0;
1965     if (env_opened(env)) r = EINVAL;
1966     else {
1967         env->i->generate_row_for_put = generate_row_for_put;
1968     }
1969     return r;
1970 }
1971 
1972 static int
env_set_generate_row_callback_for_del(DB_ENV * env,generate_row_for_del_func generate_row_for_del)1973 env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_func generate_row_for_del) {
1974     HANDLE_PANICKED_ENV(env);
1975     int r = 0;
1976     if (env_opened(env)) r = EINVAL;
1977     else {
1978         env->i->generate_row_for_del = generate_row_for_del;
1979     }
1980     return r;
1981 }
1982 static int
env_set_redzone(DB_ENV * env,int redzone)1983 env_set_redzone(DB_ENV *env, int redzone) {
1984     HANDLE_PANICKED_ENV(env);
1985     int r;
1986     if (env_opened(env))
1987         r = EINVAL;
1988     else {
1989         env->i->redzone = redzone;
1990         r = 0;
1991     }
1992     return r;
1993 }
1994 
env_get_lock_timeout(DB_ENV * env,uint64_t * lock_timeout_msec)1995 static int env_get_lock_timeout(DB_ENV *env, uint64_t *lock_timeout_msec) {
1996     uint64_t t = env->i->default_lock_timeout_msec;
1997     if (env->i->get_lock_timeout_callback)
1998         t = env->i->get_lock_timeout_callback(t);
1999     *lock_timeout_msec = t;
2000     return 0;
2001 }
2002 
env_set_lock_timeout(DB_ENV * env,uint64_t default_lock_timeout_msec,uint64_t (* get_lock_timeout_callback)(uint64_t default_lock_timeout_msec))2003 static int env_set_lock_timeout(DB_ENV *env, uint64_t default_lock_timeout_msec, uint64_t (*get_lock_timeout_callback)(uint64_t default_lock_timeout_msec)) {
2004     env->i->default_lock_timeout_msec = default_lock_timeout_msec;
2005     env->i->get_lock_timeout_callback = get_lock_timeout_callback;
2006     return 0;
2007 }
2008 
2009 static int
env_set_lock_timeout_callback(DB_ENV * env,lock_timeout_callback callback)2010 env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
2011     env->i->lock_wait_timeout_callback = callback;
2012     return 0;
2013 }
2014 
// Formats *timer with ctime_r into `buf`, which must hold at least 26 bytes
// as ctime_r requires. The trailing newline / carriage-return characters are
// stripped. If ctime_r fails, `buf` is set to the empty string instead of
// being read, because its contents are unspecified in that case.
static void
format_time(const time_t *timer, char *buf) {
    if (ctime_r(timer, buf) == NULL) {
        buf[0] = '\0';
        return;
    }
    size_t len = strlen(buf);
    assert(len < 26);
    assert(len >= 1);
    while (buf[len - 1] == '\n' || buf[len - 1] == '\r') {
        buf[--len] = '\0';
        assert(len >= 1);
    }
}
2031 
2032 ////////////////////////////////////////////////////////////////////////////////////////////////
2033 // Local definition of status information from portability layer, which should not include db.h.
2034 // Local status structs are used to concentrate file system information collected from various places
2035 // and memory information collected from memory.c.
2036 //
2037 typedef enum {
2038     FS_ENOSPC_REDZONE_STATE = 0,  // possible values are enumerated by fs_redzone_state
2039     FS_ENOSPC_THREADS_BLOCKED,    // how many threads currently blocked on ENOSPC
2040     FS_ENOSPC_REDZONE_CTR,        // number of operations rejected by enospc prevention (red zone)
2041     FS_ENOSPC_MOST_RECENT,        // most recent time that file system was completely full
2042     FS_ENOSPC_COUNT,              // total number of times ENOSPC was returned from an attempt to write
2043     FS_FSYNC_TIME,
2044     FS_FSYNC_COUNT,
2045     FS_LONG_FSYNC_TIME,
2046     FS_LONG_FSYNC_COUNT,
2047     FS_STATUS_NUM_ROWS,           // must be last
2048 } fs_status_entry;
2049 
2050 typedef struct {
2051     bool initialized;
2052     TOKU_ENGINE_STATUS_ROW_S status[FS_STATUS_NUM_ROWS];
2053 } FS_STATUS_S, *FS_STATUS;
2054 
2055 static FS_STATUS_S fsstat;
2056 
// Shorthand for TOKUFT_STATUS_INIT on fsstat: prefixes every legend `l` with "filesystem: ".
// Arguments are (row index k, c, type t, legend l, include flags inc). `c` is either nullptr or a
// FILESYSTEM_* name -- presumably the matching global-status column; confirm in TOKUFT_STATUS_INIT.
#define FS_STATUS_INIT(k,c,t,l,inc) TOKUFT_STATUS_INIT(fsstat, k, c, t, "filesystem: " l, inc)

// Registers the metadata of every fsstat row and marks the table initialized.
// Called lazily from fs_get_status on first use.
static void
fs_status_init(void) {
    FS_STATUS_INIT(FS_ENOSPC_REDZONE_STATE,   nullptr, FS_STATE, "ENOSPC redzone state", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_THREADS_BLOCKED, FILESYSTEM_THREADS_BLOCKED_BY_FULL_DISK, UINT64,   "threads currently blocked by full disk", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_REDZONE_CTR,     nullptr, UINT64,   "number of operations rejected by enospc prevention (red zone)", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_MOST_RECENT,     nullptr, UNIXTIME, "most recent disk full", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_ENOSPC_COUNT,           nullptr, UINT64,   "number of write operations that returned ENOSPC", TOKU_ENGINE_STATUS);
    FS_STATUS_INIT(FS_FSYNC_TIME,             FILESYSTEM_FSYNC_TIME, UINT64,   "fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_FSYNC_COUNT,            FILESYSTEM_FSYNC_NUM, UINT64,   "fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_LONG_FSYNC_TIME,        FILESYSTEM_LONG_FSYNC_TIME, UINT64,   "long fsync time", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    FS_STATUS_INIT(FS_LONG_FSYNC_COUNT,       FILESYSTEM_LONG_FSYNC_NUM, UINT64,   "long fsync count", TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS);
    fsstat.initialized = true;
}
#undef FS_STATUS_INIT
2073 
2074 #define FS_STATUS_VALUE(x) fsstat.status[x].value.num
2075 
2076 static void
fs_get_status(DB_ENV * env,fs_redzone_state * redzone_state)2077 fs_get_status(DB_ENV * env, fs_redzone_state * redzone_state) {
2078     if (!fsstat.initialized)
2079         fs_status_init();
2080 
2081     time_t   enospc_most_recent_timestamp;
2082     uint64_t enospc_threads_blocked, enospc_total;
2083     toku_fs_get_write_info(&enospc_most_recent_timestamp, &enospc_threads_blocked, &enospc_total);
2084     if (enospc_threads_blocked)
2085         FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = FS_BLOCKED;
2086     else
2087         FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE) = env->i->fs_state;
2088     *redzone_state = (fs_redzone_state) FS_STATUS_VALUE(FS_ENOSPC_REDZONE_STATE);
2089     FS_STATUS_VALUE(FS_ENOSPC_THREADS_BLOCKED) = enospc_threads_blocked;
2090     FS_STATUS_VALUE(FS_ENOSPC_REDZONE_CTR) = env->i->enospc_redzone_ctr;
2091     FS_STATUS_VALUE(FS_ENOSPC_MOST_RECENT) = enospc_most_recent_timestamp;
2092     FS_STATUS_VALUE(FS_ENOSPC_COUNT) = enospc_total;
2093 
2094     uint64_t fsync_count, fsync_time, long_fsync_threshold, long_fsync_count, long_fsync_time;
2095     toku_get_fsync_times(&fsync_count, &fsync_time, &long_fsync_threshold, &long_fsync_count, &long_fsync_time);
2096     FS_STATUS_VALUE(FS_FSYNC_COUNT) = fsync_count;
2097     FS_STATUS_VALUE(FS_FSYNC_TIME) = fsync_time;
2098     FS_STATUS_VALUE(FS_LONG_FSYNC_COUNT) = long_fsync_count;
2099     FS_STATUS_VALUE(FS_LONG_FSYNC_TIME) = long_fsync_time;
2100 }
2101 #undef FS_STATUS_VALUE
2102 
// Local status table used to publish allocator information obtained from memory.c
// (via toku_memory_get_status). Enumerators are row indices into memory_status.status.
typedef enum {
    MEMORY_MALLOC_COUNT = 0,
    MEMORY_FREE_COUNT,
    MEMORY_REALLOC_COUNT,
    MEMORY_MALLOC_FAIL,
    MEMORY_REALLOC_FAIL,
    MEMORY_REQUESTED,
    MEMORY_USED,
    MEMORY_FREED,
    MEMORY_MAX_REQUESTED_SIZE,
    MEMORY_LAST_FAILED_SIZE,
    MEMORY_MAX_IN_USE,
    MEMORY_MALLOCATOR_VERSION,  // string row (CHARSTR), not numeric
    MEMORY_MMAP_THRESHOLD,
    MEMORY_STATUS_NUM_ROWS      // must be last: number of rows
} memory_status_entry;

typedef struct {
    bool initialized;  // set by memory_status_init once the row metadata is filled in
    TOKU_ENGINE_STATUS_ROW_S status[MEMORY_STATUS_NUM_ROWS];
} MEMORY_STATUS_S, *MEMORY_STATUS;

// File-scope memory status table; refreshed by memory_get_status.
static MEMORY_STATUS_S memory_status;
2127 
// Shorthand for TOKUFT_STATUS_INIT on memory_status: prefixes every legend `l` with "memory: "
// and includes every row in both engine and global status.
// Arguments are (row index k, c, type t, legend l). `c` is usually the same token as `k` --
// presumably the global-status column name; confirm in TOKUFT_STATUS_INIT.
#define STATUS_INIT(k,c,t,l) TOKUFT_STATUS_INIT(memory_status, k, c, t, "memory: " l, TOKU_ENGINE_STATUS|TOKU_GLOBAL_STATUS)

// Registers the metadata of every memory_status row and marks the table initialized.
// Called lazily from memory_get_status on first use.
static void
memory_status_init(void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.
    STATUS_INIT(MEMORY_MALLOC_COUNT,        MEMORY_MALLOC_COUNT,        UINT64, "number of malloc operations");
    STATUS_INIT(MEMORY_FREE_COUNT,          MEMORY_FREE_COUNT,          UINT64, "number of free operations");
    STATUS_INIT(MEMORY_REALLOC_COUNT,       MEMORY_REALLOC_COUNT,       UINT64, "number of realloc operations");
    STATUS_INIT(MEMORY_MALLOC_FAIL,         MEMORY_MALLOC_FAIL,         UINT64, "number of malloc operations that failed");
    STATUS_INIT(MEMORY_REALLOC_FAIL,        MEMORY_REALLOC_FAIL,        UINT64, "number of realloc operations that failed" );
    STATUS_INIT(MEMORY_REQUESTED,           MEMORY_REQUESTED,           UINT64, "number of bytes requested");
    STATUS_INIT(MEMORY_USED,                MEMORY_USED,                UINT64, "number of bytes used (requested + overhead)");
    STATUS_INIT(MEMORY_FREED,               MEMORY_FREED,               UINT64, "number of bytes freed");
    STATUS_INIT(MEMORY_MAX_REQUESTED_SIZE,  MEMORY_MAX_REQUESTED_SIZE,  UINT64, "largest attempted allocation size");
    STATUS_INIT(MEMORY_LAST_FAILED_SIZE,    MEMORY_LAST_FAILED_SIZE,    UINT64, "size of the last failed allocation attempt");
    STATUS_INIT(MEMORY_MAX_IN_USE,          MEM_ESTIMATED_MAXIMUM_MEMORY_FOOTPRINT, UINT64, "estimated maximum memory footprint");
    STATUS_INIT(MEMORY_MALLOCATOR_VERSION,  MEMORY_MALLOCATOR_VERSION,  CHARSTR, "mallocator version");
    STATUS_INIT(MEMORY_MMAP_THRESHOLD,      MEMORY_MMAP_THRESHOLD,      UINT64, "mmap threshold");
    memory_status.initialized = true;
}
#undef STATUS_INIT
2150 
2151 #define MEMORY_STATUS_VALUE(x) memory_status.status[x].value.num
2152 
2153 static void
memory_get_status(void)2154 memory_get_status(void) {
2155     if (!memory_status.initialized)
2156         memory_status_init();
2157     LOCAL_MEMORY_STATUS_S local_memstat;
2158     toku_memory_get_status(&local_memstat);
2159     MEMORY_STATUS_VALUE(MEMORY_MALLOC_COUNT) = local_memstat.malloc_count;
2160     MEMORY_STATUS_VALUE(MEMORY_FREE_COUNT) = local_memstat.free_count;
2161     MEMORY_STATUS_VALUE(MEMORY_REALLOC_COUNT) = local_memstat.realloc_count;
2162     MEMORY_STATUS_VALUE(MEMORY_MALLOC_FAIL) = local_memstat.malloc_fail;
2163     MEMORY_STATUS_VALUE(MEMORY_REALLOC_FAIL) = local_memstat.realloc_fail;
2164     MEMORY_STATUS_VALUE(MEMORY_REQUESTED) = local_memstat.requested;
2165     MEMORY_STATUS_VALUE(MEMORY_USED) = local_memstat.used;
2166     MEMORY_STATUS_VALUE(MEMORY_FREED) = local_memstat.freed;
2167     MEMORY_STATUS_VALUE(MEMORY_MAX_IN_USE) = local_memstat.max_in_use;
2168     MEMORY_STATUS_VALUE(MEMORY_MMAP_THRESHOLD) = local_memstat.mmap_threshold;
2169     memory_status.status[MEMORY_MALLOCATOR_VERSION].value.str = local_memstat.mallocator_version;
2170 }
2171 #undef MEMORY_STATUS_VALUE
2172 
2173 // how many rows are in engine status?
2174 static int
env_get_engine_status_num_rows(DB_ENV * UU (env),uint64_t * num_rowsp)2175 env_get_engine_status_num_rows (DB_ENV * UU(env), uint64_t * num_rowsp) {
2176     uint64_t num_rows = 0;
2177     num_rows += YDB_LAYER_STATUS_NUM_ROWS;
2178     num_rows += YDB_C_LAYER_STATUS_NUM_ROWS;
2179     num_rows += YDB_WRITE_LAYER_STATUS_NUM_ROWS;
2180     num_rows += LE_STATUS_S::LE_STATUS_NUM_ROWS;
2181     num_rows += CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS;
2182     num_rows += CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS;
2183     num_rows += LTM_STATUS_S::LTM_STATUS_NUM_ROWS;
2184     num_rows += FT_STATUS_S::FT_STATUS_NUM_ROWS;
2185     num_rows += FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS;
2186     num_rows += FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS;
2187     num_rows += TXN_STATUS_S::TXN_STATUS_NUM_ROWS;
2188     num_rows += LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS;
2189     num_rows += MEMORY_STATUS_NUM_ROWS;
2190     num_rows += FS_STATUS_NUM_ROWS;
2191     num_rows += INDEXER_STATUS_NUM_ROWS;
2192     num_rows += LOADER_STATUS_NUM_ROWS;
2193     num_rows += CTX_STATUS_NUM_ROWS;
2194 #if 0
2195     // enable when upgrade is supported
2196     num_rows += FT_UPGRADE_STATUS_NUM_ROWS;
2197     num_rows += PERSISTENT_UPGRADE_STATUS_NUM_ROWS;
2198 #endif
2199     *num_rowsp = num_rows;
2200     return 0;
2201 }
2202 
2203 // Do not take ydb lock or any other lock around or in this function.
2204 // If the engine is blocked because some thread is holding a lock, this function
2205 // can help diagnose the problem.
2206 // This function only collects information, and it does not matter if something gets garbled
2207 // because of a race condition.
2208 // Note, engine status is still collected even if the environment or logger is panicked
2209 static int
env_get_engine_status(DB_ENV * env,TOKU_ENGINE_STATUS_ROW engstat,uint64_t maxrows,uint64_t * num_rows,fs_redzone_state * redzone_state,uint64_t * env_panicp,char * env_panic_string_buf,int env_panic_string_length,toku_engine_status_include_type include_flags)2210 env_get_engine_status (DB_ENV * env, TOKU_ENGINE_STATUS_ROW engstat, uint64_t maxrows,  uint64_t *num_rows, fs_redzone_state* redzone_state, uint64_t * env_panicp, char * env_panic_string_buf, int env_panic_string_length, toku_engine_status_include_type include_flags) {
2211     int r;
2212 
2213     if (env_panic_string_buf) {
2214         if (env && env->i && env->i->is_panicked && env->i->panic_string) {
2215             strncpy(env_panic_string_buf, env->i->panic_string, env_panic_string_length);
2216             env_panic_string_buf[env_panic_string_length - 1] = '\0';  // just in case
2217         }
2218         else
2219             *env_panic_string_buf = '\0';
2220     }
2221 
2222     if ( !(env)     ||
2223          !(env->i)  ||
2224          !(env_opened(env)) ||
2225          !num_rows ||
2226          !include_flags)
2227         r = EINVAL;
2228     else {
2229         r = 0;
2230         uint64_t row = 0;  // which row to fill next
2231         *env_panicp = env->i->is_panicked;
2232 
2233         {
2234             YDB_LAYER_STATUS_S ydb_stat;
2235             ydb_layer_get_status(env, &ydb_stat);
2236             for (int i = 0; i < YDB_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2237                 if (ydb_stat.status[i].include & include_flags) {
2238                     engstat[row++] = ydb_stat.status[i];
2239                 }
2240             }
2241         }
2242         {
2243             YDB_C_LAYER_STATUS_S ydb_c_stat;
2244             ydb_c_layer_get_status(&ydb_c_stat);
2245             for (int i = 0; i < YDB_C_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2246                 if (ydb_c_stat.status[i].include & include_flags) {
2247                     engstat[row++] = ydb_c_stat.status[i];
2248                 }
2249             }
2250         }
2251         {
2252             YDB_WRITE_LAYER_STATUS_S ydb_write_stat;
2253             ydb_write_layer_get_status(&ydb_write_stat);
2254             for (int i = 0; i < YDB_WRITE_LAYER_STATUS_NUM_ROWS && row < maxrows; i++) {
2255                 if (ydb_write_stat.status[i].include & include_flags) {
2256                     engstat[row++] = ydb_write_stat.status[i];
2257                 }
2258             }
2259         }
2260         {
2261             LE_STATUS_S lestat;                    // Rice's vampire
2262             toku_le_get_status(&lestat);
2263             for (int i = 0; i < LE_STATUS_S::LE_STATUS_NUM_ROWS && row < maxrows; i++) {
2264                 if (lestat.status[i].include & include_flags) {
2265                     engstat[row++] = lestat.status[i];
2266                 }
2267             }
2268         }
2269         {
2270             CHECKPOINT_STATUS_S cpstat;
2271             toku_checkpoint_get_status(env->i->cachetable, &cpstat);
2272             for (int i = 0; i < CHECKPOINT_STATUS_S::CP_STATUS_NUM_ROWS && row < maxrows; i++) {
2273                 if (cpstat.status[i].include & include_flags) {
2274                     engstat[row++] = cpstat.status[i];
2275                 }
2276             }
2277         }
2278         {
2279             CACHETABLE_STATUS_S ctstat;
2280             toku_cachetable_get_status(env->i->cachetable, &ctstat);
2281             for (int i = 0; i < CACHETABLE_STATUS_S::CT_STATUS_NUM_ROWS && row < maxrows; i++) {
2282                 if (ctstat.status[i].include & include_flags) {
2283                     engstat[row++] = ctstat.status[i];
2284                 }
2285             }
2286         }
2287         {
2288             LTM_STATUS_S ltmstat;
2289             env->i->ltm.get_status(&ltmstat);
2290             for (int i = 0; i < LTM_STATUS_S::LTM_STATUS_NUM_ROWS && row < maxrows; i++) {
2291                 if (ltmstat.status[i].include & include_flags) {
2292                     engstat[row++] = ltmstat.status[i];
2293                 }
2294             }
2295         }
2296         {
2297             FT_STATUS_S ftstat;
2298             toku_ft_get_status(&ftstat);
2299             for (int i = 0; i < FT_STATUS_S::FT_STATUS_NUM_ROWS && row < maxrows; i++) {
2300                 if (ftstat.status[i].include & include_flags) {
2301                     engstat[row++] = ftstat.status[i];
2302                 }
2303             }
2304         }
2305         {
2306             FT_FLUSHER_STATUS_S flusherstat;
2307             toku_ft_flusher_get_status(&flusherstat);
2308             for (int i = 0; i < FT_FLUSHER_STATUS_S::FT_FLUSHER_STATUS_NUM_ROWS && row < maxrows; i++) {
2309                 if (flusherstat.status[i].include & include_flags) {
2310                     engstat[row++] = flusherstat.status[i];
2311                 }
2312             }
2313         }
2314         {
2315             FT_HOT_STATUS_S hotstat;
2316             toku_ft_hot_get_status(&hotstat);
2317             for (int i = 0; i < FT_HOT_STATUS_S::FT_HOT_STATUS_NUM_ROWS && row < maxrows; i++) {
2318                 if (hotstat.status[i].include & include_flags) {
2319                     engstat[row++] = hotstat.status[i];
2320                 }
2321             }
2322         }
2323         {
2324             TXN_STATUS_S txnstat;
2325             toku_txn_get_status(&txnstat);
2326             for (int i = 0; i < TXN_STATUS_S::TXN_STATUS_NUM_ROWS && row < maxrows; i++) {
2327                 if (txnstat.status[i].include & include_flags) {
2328                     engstat[row++] = txnstat.status[i];
2329                 }
2330             }
2331         }
2332         {
2333             LOGGER_STATUS_S loggerstat;
2334             toku_logger_get_status(env->i->logger, &loggerstat);
2335             for (int i = 0; i < LOGGER_STATUS_S::LOGGER_STATUS_NUM_ROWS && row < maxrows; i++) {
2336                 if (loggerstat.status[i].include & include_flags) {
2337                     engstat[row++] = loggerstat.status[i];
2338                 }
2339             }
2340         }
2341 
2342         {
2343             INDEXER_STATUS_S indexerstat;
2344             toku_indexer_get_status(&indexerstat);
2345             for (int i = 0; i < INDEXER_STATUS_NUM_ROWS && row < maxrows; i++) {
2346                 if (indexerstat.status[i].include & include_flags) {
2347                     engstat[row++] = indexerstat.status[i];
2348                 }
2349             }
2350         }
2351         {
2352             LOADER_STATUS_S loaderstat;
2353             toku_loader_get_status(&loaderstat);
2354             for (int i = 0; i < LOADER_STATUS_NUM_ROWS && row < maxrows; i++) {
2355                 if (loaderstat.status[i].include & include_flags) {
2356                     engstat[row++] = loaderstat.status[i];
2357                 }
2358             }
2359         }
2360 
2361         {
2362             // memory_status is local to this file
2363             memory_get_status();
2364             for (int i = 0; i < MEMORY_STATUS_NUM_ROWS && row < maxrows; i++) {
2365                 if (memory_status.status[i].include & include_flags) {
2366                     engstat[row++] = memory_status.status[i];
2367                 }
2368             }
2369         }
2370         {
2371             // Note, fs_get_status() and the fsstat structure are local to this file because they
2372             // are used to concentrate file system information collected from various places.
2373             fs_get_status(env, redzone_state);
2374             for (int i = 0; i < FS_STATUS_NUM_ROWS && row < maxrows; i++) {
2375                 if (fsstat.status[i].include & include_flags) {
2376                     engstat[row++] = fsstat.status[i];
2377                 }
2378             }
2379         }
2380         {
2381             struct context_status ctxstatus;
2382             toku_context_get_status(&ctxstatus);
2383             for (int i = 0; i < CTX_STATUS_NUM_ROWS && row < maxrows; i++) {
2384                 if (ctxstatus.status[i].include & include_flags) {
2385                     engstat[row++] = ctxstatus.status[i];
2386                 }
2387             }
2388         }
2389 #if 0
2390         // enable when upgrade is supported
2391         {
2392             for (int i = 0; i < PERSISTENT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2393                 if (persistent_upgrade_status.status[i].include & include_flags) {
2394                     engstat[row++] = persistent_upgrade_status.status[i];
2395                 }
2396             }
2397             FT_UPGRADE_STATUS_S ft_upgradestat;
2398             toku_ft_upgrade_get_status(&ft_upgradestat);
2399             for (int i = 0; i < FT_UPGRADE_STATUS_NUM_ROWS && row < maxrows; i++) {
2400                 if (ft_upgradestat.status[i].include & include_flags) {
2401                     engstat[row++] = ft_upgradestat.status[i];
2402                 }
2403             }
2404 
2405         }
2406 #endif
2407         if (r==0) {
2408             *num_rows = row;
2409         }
2410     }
2411     return r;
2412 }
2413 
2414 // Fill buff with text description of engine status up to bufsiz bytes.
2415 // Intended for use by test programs that do not have the handlerton available,
2416 // and for use by toku_assert logic to print diagnostic info on crash.
2417 static int
env_get_engine_status_text(DB_ENV * env,char * buff,int bufsiz)2418 env_get_engine_status_text(DB_ENV * env, char * buff, int bufsiz) {
2419     uint32_t stringsize = 1024;
2420     uint64_t panic;
2421     char panicstring[stringsize];
2422     int n = 0;  // number of characters printed so far
2423     uint64_t num_rows;
2424     uint64_t max_rows;
2425     fs_redzone_state redzone_state;
2426 
2427     n = snprintf(buff, bufsiz - n, "BUILD_ID = %d\n", BUILD_ID);
2428 
2429     (void) env_get_engine_status_num_rows (env, &max_rows);
2430     TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2431     int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2432 
2433     if (r) {
2434         n += snprintf(buff + n, bufsiz - n, "Engine status not available: ");
2435         if (!env) {
2436             n += snprintf(buff + n, bufsiz - n, "no environment\n");
2437         }
2438         else if (!(env->i)) {
2439             n += snprintf(buff + n, bufsiz - n, "environment internal struct is null\n");
2440         }
2441         else if (!env_opened(env)) {
2442             n += snprintf(buff + n, bufsiz - n, "environment is not open\n");
2443         }
2444     }
2445     else {
2446         if (panic) {
2447             n += snprintf(buff + n, bufsiz - n, "Env panic code: %" PRIu64 "\n", panic);
2448             if (strlen(panicstring)) {
2449                 invariant(strlen(panicstring) <= stringsize);
2450                 n += snprintf(buff + n, bufsiz - n, "Env panic string: %s\n", panicstring);
2451             }
2452         }
2453 
2454         for (uint64_t row = 0; row < num_rows; row++) {
2455             n += snprintf(buff + n, bufsiz - n, "%s: ", mystat[row].legend);
2456             switch (mystat[row].type) {
2457             case FS_STATE:
2458                 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2459                 break;
2460             case UINT64:
2461                 n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", mystat[row].value.num);
2462                 break;
2463             case CHARSTR:
2464                 n += snprintf(buff + n, bufsiz - n, "%s\n", mystat[row].value.str);
2465                 break;
2466             case UNIXTIME:
2467                 {
2468                     char tbuf[26];
2469                     format_time((time_t*)&mystat[row].value.num, tbuf);
2470                     n += snprintf(buff + n, bufsiz - n, "%s\n", tbuf);
2471                 }
2472                 break;
2473             case TOKUTIME:
2474                 {
2475                     double t = tokutime_to_seconds(mystat[row].value.num);
2476                     n += snprintf(buff + n, bufsiz - n, "%.6f\n", t);
2477                 }
2478                 break;
2479             case PARCOUNT:
2480                 {
2481                     uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2482                     n += snprintf(buff + n, bufsiz - n, "%" PRIu64 "\n", v);
2483                 }
2484                 break;
2485             default:
2486                 n += snprintf(buff + n, bufsiz - n, "UNKNOWN STATUS TYPE: %d\n", mystat[row].type);
2487                 break;
2488             }
2489         }
2490     }
2491 
2492     if (n > bufsiz) {
2493         const char * errmsg = "BUFFER TOO SMALL\n";
2494         int len = strlen(errmsg) + 1;
2495         (void) snprintf(buff + (bufsiz - 1) - len, len, "%s", errmsg);
2496     }
2497 
2498     return r;
2499 }
2500 
2501 // prints engine status using toku_env_err line-by-line
2502 static int
env_err_engine_status(DB_ENV * env)2503 env_err_engine_status(DB_ENV * env) {
2504     uint32_t stringsize = 1024;
2505     uint64_t panic;
2506     char panicstring[stringsize];
2507     uint64_t num_rows;
2508     uint64_t max_rows;
2509     fs_redzone_state redzone_state;
2510 
2511     toku_env_err(env, 0, "BUILD_ID = %d", BUILD_ID);
2512 
2513     (void) env_get_engine_status_num_rows (env, &max_rows);
2514     TOKU_ENGINE_STATUS_ROW_S mystat[max_rows];
2515     int r = env->get_engine_status (env, mystat, max_rows, &num_rows, &redzone_state, &panic, panicstring, stringsize, TOKU_ENGINE_STATUS);
2516 
2517     if (r) {
2518         toku_env_err(env, 0, "Engine status not available: ");
2519         if (!env) {
2520             toku_env_err(env, 0, "no environment");
2521         }
2522         else if (!(env->i)) {
2523             toku_env_err(env, 0, "environment internal struct is null");
2524         }
2525         else if (!env_opened(env)) {
2526             toku_env_err(env, 0, "environment is not open");
2527         }
2528     }
2529     else {
2530         if (panic) {
2531             toku_env_err(env, 0, "Env panic code: %" PRIu64, panic);
2532             if (strlen(panicstring)) {
2533                 invariant(strlen(panicstring) <= stringsize);
2534                 toku_env_err(env, 0, "Env panic string: %s", panicstring);
2535             }
2536         }
2537 
2538         for (uint64_t row = 0; row < num_rows; row++) {
2539             switch (mystat[row].type) {
2540             case FS_STATE:
2541                 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2542                 break;
2543             case UINT64:
2544                 toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, mystat[row].value.num);
2545                 break;
2546             case CHARSTR:
2547                 toku_env_err(env, 0, "%s: %s", mystat[row].legend, mystat[row].value.str);
2548                 break;
2549             case UNIXTIME:
2550                 {
2551                     char tbuf[26];
2552                     format_time((time_t*)&mystat[row].value.num, tbuf);
2553                     toku_env_err(env, 0, "%s: %s", mystat[row].legend, tbuf);
2554                 }
2555                 break;
2556             case TOKUTIME:
2557                 {
2558                     double t = tokutime_to_seconds(mystat[row].value.num);
2559                     toku_env_err(env, 0, "%s: %.6f", mystat[row].legend, t);
2560                 }
2561                 break;
2562             case PARCOUNT:
2563                 {
2564                     uint64_t v = read_partitioned_counter(mystat[row].value.parcount);
2565                     toku_env_err(env, 0, "%s: %" PRIu64, mystat[row].legend, v);
2566                 }
2567                 break;
2568             default:
2569                 toku_env_err(env, 0, "%s: UNKNOWN STATUS TYPE: %d", mystat[row].legend, mystat[row].type);
2570                 break;
2571             }
2572         }
2573     }
2574 
2575     return r;
2576 }
2577 
2578 // intended for use by toku_assert logic, when env is not known
2579 static int
toku_maybe_get_engine_status_text(char * buff,int buffsize)2580 toku_maybe_get_engine_status_text (char * buff, int buffsize) {
2581     DB_ENV * env = most_recent_env;
2582     int r;
2583     if (engine_status_enable && env != NULL) {
2584         r = env_get_engine_status_text(env, buff, buffsize);
2585     }
2586     else {
2587         r = EOPNOTSUPP;
2588         snprintf(buff, buffsize, "Engine status not available: disabled by user.  This should only happen in test programs.\n");
2589     }
2590     return r;
2591 }
2592 
2593 static int
toku_maybe_err_engine_status(void)2594 toku_maybe_err_engine_status (void) {
2595     DB_ENV * env = most_recent_env;
2596     int r;
2597     if (engine_status_enable && env != NULL) {
2598         r = env_err_engine_status(env);
2599     }
2600     else {
2601         r = EOPNOTSUPP;
2602     }
2603     return r;
2604 }
2605 
2606 // Set panic code and panic string if not already panicked,
2607 // intended for use by toku_assert when about to abort().
2608 static void
toku_maybe_set_env_panic(int code,const char * msg)2609 toku_maybe_set_env_panic(int code, const char * msg) {
2610     if (code == 0)
2611         code = -1;
2612     if (msg == NULL)
2613         msg = "Unknown cause from abort (failed assert)\n";
2614     env_is_panicked = code;  // disable library destructor no matter what
2615     DB_ENV * env = most_recent_env;
2616     if (env &&
2617         env->i &&
2618         (env->i->is_panicked == 0)) {
2619         env_panic(env, code, msg);
2620     }
2621 }
2622 
2623 // handlerton's call to fractal tree layer on failed assert in handlerton
2624 static int
env_crash(DB_ENV * UU (db_env),const char * msg,const char * fun,const char * file,int line,int caller_errno)2625 env_crash(DB_ENV * UU(db_env), const char* msg, const char * fun, const char* file, int line, int caller_errno) {
2626     toku_do_assert_fail(msg, fun, file, line, caller_errno);
2627     return -1;  // placate compiler
2628 }
2629 
2630 static int
env_get_cursor_for_persistent_environment(DB_ENV * env,DB_TXN * txn,DBC ** c)2631 env_get_cursor_for_persistent_environment(DB_ENV* env, DB_TXN* txn, DBC** c) {
2632     if (!env_opened(env)) {
2633         return EINVAL;
2634     }
2635     return toku_db_cursor(env->i->persistent_environment, txn, c, 0);
2636 }
2637 
2638 static int
env_get_cursor_for_directory(DB_ENV * env,DB_TXN * txn,DBC ** c)2639 env_get_cursor_for_directory(DB_ENV* env, DB_TXN* txn, DBC** c) {
2640     if (!env_opened(env)) {
2641         return EINVAL;
2642     }
2643     return toku_db_cursor(env->i->directory, txn, c, 0);
2644 }
2645 
2646 static DB *
env_get_db_for_directory(DB_ENV * env)2647 env_get_db_for_directory(DB_ENV* env) {
2648     if (!env_opened(env)) {
2649         return NULL;
2650     }
2651     return env->i->directory;
2652 }
2653 
2654 struct ltm_iterate_requests_callback_extra {
ltm_iterate_requests_callback_extraltm_iterate_requests_callback_extra2655     ltm_iterate_requests_callback_extra(DB_ENV *e,
2656                                         iterate_requests_callback cb,
2657                                         void *ex) :
2658         env(e), callback(cb), extra(ex) {
2659     }
2660     DB_ENV *env;
2661     iterate_requests_callback callback;
2662     void *extra;
2663 };
2664 
2665 static int
find_db_by_dict_id(DB * const & db,const DICTIONARY_ID & dict_id_find)2666 find_db_by_dict_id(DB *const &db, const DICTIONARY_ID &dict_id_find) {
2667     DICTIONARY_ID dict_id = db->i->dict_id;
2668     if (dict_id.dictid < dict_id_find.dictid) {
2669         return -1;
2670     } else if (dict_id.dictid > dict_id_find.dictid) {
2671         return 1;
2672     } else {
2673         return 0;
2674     }
2675 }
2676 
2677 static DB *
locked_get_db_by_dict_id(DB_ENV * env,DICTIONARY_ID dict_id)2678 locked_get_db_by_dict_id(DB_ENV *env, DICTIONARY_ID dict_id) {
2679     DB *db;
2680     int r = env->i->open_dbs_by_dict_id->find_zero<DICTIONARY_ID, find_db_by_dict_id>(dict_id, &db, nullptr);
2681     return r == 0 ? db : nullptr;
2682 }
2683 
ltm_iterate_requests_callback(DICTIONARY_ID dict_id,TXNID txnid,const DBT * left_key,const DBT * right_key,TXNID blocking_txnid,uint64_t start_time,void * extra)2684 static int ltm_iterate_requests_callback(DICTIONARY_ID dict_id, TXNID txnid,
2685                                          const DBT *left_key,
2686                                          const DBT *right_key,
2687                                          TXNID blocking_txnid,
2688                                          uint64_t start_time,
2689                                          void *extra) {
2690     ltm_iterate_requests_callback_extra *info =
2691         reinterpret_cast<ltm_iterate_requests_callback_extra *>(extra);
2692 
2693     toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2694     int r = 0;
2695     DB *db = locked_get_db_by_dict_id(info->env, dict_id);
2696     if (db != nullptr) {
2697         r = info->callback(db, txnid, left_key, right_key,
2698                            blocking_txnid, start_time, info->extra);
2699     }
2700     toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2701     return r;
2702 }
2703 
2704 static int
env_iterate_pending_lock_requests(DB_ENV * env,iterate_requests_callback callback,void * extra)2705 env_iterate_pending_lock_requests(DB_ENV *env,
2706                                   iterate_requests_callback callback,
2707                                   void *extra) {
2708     if (!env_opened(env)) {
2709         return EINVAL;
2710     }
2711 
2712     toku::locktree_manager *mgr = &env->i->ltm;
2713     ltm_iterate_requests_callback_extra e(env, callback, extra);
2714     return mgr->iterate_pending_lock_requests(ltm_iterate_requests_callback, &e);
2715 }
2716 
2717 // for the lifetime of this object:
2718 // - open_dbs_rwlock must be read locked (or better)
2719 // - txn_mutex must be held
2720 struct iter_txn_row_locks_callback_extra {
iter_txn_row_locks_callback_extraiter_txn_row_locks_callback_extra2721     iter_txn_row_locks_callback_extra(DB_ENV *e, toku::omt<txn_lt_key_ranges> *m) :
2722         env(e), current_db(nullptr), which_lt(0), lt_map(m) {
2723         if (lt_map->size() > 0) {
2724             set_iterator_and_current_db();
2725         }
2726     }
2727 
set_iterator_and_current_dbiter_txn_row_locks_callback_extra2728     void set_iterator_and_current_db() {
2729         txn_lt_key_ranges ranges;
2730         const int r = lt_map->fetch(which_lt, &ranges);
2731         invariant_zero(r);
2732         current_db = locked_get_db_by_dict_id(env, ranges.lt->get_dict_id());
2733         iter = toku::range_buffer::iterator(ranges.buffer);
2734     }
2735 
2736     DB_ENV *env;
2737     DB *current_db;
2738     size_t which_lt;
2739     toku::omt<txn_lt_key_ranges> *lt_map;
2740     toku::range_buffer::iterator iter;
2741     toku::range_buffer::iterator::record rec;
2742 };
2743 
iter_txn_row_locks_callback(DB ** db,DBT * left_key,DBT * right_key,void * extra)2744 static int iter_txn_row_locks_callback(DB **db, DBT *left_key, DBT *right_key, void *extra) {
2745     iter_txn_row_locks_callback_extra *info =
2746         reinterpret_cast<iter_txn_row_locks_callback_extra *>(extra);
2747 
2748     while (info->which_lt < info->lt_map->size()) {
2749         const bool more = info->iter.current(&info->rec);
2750         if (more) {
2751             *db = info->current_db;
2752             // The caller should interpret data/size == 0 to mean infinity.
2753             // Therefore, when we copyref pos/neg infinity into left/right_key,
2754             // the caller knows what we're talking about.
2755             toku_copyref_dbt(left_key, *info->rec.get_left_key());
2756             toku_copyref_dbt(right_key, *info->rec.get_right_key());
2757             info->iter.next();
2758             return 0;
2759         } else {
2760             info->which_lt++;
2761             if (info->which_lt < info->lt_map->size()) {
2762                 info->set_iterator_and_current_db();
2763             }
2764         }
2765     }
2766     return DB_NOTFOUND;
2767 }
2768 
2769 struct iter_txns_callback_extra {
iter_txns_callback_extraiter_txns_callback_extra2770     iter_txns_callback_extra(DB_ENV *e, iterate_transactions_callback cb, void *ex) :
2771         env(e), callback(cb), extra(ex) {
2772     }
2773     DB_ENV *env;
2774     iterate_transactions_callback callback;
2775     void *extra;
2776 };
2777 
iter_txns_callback(TOKUTXN txn,void * extra)2778 static int iter_txns_callback(TOKUTXN txn, void *extra) {
2779     int r = 0;
2780     iter_txns_callback_extra *info =
2781         reinterpret_cast<iter_txns_callback_extra *>(extra);
2782     DB_TXN *dbtxn = toku_txn_get_container_db_txn(txn);
2783     invariant_notnull(dbtxn);
2784     struct __toku_db_txn_internal *db_txn_internal __attribute__((__unused__)) = db_txn_struct_i(dbtxn);
2785     TOKU_VALGRIND_HG_DISABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2786     if (db_txn_struct_i(dbtxn)->tokutxn == txn) { // make sure that the dbtxn is fully initialized
2787         toku_mutex_lock(&db_txn_struct_i(dbtxn)->txn_mutex);
2788         toku_pthread_rwlock_rdlock(&info->env->i->open_dbs_rwlock);
2789 
2790         iter_txn_row_locks_callback_extra e(info->env, &db_txn_struct_i(dbtxn)->lt_map);
2791         r = info->callback(dbtxn, iter_txn_row_locks_callback, &e, info->extra);
2792 
2793         toku_pthread_rwlock_rdunlock(&info->env->i->open_dbs_rwlock);
2794         toku_mutex_unlock(&db_txn_struct_i(dbtxn)->txn_mutex);
2795     }
2796     TOKU_VALGRIND_HG_ENABLE_CHECKING(db_txn_internal, sizeof *db_txn_internal);
2797 
2798     return r;
2799 }
2800 
2801 static int
env_iterate_live_transactions(DB_ENV * env,iterate_transactions_callback callback,void * extra)2802 env_iterate_live_transactions(DB_ENV *env,
2803                               iterate_transactions_callback callback,
2804                               void *extra) {
2805     if (!env_opened(env)) {
2806         return EINVAL;
2807     }
2808 
2809     TXN_MANAGER txn_manager = toku_logger_get_txn_manager(env->i->logger);
2810     iter_txns_callback_extra e(env, callback, extra);
2811     return toku_txn_manager_iter_over_live_root_txns(txn_manager, iter_txns_callback, &e);
2812 }
2813 
env_set_loader_memory_size(DB_ENV * env,uint64_t (* get_loader_memory_size_callback)(void))2814 static void env_set_loader_memory_size(DB_ENV *env, uint64_t (*get_loader_memory_size_callback)(void)) {
2815     env->i->get_loader_memory_size_callback = get_loader_memory_size_callback;
2816 }
2817 
env_get_loader_memory_size(DB_ENV * env)2818 static uint64_t env_get_loader_memory_size(DB_ENV *env) {
2819     uint64_t memory_size = 0;
2820     if (env->i->get_loader_memory_size_callback)
2821         memory_size = env->i->get_loader_memory_size_callback();
2822     return memory_size;
2823 }
2824 
env_set_killed_callback(DB_ENV * env,uint64_t default_killed_time_msec,uint64_t (* get_killed_time_callback)(uint64_t default_killed_time_msec),int (* killed_callback)(void))2825 static void env_set_killed_callback(DB_ENV *env, uint64_t default_killed_time_msec, uint64_t (*get_killed_time_callback)(uint64_t default_killed_time_msec), int (*killed_callback)(void)) {
2826     env->i->default_killed_time_msec = default_killed_time_msec;
2827     env->i->get_killed_time_callback = get_killed_time_callback;
2828     env->i->killed_callback = killed_callback;
2829 }
2830 
env_kill_waiter(DB_ENV * env,void * extra)2831 static void env_kill_waiter(DB_ENV *env, void *extra) {
2832     env->i->ltm.kill_waiter(extra);
2833 }
2834 
env_do_backtrace(DB_ENV * env)2835 static void env_do_backtrace(DB_ENV *env) {
2836     if (env->i->errcall) {
2837         db_env_do_backtrace_errfunc((toku_env_err_func) toku_env_err, (const void *) env);
2838     }
2839     if (env->i->errfile) {
2840         db_env_do_backtrace((FILE *) env->i->errfile);
2841     } else {
2842         db_env_do_backtrace(stderr);
2843     }
2844 }
2845 
2846 static int
toku_env_create(DB_ENV ** envp,uint32_t flags)2847 toku_env_create(DB_ENV ** envp, uint32_t flags) {
2848     int r = ENOSYS;
2849     DB_ENV* result = NULL;
2850 
2851     if (flags!=0)    { r = EINVAL; goto cleanup; }
2852     MALLOC(result);
2853     if (result == 0) { r = ENOMEM; goto cleanup; }
2854     memset(result, 0, sizeof *result);
2855 
2856     // locked methods
2857     result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_env_err;
2858 #define SENV(name) result->name = locked_env_ ## name
2859     SENV(dbremove);
2860     SENV(dbrename);
2861     SENV(dirtool_attach);
2862     SENV(dirtool_detach);
2863     SENV(dirtool_move);
2864     //SENV(set_noticecall);
2865 #undef SENV
2866 #define USENV(name) result->name = env_ ## name
2867     // methods with locking done internally
2868     USENV(put_multiple);
2869     USENV(del_multiple);
2870     USENV(update_multiple);
2871     // unlocked methods
2872     USENV(open);
2873     USENV(close);
2874     USENV(set_default_bt_compare);
2875     USENV(set_update);
2876     USENV(set_generate_row_callback_for_put);
2877     USENV(set_generate_row_callback_for_del);
2878     USENV(set_lg_bsize);
2879     USENV(set_lg_dir);
2880     USENV(set_lg_max);
2881     USENV(get_lg_max);
2882     USENV(set_lk_max_memory);
2883     USENV(get_lk_max_memory);
2884     USENV(get_iname);
2885     USENV(set_errcall);
2886     USENV(set_errfile);
2887     USENV(set_errpfx);
2888     USENV(set_data_dir);
2889     USENV(checkpointing_set_period);
2890     USENV(checkpointing_get_period);
2891     USENV(cleaner_set_period);
2892     USENV(cleaner_get_period);
2893     USENV(cleaner_set_iterations);
2894     USENV(cleaner_get_iterations);
2895     USENV(evictor_set_enable_partial_eviction);
2896     USENV(evictor_get_enable_partial_eviction);
2897     USENV(set_cachesize);
2898     USENV(set_client_pool_threads);
2899     USENV(set_cachetable_pool_threads);
2900     USENV(set_checkpoint_pool_threads);
2901 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
2902     USENV(get_cachesize);
2903 #endif
2904 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR <= 4
2905     USENV(set_lk_max);
2906 #endif
2907     USENV(set_lk_detect);
2908     USENV(set_flags);
2909     USENV(set_tmp_dir);
2910     USENV(set_verbose);
2911     USENV(txn_recover);
2912     USENV(txn_xa_recover);
2913     USENV(get_txn_from_xid);
2914     USENV(txn_stat);
2915     USENV(get_lock_timeout);
2916     USENV(set_lock_timeout);
2917     USENV(set_lock_timeout_callback);
2918     USENV(set_redzone);
2919     USENV(log_flush);
2920     USENV(log_archive);
2921     USENV(create_loader);
2922     USENV(get_cursor_for_persistent_environment);
2923     USENV(get_cursor_for_directory);
2924     USENV(get_db_for_directory);
2925     USENV(iterate_pending_lock_requests);
2926     USENV(iterate_live_transactions);
2927     USENV(change_fsync_log_period);
2928     USENV(set_loader_memory_size);
2929     USENV(get_loader_memory_size);
2930     USENV(set_killed_callback);
2931     USENV(do_backtrace);
2932     USENV(set_check_thp);
2933     USENV(get_check_thp);
2934     USENV(set_dir_per_db);
2935     USENV(get_dir_per_db);
2936     USENV(get_data_dir);
2937     USENV(kill_waiter);
2938 #undef USENV
2939 
2940     // unlocked methods
2941     result->create_indexer = toku_indexer_create_indexer;
2942     result->txn_checkpoint = toku_env_txn_checkpoint;
2943     result->checkpointing_postpone = env_checkpointing_postpone;
2944     result->checkpointing_resume = env_checkpointing_resume;
2945     result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation;
2946     result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation;
2947     result->get_engine_status_num_rows = env_get_engine_status_num_rows;
2948     result->get_engine_status = env_get_engine_status;
2949     result->get_engine_status_text = env_get_engine_status_text;
2950     result->crash = env_crash;  // handlerton's call to fractal tree layer on failed assert
2951     result->txn_begin = toku_txn_begin;
2952 
2953     MALLOC(result->i);
2954     if (result->i == 0) { r = ENOMEM; goto cleanup; }
2955     memset(result->i, 0, sizeof *result->i);
2956     result->i->envdir_lockfd  = -1;
2957     result->i->datadir_lockfd = -1;
2958     result->i->logdir_lockfd  = -1;
2959     result->i->tmpdir_lockfd  = -1;
2960     env_fs_init(result);
2961     env_fsync_log_init(result);
2962 
2963     result->i->check_thp = true;
2964 
2965     result->i->bt_compare = toku_builtin_compare_fun;
2966 
2967     r = toku_logger_create(&result->i->logger);
2968     invariant_zero(r);
2969     invariant_notnull(result->i->logger);
2970 
2971     // Create the locktree manager, passing in the create/destroy/escalate callbacks.
2972     // The extra parameter for escalation is simply a pointer to this environment.
2973     // The escalate callback will need it to translate txnids to DB_TXNs
2974     result->i->ltm.create(toku_db_lt_on_create_callback, toku_db_lt_on_destroy_callback, toku_db_txn_escalate_callback, result);
2975 
2976     XMALLOC(result->i->open_dbs_by_dname);
2977     result->i->open_dbs_by_dname->create();
2978     XMALLOC(result->i->open_dbs_by_dict_id);
2979     result->i->open_dbs_by_dict_id->create();
2980     toku_pthread_rwlock_init(
2981         *result_i_open_dbs_rwlock_key, &result->i->open_dbs_rwlock, nullptr);
2982 
2983     *envp = result;
2984     r = 0;
2985     toku_sync_fetch_and_add(&tokuft_num_envs, 1);
2986 cleanup:
2987     if (r!=0) {
2988         if (result) {
2989             toku_free(result->i);
2990             toku_free(result);
2991         }
2992     }
2993     return r;
2994 }
2995 
2996 int
DB_ENV_CREATE_FUN(DB_ENV ** envp,uint32_t flags)2997 DB_ENV_CREATE_FUN (DB_ENV ** envp, uint32_t flags) {
2998     int r = toku_env_create(envp, flags);
2999     return r;
3000 }
3001 
3002 // return 0 if v and dbv refer to same db (including same dname)
3003 // return <0 if v is earlier in omt than dbv
3004 // return >0 if v is later in omt than dbv
3005 static int
find_db_by_db_dname(DB * const & db,DB * const & dbfind)3006 find_db_by_db_dname(DB *const &db, DB *const &dbfind) {
3007     int cmp;
3008     const char *dname     = db->i->dname;
3009     const char *dnamefind = dbfind->i->dname;
3010     cmp = strcmp(dname, dnamefind);
3011     if (cmp != 0) return cmp;
3012     if (db < dbfind) return -1;
3013     if (db > dbfind) return  1;
3014     return 0;
3015 }
3016 
3017 static int
find_db_by_db_dict_id(DB * const & db,DB * const & dbfind)3018 find_db_by_db_dict_id(DB *const &db, DB *const &dbfind) {
3019     DICTIONARY_ID dict_id = db->i->dict_id;
3020     DICTIONARY_ID dict_id_find = dbfind->i->dict_id;
3021     if (dict_id.dictid < dict_id_find.dictid) {
3022         return -1;
3023     } else if (dict_id.dictid > dict_id_find.dictid) {
3024         return 1;
3025     } else if (db < dbfind) {
3026         return -1;
3027     } else if (db > dbfind) {
3028         return 1;
3029     } else {
3030         return 0;
3031     }
3032 }
3033 
3034 // Tell env that there is a new db handle (with non-unique dname in db->i-dname)
3035 void
env_note_db_opened(DB_ENV * env,DB * db)3036 env_note_db_opened(DB_ENV *env, DB *db) {
3037     toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3038     assert(db->i->dname); // internal (non-user) dictionary has no dname
3039 
3040     int r;
3041     uint32_t idx;
3042 
3043     r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3044     assert(r == DB_NOTFOUND);
3045     r = env->i->open_dbs_by_dname->insert_at(db, idx);
3046     assert_zero(r);
3047     r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3048     assert(r == DB_NOTFOUND);
3049     r = env->i->open_dbs_by_dict_id->insert_at(db, idx);
3050     assert_zero(r);
3051 
3052     STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3053     STATUS_VALUE(YDB_LAYER_NUM_DB_OPEN)++;
3054     if (STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) > STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS)) {
3055         STATUS_VALUE(YDB_LAYER_MAX_OPEN_DBS) = STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS);
3056     }
3057     toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3058 }
3059 
3060 // Effect: Tell the DB_ENV that the DB is no longer in use by the user of the API.  The DB may still be in use by the fractal tree internals.
3061 void
env_note_db_closed(DB_ENV * env,DB * db)3062 env_note_db_closed(DB_ENV *env, DB *db) {
3063     toku_pthread_rwlock_wrlock(&env->i->open_dbs_rwlock);
3064     assert(db->i->dname); // internal (non-user) dictionary has no dname
3065     assert(env->i->open_dbs_by_dname->size() > 0);
3066     assert(env->i->open_dbs_by_dict_id->size() > 0);
3067 
3068     int r;
3069     uint32_t idx;
3070 
3071     r = env->i->open_dbs_by_dname->find_zero<DB *, find_db_by_db_dname>(db, nullptr, &idx);
3072     assert_zero(r);
3073     r = env->i->open_dbs_by_dname->delete_at(idx);
3074     assert_zero(r);
3075     r = env->i->open_dbs_by_dict_id->find_zero<DB *, find_db_by_db_dict_id>(db, nullptr, &idx);
3076     assert_zero(r);
3077     r = env->i->open_dbs_by_dict_id->delete_at(idx);
3078     assert_zero(r);
3079 
3080     STATUS_VALUE(YDB_LAYER_NUM_DB_CLOSE)++;
3081     STATUS_VALUE(YDB_LAYER_NUM_OPEN_DBS) = env->i->open_dbs_by_dname->size();
3082     toku_pthread_rwlock_wrunlock(&env->i->open_dbs_rwlock);
3083 }
3084 
3085 static int
find_open_db_by_dname(DB * const & db,const char * const & dnamefind)3086 find_open_db_by_dname(DB *const &db, const char *const &dnamefind) {
3087     return strcmp(db->i->dname, dnamefind);
3088 }
3089 
3090 // return true if there is any db open with the given dname
3091 static bool
env_is_db_with_dname_open(DB_ENV * env,const char * dname)3092 env_is_db_with_dname_open(DB_ENV *env, const char *dname) {
3093     DB *db;
3094     toku_pthread_rwlock_rdlock(&env->i->open_dbs_rwlock);
3095     int r = env->i->open_dbs_by_dname->find_zero<const char *, find_open_db_by_dname>(dname, &db, nullptr);
3096     if (r == 0) {
3097         invariant(strcmp(dname, db->i->dname) == 0);
3098     } else {
3099         invariant(r == DB_NOTFOUND);
3100     }
3101     toku_pthread_rwlock_rdunlock(&env->i->open_dbs_rwlock);
3102     return r == 0 ? true : false;
3103 }
3104 
3105 //We do not (yet?) support deleting subdbs by deleting the enclosing 'fname'
3106 static int
env_dbremove_subdb(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,int32_t flags)3107 env_dbremove_subdb(DB_ENV * env, DB_TXN * txn, const char *fname, const char *dbname, int32_t flags) {
3108     int r;
3109     if (!fname || !dbname) r = EINVAL;
3110     else {
3111         char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3112         int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3113         assert(bytes==(int)sizeof(subdb_full_name)-1);
3114         const char *null_subdbname = NULL;
3115         r = env_dbremove(env, txn, subdb_full_name, null_subdbname, flags);
3116     }
3117     return r;
3118 }
3119 
3120 // see if we can acquire a table lock for the given dname.
3121 // requires: write lock on dname in the directory. dictionary
3122 //          open, close, and begin checkpoint cannot occur.
3123 // returns: zero if we could open, lock, and close a dictionary
3124 //          with the given dname, errno otherwise.
3125 static int
can_acquire_table_lock(DB_ENV * env,DB_TXN * txn,const char * iname_in_env)3126 can_acquire_table_lock(DB_ENV *env, DB_TXN *txn, const char *iname_in_env) {
3127     int r;
3128     DB *db;
3129 
3130     r = toku_db_create(&db, env, 0);
3131     assert_zero(r);
3132     r = toku_db_open_iname(db, txn, iname_in_env, 0, 0);
3133     if(r) {
3134 	if (r == ENAMETOOLONG)
3135 	    toku_ydb_do_error(env, r, "File name too long!\n");
3136 	goto exit;
3137     }
3138     r = toku_db_pre_acquire_table_lock(db, txn);
3139     if (r) {
3140         r = DB_LOCK_NOTGRANTED;
3141     }
3142 exit:
3143     if(db) {
3144         int r2 = toku_db_close(db);
3145         assert_zero(r2);
3146     }
3147     return r;
3148 }
3149 
3150 static int
env_dbremove(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,uint32_t flags)3151 env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbname, uint32_t flags) {
3152     int r;
3153     HANDLE_PANICKED_ENV(env);
3154     if (!env_opened(env) || flags != 0) {
3155         return EINVAL;
3156     }
3157     HANDLE_READ_ONLY_TXN(txn);
3158     if (dbname != NULL) {
3159         // env_dbremove_subdb() converts (fname, dbname) to dname
3160         return env_dbremove_subdb(env, txn, fname, dbname, flags);
3161     }
3162 
3163     const char * dname = fname;
3164     assert(dbname == NULL);
3165 
3166     // We check for an open db here as a "fast path" to error.
3167     // We'll need to check again below to be sure.
3168     if (env_is_db_with_dname_open(env, dname)) {
3169         return toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
3170     }
3171 
3172     DBT dname_dbt;
3173     DBT iname_dbt;
3174     toku_fill_dbt(&dname_dbt, dname, strlen(dname)+1);
3175     toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3176 
3177     // get iname
3178     r = toku_db_get(env->i->directory, txn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
3179     char *iname = (char *) iname_dbt.data;
3180     DB *db = NULL;
3181     if (r != 0) {
3182         if (r == DB_NOTFOUND) {
3183             r = ENOENT;
3184         }
3185         goto exit;
3186     }
3187     // remove (dname,iname) from directory
3188     r = toku_db_del(env->i->directory, txn, &dname_dbt, DB_DELETE_ANY, true);
3189     if (r != 0) {
3190         goto exit;
3191     }
3192     r = toku_db_create(&db, env, 0);
3193     lazy_assert_zero(r);
3194     r = toku_db_open_iname(db, txn, iname, 0, 0);
3195     if (txn && r) {
3196         if (r == EMFILE || r == ENFILE)
3197             r = toku_ydb_do_error(env, r, "toku dbremove failed because open file limit reached\n");
3198         else if (r != ENOENT)
3199             r = toku_ydb_do_error(env, r, "toku dbremove failed\n");
3200         else
3201             r = 0;
3202         goto exit;
3203     }
3204     if (txn) {
3205         // Now that we have a writelock on dname, verify that there are still no handles open. (to prevent race conditions)
3206         if (env_is_db_with_dname_open(env, dname)) {
3207             r = toku_ydb_do_error(env, EINVAL, "Cannot remove dictionary with an open handle.\n");
3208             goto exit;
3209         }
3210         // we know a live db handle does not exist.
3211         //
3212         // use the internally opened db to try and get a table lock
3213         //
3214         // if we can't get it, then some txn needs the ft and we
3215         // should return lock not granted.
3216         //
3217         // otherwise, we're okay in marking this ft as remove on
3218         // commit. no new handles can open for this dictionary
3219         // because the txn has directory write locks on the dname
3220         r = toku_db_pre_acquire_table_lock(db, txn);
3221         if (r != 0) {
3222             r = DB_LOCK_NOTGRANTED;
3223             goto exit;
3224         }
3225         // The ft will be unlinked when the txn commits
3226         toku_ft_unlink_on_commit(db->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
3227     }
3228     else {
3229         // unlink the ft without a txn
3230         toku_ft_unlink(db->i->ft_handle);
3231     }
3232 
3233 exit:
3234     if (db) {
3235         int ret = toku_db_close(db);
3236         assert(ret == 0);
3237     }
3238     if (iname) {
3239         toku_free(iname);
3240     }
3241     return r;
3242 }
3243 
3244 static int
env_dbrename_subdb(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)3245 env_dbrename_subdb(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3246     int r;
3247     if (!fname || !dbname || !newname) r = EINVAL;
3248     else {
3249         char subdb_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3250         {
3251             int bytes = snprintf(subdb_full_name, sizeof(subdb_full_name), "%s/%s", fname, dbname);
3252             assert(bytes==(int)sizeof(subdb_full_name)-1);
3253         }
3254         char new_full_name[strlen(fname) + sizeof("/") + strlen(dbname)];
3255         {
3256             int bytes = snprintf(new_full_name, sizeof(new_full_name), "%s/%s", fname, dbname);
3257             assert(bytes==(int)sizeof(new_full_name)-1);
3258         }
3259         const char *null_subdbname = NULL;
3260         r = env_dbrename(env, txn, subdb_full_name, null_subdbname, new_full_name, flags);
3261     }
3262     return r;
3263 }
3264 
3265 static int
env_dbrename(DB_ENV * env,DB_TXN * txn,const char * fname,const char * dbname,const char * newname,uint32_t flags)3266 env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbname, const char *newname, uint32_t flags) {
3267     int r;
3268     HANDLE_PANICKED_ENV(env);
3269     if (!env_opened(env) || flags != 0) {
3270         return EINVAL;
3271     }
3272     HANDLE_READ_ONLY_TXN(txn);
3273     if (dbname != NULL) {
3274         // env_dbrename_subdb() converts (fname, dbname) to dname and (fname, newname) to newdname
3275         return env_dbrename_subdb(env, txn, fname, dbname, newname, flags);
3276     }
3277 
3278     const char * dname = fname;
3279     assert(dbname == NULL);
3280 
3281     // We check for open dnames for the old and new name as a "fast path" to error.
3282     // We will need to check these again later.
3283     if (env_is_db_with_dname_open(env, dname)) {
3284         return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3285     }
3286     if (env_is_db_with_dname_open(env, newname)) {
3287         return toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3288     }
3289 
3290     DBT old_dname_dbt;
3291     DBT new_dname_dbt;
3292     DBT iname_dbt;
3293     toku_fill_dbt(&old_dname_dbt, dname, strlen(dname)+1);
3294     toku_fill_dbt(&new_dname_dbt, newname, strlen(newname)+1);
3295     toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3296 
3297     // get iname
3298     r = toku_db_get(env->i->directory, txn, &old_dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
3299     char *iname = (char *) iname_dbt.data;
3300     if (r == DB_NOTFOUND) {
3301         r = ENOENT;
3302     } else if (r == 0) {
3303         // verify that newname does not already exist
3304         r = db_getf_set(env->i->directory, txn, DB_SERIALIZABLE, &new_dname_dbt, ydb_getf_do_nothing, NULL);
3305         if (r == 0) {
3306             r = EEXIST;
3307         }
3308         else if (r == DB_NOTFOUND) {
3309             DBT new_iname_dbt;
3310             // Do not rename ft file if 'dir_per_db' option is not set
3311             auto new_iname =
3312                 env->get_dir_per_db(env)
3313                     ? generate_iname_for_rename_or_open(
3314                           env, txn, newname, false)
3315                     : std::unique_ptr<char[], decltype(&toku_free)>(
3316                           toku_strdup(iname), &toku_free);
3317             toku_fill_dbt(
3318                 &new_iname_dbt, new_iname.get(), strlen(new_iname.get()) + 1);
3319 
3320             // remove old (dname,iname) and insert (newname,iname) in directory
3321             r = toku_db_del(env->i->directory, txn, &old_dname_dbt, DB_DELETE_ANY, true);
3322             if (r != 0) { goto exit; }
3323 
3324             // Do not rename ft file if 'dir_per_db' option is not set
3325             if (env->get_dir_per_db(env))
3326                 r = toku_ft_rename_iname(txn,
3327                                          env->get_data_dir(env),
3328                                          iname,
3329                                          new_iname.get(),
3330                                          env->i->cachetable);
3331 
3332             r = toku_db_put(env->i->directory,
3333                             txn,
3334                             &new_dname_dbt,
3335                             &new_iname_dbt,
3336                             0,
3337                             true);
3338             if (r != 0) { goto exit; }
3339 
3340             //Now that we have writelocks on both dnames, verify that there are still no handles open. (to prevent race conditions)
3341             if (env_is_db_with_dname_open(env, dname)) {
3342                 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary with an open handle.\n");
3343                 goto exit;
3344             }
3345             if (env_is_db_with_dname_open(env, newname)) {
3346                 r = toku_ydb_do_error(env, EINVAL, "Cannot rename dictionary; Dictionary with target name has an open handle.\n");
3347                 goto exit;
3348             }
3349 
3350             // we know a live db handle does not exist.
3351             //
3352             // use the internally opened db to try and get a table lock
3353             //
3354             // if we can't get it, then some txn needs the ft and we
3355             // should return lock not granted.
3356             //
3357             // otherwise, we're okay in marking this ft as remove on
3358             // commit. no new handles can open for this dictionary
3359             // because the txn has directory write locks on the dname
3360             if (txn) {
3361                 r = can_acquire_table_lock(env, txn, new_iname.get());
3362             }
3363             // We don't do anything at the ft or cachetable layer for rename.
3364             // We just update entries in the environment's directory.
3365         }
3366     }
3367 
3368 exit:
3369     if (iname) {
3370         toku_free(iname);
3371     }
3372     return r;
3373 }
3374 
3375 int
DB_CREATE_FUN(DB ** db,DB_ENV * env,uint32_t flags)3376 DB_CREATE_FUN (DB ** db, DB_ENV * env, uint32_t flags) {
3377     int r = toku_db_create(db, env, flags);
3378     return r;
3379 }
3380 
3381 /* need db_strerror_r for multiple threads */
3382 
3383 const char *
db_strerror(int error)3384 db_strerror(int error) {
3385     char *errorstr;
3386     if (error >= 0) {
3387         errorstr = strerror(error);
3388         if (errorstr)
3389             return errorstr;
3390     }
3391 
3392     switch (error) {
3393         case DB_BADFORMAT:
3394             return "Database Bad Format (probably a corrupted database)";
3395         case DB_NOTFOUND:
3396             return "Not found";
3397         case TOKUDB_OUT_OF_LOCKS:
3398             return "Out of locks";
3399         case TOKUDB_DICTIONARY_TOO_OLD:
3400             return "Dictionary too old for this version of PerconaFT";
3401         case TOKUDB_DICTIONARY_TOO_NEW:
3402             return "Dictionary too new for this version of PerconaFT";
3403         case TOKUDB_CANCELED:
3404             return "User cancelled operation";
3405         case TOKUDB_NO_DATA:
3406             return "Ran out of data (not EOF)";
3407         case TOKUDB_HUGE_PAGES_ENABLED:
3408             return "Transparent huge pages are enabled but PerconaFT's memory allocator will oversubscribe main memory with transparent huge pages.  This check can be disabled by setting the environment variable TOKU_HUGE_PAGES_OK.";
3409     }
3410 
3411     static char unknown_result[100];    // Race condition if two threads call this at the same time. However even in a bad case, it should be some sort of null-terminated string.
3412     errorstr = unknown_result;
3413     snprintf(errorstr, sizeof unknown_result, "Unknown error code: %d", error);
3414     return errorstr;
3415 }
3416 
3417 const char *
db_version(int * major,int * minor,int * patch)3418 db_version(int *major, int *minor, int *patch) {
3419     if (major)
3420         *major = DB_VERSION_MAJOR;
3421     if (minor)
3422         *minor = DB_VERSION_MINOR;
3423     if (patch)
3424         *patch = DB_VERSION_PATCH;
3425     return toku_product_name_strings.db_version;
3426 }
3427 
3428 // HACK: To ensure toku_pthread_yield gets included in the .so
3429 // non-static would require a prototype in a header
3430 // static (since unused) would give a warning
3431 // static + unused would not actually help toku_pthread_yield get in the .so
3432 // static + used avoids all the warnings and makes sure toku_pthread_yield is in the .so
3433 static void __attribute__((__used__))
include_toku_pthread_yield(void)3434 include_toku_pthread_yield (void) {
3435     toku_pthread_yield();
3436 }
3437 
3438 // For test purposes only, translate dname to iname
3439 // YDB lock is NOT held when this function is called,
3440 // as it is called by user
3441 static int
env_get_iname(DB_ENV * env,DBT * dname_dbt,DBT * iname_dbt)3442 env_get_iname(DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) {
3443     DB *directory = env->i->directory;
3444     int r = autotxn_db_get(directory, NULL, dname_dbt, iname_dbt, DB_SERIALIZABLE|DB_PRELOCKED); // allocates memory for iname
3445     return r;
3446 }
3447 
3448 // TODO 2216:  Patch out this (dangerous) function when loader is working and
3449 //             we don't need to test the low-level redirect anymore.
3450 // for use by test programs only, just a wrapper around ft call:
3451 int
toku_test_db_redirect_dictionary(DB * db,const char * dname_of_new_file,DB_TXN * dbtxn)3452 toku_test_db_redirect_dictionary(DB * db, const char * dname_of_new_file, DB_TXN *dbtxn) {
3453     int r;
3454     DBT dname_dbt;
3455     DBT iname_dbt;
3456     char * new_iname_in_env;
3457 
3458     FT_HANDLE ft_handle = db->i->ft_handle;
3459     TOKUTXN tokutxn = db_txn_struct_i(dbtxn)->tokutxn;
3460 
3461     toku_fill_dbt(&dname_dbt, dname_of_new_file, strlen(dname_of_new_file)+1);
3462     toku_init_dbt_flags(&iname_dbt, DB_DBT_REALLOC);
3463     r = toku_db_get(db->dbenv->i->directory, dbtxn, &dname_dbt, &iname_dbt, DB_SERIALIZABLE);  // allocates memory for iname
3464     assert_zero(r);
3465     new_iname_in_env = (char *) iname_dbt.data;
3466 
3467     toku_multi_operation_client_lock(); //Must hold MO lock for dictionary_redirect.
3468     r = toku_dictionary_redirect(new_iname_in_env, ft_handle, tokutxn);
3469     toku_multi_operation_client_unlock();
3470 
3471     toku_free(new_iname_in_env);
3472     return r;
3473 }
3474 
3475 //Tets only function
3476 uint64_t
toku_test_get_latest_lsn(DB_ENV * env)3477 toku_test_get_latest_lsn(DB_ENV *env) {
3478     LSN rval = ZERO_LSN;
3479     if (env && env->i->logger) {
3480         rval = toku_logger_last_lsn(env->i->logger);
3481     }
3482     return rval.lsn;
3483 }
3484 
toku_set_test_txn_sync_callback(void (* cb)(pthread_t,void *),void * extra)3485 void toku_set_test_txn_sync_callback(void (* cb) (pthread_t, void *), void * extra) {
3486     set_test_txn_sync_callback(cb, extra);
3487 }
3488 
3489 int
toku_test_get_checkpointing_user_data_status(void)3490 toku_test_get_checkpointing_user_data_status (void) {
3491     return toku_cachetable_get_checkpointing_user_data_status();
3492 }
3493 
3494 #undef STATUS_VALUE
3495 #undef PERSISTENT_UPGRADE_STATUS_VALUE
3496 
3497 #include <toku_race_tools.h>
3498 void __attribute__((constructor)) toku_ydb_helgrind_ignore(void);
3499 void
toku_ydb_helgrind_ignore(void)3500 toku_ydb_helgrind_ignore(void) {
3501     TOKU_VALGRIND_HG_DISABLE_CHECKING(&ydb_layer_status, sizeof ydb_layer_status);
3502 }
3503