1 /*
2 Copyright 2021 Northern.tech AS
3
4 This file is part of CFEngine 3 - written and maintained by Northern.tech AS.
5
6 This program is free software; you can redistribute it and/or modify it
7 under the terms of the GNU General Public License as published by the
8 Free Software Foundation; version 3.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA
18
19 To the extent this program is licensed as part of the Enterprise
20 versions of CFEngine, the applicable Commercial Open Source License
21 (COSL) may apply to this file if you as a licensee so wish it. See
22 included file COSL.txt.
23 */
24
25 /*
26 * Implementation using LMDB API.
27 */
28
29 #include <cf3.defs.h>
30 #include <dbm_priv.h>
31 #include <logging.h>
32 #include <string_lib.h>
33 #include <misc_lib.h>
34 #include <file_lib.h>
35 #include <known_dirs.h>
36 #include <bootstrap.h>
37
38 #ifdef LMDB
39
40 #include <lmdb.h>
41 #include <repair.h>
42 #include <global_mutex.h> /* cf_db_corruption_lock */
43 #include <mutex.h>
44 #include <time.h> /* time() */
45
// Shared between threads.
// Backend-private state of one opened LMDB database file.
struct DBPriv_
{
    // LMDB environment created by mdb_env_create() in DBPrivOpenDB(). Its
    // user context holds a heap-allocated copy of the DB file path.
    MDB_env *env;
    // Handle of the (unnamed) database opened inside env.
    MDB_dbi dbi;
    // Used to keep track of transactions.
    // We set this to the transaction address when a thread creates a
    // transaction, and back to 0x0 when it is destroyed.
    pthread_key_t txn_key;
};
56
// Not shared between threads.
// Per-thread transaction bookkeeping, stored under DBPriv_.txn_key.
typedef struct DBTxn_
{
    // The thread's LMDB transaction, NULL while no transaction is open.
    MDB_txn *txn;
    // Whether txn is a read/write (true) or read-only (false) transaction.
    bool rw_txn;
    // Set by DBPrivOpenCursor() and cleared by DBPrivCloseCursor(); the other
    // operations assert that it is false.
    bool cursor_open;
} DBTxn;
65
// Backend-private state of one cursor created by DBPrivOpenCursor().
struct DBCursorPriv_
{
    // DB the cursor iterates over.
    DBPriv *db;
    // Underlying LMDB cursor.
    MDB_cursor *mc;
    // Key recorded by DBPrivDeleteCursorEntry(); only meaningful while
    // pending_delete is set.
    MDB_val delkey;
    // Heap copy of the current entry handed out by DBPrivAdvanceCursor():
    // the key bytes first, then the value at the next 8-byte boundary.
    // NULL when there is no current entry.
    void *curkv;
    // True when the entry at delkey still has to be deleted; the deletion is
    // carried out on the next advance or when the cursor is closed.
    bool pending_delete;
};
74
/* Reader limit requested via DBPrivSetMaximumConcurrentTransactions(). Only
 * values > 0 are passed on to mdb_env_set_maxreaders() in DBPrivOpenDB(). */
static int DB_MAX_READERS = -1;

/* How many times DBPrivOpenDB() retries mdb_txn_begin() when it returns
 * EINVAL. */
#define N_LMDB_EINVAL_RETRIES 5

/******************************************************************************/

/* Forward declaration, the definition comes after its first use in
 * CheckLMDBCorrupted(). */
static void HandleLMDBCorruption(MDB_env *env, const char *msg);
82
/**
 * Start the corruption handling if an LMDB call reported a corrupted DB.
 *
 * @param rc   return code of the preceding LMDB call
 * @param env  environment the call was made on
 *
 * Does nothing unless @rc is MDB_CORRUPTED.
 */
static inline void CheckLMDBCorrupted(int rc, MDB_env *env)
{
    if (rc != MDB_CORRUPTED)
    {
        return;
    }

    HandleLMDBCorruption(env, "");
}
90
/**
 * Get the calling thread's transaction on @db, opening a read-only one if the
 * thread has no LMDB transaction open.
 *
 * A transaction that is already open is reused as it is, no matter whether it
 * is read-only or read/write.
 *
 * @param db   DB to get the transaction for
 * @param txn  (out) the thread's transaction bookkeeping object; it is
 *             assigned even on failure
 * @return     MDB_SUCCESS, or the error code of mdb_txn_begin()
 */
static int GetReadTransaction(DBPriv *const db, DBTxn **const txn)
{
    assert(db != NULL);
    assert(txn != NULL);

    DBTxn *db_txn = pthread_getspecific(db->txn_key);
    int rc = MDB_SUCCESS;

    if (db_txn == NULL)
    {
        /* First use in this thread, create the bookkeeping object. */
        db_txn = xcalloc(1, sizeof(DBTxn));
        pthread_setspecific(db->txn_key, db_txn);
    }

    if (db_txn->txn == NULL)
    {
        rc = mdb_txn_begin(db->env, NULL, MDB_RDONLY, &db_txn->txn);
        /* Same corruption check as after every other LMDB call in this
         * file. */
        CheckLMDBCorrupted(rc, db->env);
        if (rc != MDB_SUCCESS)
        {
            Log(LOG_LEVEL_ERR, "Unable to open read transaction in '%s': %s",
                (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
        }
    }

    *txn = db_txn;

    return rc;
}
119
/**
 * Get the calling thread's read/write transaction on @db.
 *
 * A read-only transaction the thread may have open is committed first and
 * replaced by a read/write one. An already open read/write transaction is
 * reused.
 *
 * @param db   DB to get the transaction for
 * @param txn  (out) the thread's transaction bookkeeping object; it is
 *             assigned even on failure
 * @return     MDB_SUCCESS, or the error code of the last failed LMDB call
 */
static int GetWriteTransaction(DBPriv *const db, DBTxn **const txn)
{
    assert(db != NULL);
    assert(txn != NULL);

    int rc = MDB_SUCCESS;

    DBTxn *thread_txn = pthread_getspecific(db->txn_key);
    if (thread_txn == NULL)
    {
        thread_txn = xcalloc(1, sizeof(DBTxn));
        pthread_setspecific(db->txn_key, thread_txn);
    }

    const bool has_ro_txn = (thread_txn->txn != NULL) && !thread_txn->rw_txn;
    if (has_ro_txn)
    {
        /* A read-only transaction cannot be upgraded, finish it. */
        rc = mdb_txn_commit(thread_txn->txn);
        CheckLMDBCorrupted(rc, db->env);
        if (rc != MDB_SUCCESS)
        {
            Log(LOG_LEVEL_ERR, "Unable to close read-only transaction in '%s': %s",
                (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
        }
        thread_txn->txn = NULL;
    }

    if (thread_txn->txn == NULL)
    {
        rc = mdb_txn_begin(db->env, NULL, 0, &thread_txn->txn);
        CheckLMDBCorrupted(rc, db->env);
        if (rc != MDB_SUCCESS)
        {
            Log(LOG_LEVEL_ERR, "Unable to open write transaction in '%s': %s",
                (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
        }
        else
        {
            thread_txn->rw_txn = true;
        }
    }

    *txn = thread_txn;

    return rc;
}
165
/**
 * Abort the calling thread's transaction on @db (if any) and release the
 * thread's transaction bookkeeping object.
 *
 * @param db  DB whose per-thread transaction is dropped
 */
static void AbortTransaction(DBPriv *const db)
{
    assert(db != NULL);

    DBTxn *const thread_txn = pthread_getspecific(db->txn_key);
    if (thread_txn == NULL)
    {
        /* This thread has nothing to clean up. */
        return;
    }

    if (thread_txn->txn != NULL)
    {
        mdb_txn_abort(thread_txn->txn);
    }

    pthread_setspecific(db->txn_key, NULL);
    free(thread_txn);
}
182
DestroyTransaction(void * const ptr)183 static void DestroyTransaction(void *const ptr)
184 {
185 DBTxn *const db_txn = (DBTxn *)ptr;
186 UnexpectedError("Transaction object still exists when terminating thread");
187 if (db_txn->txn)
188 {
189 UnexpectedError("Transaction still open when terminating thread!");
190 mdb_txn_abort(db_txn->txn);
191 }
192 free(db_txn);
193 }
194
/**
 * @return the file name extension used for DB files of this backend
 *         (a statically allocated string, not to be freed)
 */
const char *DBPrivGetFileExtension(void)
{
    static const char extension[] = "lmdb";

    return extension;
}
199
/* Map size passed to mdb_env_set_mapsize() in DBPrivOpenDB(), in bytes
 * (104857600 = 100 * 1024 * 1024). May be predefined by the build to
 * override the default. */
#ifndef LMDB_MAXSIZE
#define LMDB_MAXSIZE 104857600
#endif
203
DBPrivSetMaximumConcurrentTransactions(const int max_txn)204 void DBPrivSetMaximumConcurrentTransactions(const int max_txn)
205 {
206 DB_MAX_READERS = max_txn;
207 }
208
/**
 * Wrapper around mdb_env_open() that retries on ENOENT.
 *
 * @param env    environment to open (must not be NULL)
 * @param path   path of the DB file (must not be NULL)
 * @param flags  flags passed to mdb_env_open()
 * @param mode   file mode passed to mdb_env_open()
 * @return the first result of mdb_env_open() other than ENOENT, or EBUSY if
 *         all attempts returned ENOENT
 */
static int LmdbEnvOpen(
    MDB_env *const env,
    const char *const path,
    const unsigned int flags,
    const mdb_mode_t mode)
{
    assert(env != NULL);                        // dereferenced in lmdb (mdb_env_open)
    assert(path != NULL);                       // dereferenced (strlen) in lmdb (mdb_env_open)
    assert(mdb_env_get_maxkeysize(env) == 511); // Search for 511 in locks.c

    /* LMDB has a race condition: opening the environment fails if another
     * process opens it at the very same moment. That situation is reported as
     * ENOENT, which we should never see otherwise. On a heavily loaded machine
     * this would produce error messages, so give the others a chance to
     * finish and simply try again a few times. */
    for (int attempt = 0; attempt < 5; attempt++)
    {
        const int rc = mdb_env_open(env, path, flags, mode);
        if (rc != ENOENT)
        {
            return rc;
        }

#if HAVE_DECL_SCHED_YIELD && defined(HAVE_SCHED_YIELD)
        // Not strictly needed, but it lowers the chance of hitting the race
        // condition again on the next attempt.
        sched_yield();
#endif
    }

    // Return EBUSY for an error message slightly more related to reality.
    return EBUSY;
}
244
245 /**
246 * @warning Expects @fd_stamp to be locked.
247 */
RepairedAfterOpen(const char * lmdb_file,int fd_tstamp)248 static bool RepairedAfterOpen(const char *lmdb_file, int fd_tstamp)
249 {
250 time_t repaired_tstamp = -1;
251 ssize_t n_read = read(fd_tstamp, &repaired_tstamp, sizeof(time_t));
252 lseek(fd_tstamp, 0, SEEK_SET);
253
254 if (n_read < 0)
255 {
256 Log(LOG_LEVEL_ERR, "Failed to read %s: %s", lmdb_file, GetErrorStr());
257 }
258 else if (n_read == 0)
259 {
260 /* EOF (empty file) => never repaired */
261 Log(LOG_LEVEL_VERBOSE, "DB '%s' never repaired before", lmdb_file);
262 }
263 else if ((size_t) n_read < sizeof(time_t))
264 {
265 /* error */
266 Log(LOG_LEVEL_ERR, "Failed to read the timestamp of repair of the '%s' DB",
267 lmdb_file);
268 }
269 else
270 {
271 /* read the timestamp => Check if the LMDB file was repaired after
272 * we opened it last time. Or, IOW, if this is a new corruption or
273 * an already-handled one. */
274 DBHandle *handle = GetDBHandleFromFilename(lmdb_file);
275 if (repaired_tstamp > GetDBOpenTimestamp(handle))
276 {
277 return true;
278 }
279 }
280 return false;
281 }
282
/**
 * Handle a detected corruption of an LMDB file. Never returns: every path
 * ends in exit().
 *
 * The DB is frozen first. On Windows the DB file is then simply removed. On
 * other platforms the file is repaired with repair_lmdb_file(), unless the
 * repair timestamp file shows that the DB was already repaired after this
 * process opened it. Repairs are serialized between threads with
 * cft_db_corruption_lock and between processes with file locks on
 * "<lmdb_file>.repaired" and "<lmdb_file>.lock".
 *
 * Also registered as the LMDB assert callback in DBPrivOpenDB().
 *
 * @param env  environment of the corrupted DB; its user context is used as
 *             the path of the LMDB file
 * @param msg  extra detail appended to the log message (may be "")
 *
 * Exit codes: EC_CORRUPTION_REPAIRED if the DB was repaired (or removed) by
 * this or another process, EC_CORRUPTION_REPAIR_FAILED otherwise.
 */
static void HandleLMDBCorruption(MDB_env *env, const char *msg)
{
    const char *lmdb_file = mdb_env_get_userctx(env);
    Log(LOG_LEVEL_CRIT, "Corruption in the '%s' DB detected! %s",
        lmdb_file, msg);

    /* Freeze the DB ASAP. This also makes the call to exit() safe regarding
     * this particular DB because exit handlers will ignore it. */
    DBHandle *handle = GetDBHandleFromFilename(lmdb_file);
    FreezeDB(handle);

#ifdef __MINGW32__
    /* Not much we can do on Windows because there is no fork() and file locking
     * is also not so nice. */
    Log(LOG_LEVEL_WARNING, "Removing the corrupted DB file '%s'",
        lmdb_file);
    if (unlink(lmdb_file) != 0)
    {
        Log(LOG_LEVEL_CRIT, "Failed to remove the corrupted DB file '%s'",
            lmdb_file);
        exit(EC_CORRUPTION_REPAIR_FAILED);
    }
    exit(EC_CORRUPTION_REPAIRED);
#else
    /* Try to handle the corruption gracefully by repairing the LMDB file
     * (replacing it with a new LMDB file with all the data we managed to read
     * from the corrupted one). */

    /* To avoid two processes acting on the same corrupted file at once, file
     * locks are involved. Looking at OpenDBInstance() and DBPathLock()
     * in libpromises/db_api.c might also be useful.*/

    /* Only allow one thread at a time to handle DB corruption. File locks are
     * *process* specific so threads could step on each others toes. */
    ThreadLock(cft_db_corruption_lock);

    char *tstamp_file = StringFormat("%s.repaired", lmdb_file);
    char *db_lock_file = StringFormat("%s.lock", lmdb_file);

    int fd_tstamp = safe_open(tstamp_file, O_CREAT|O_RDWR);
    if (fd_tstamp == -1)
    {
        Log(LOG_LEVEL_CRIT, "Failed to open the '%s' DB repair timestamp file",
            lmdb_file);
        ThreadUnlock(cft_db_corruption_lock);
        free(db_lock_file);
        free(tstamp_file);

        exit(EC_CORRUPTION_REPAIR_FAILED);
    }
    FileLock tstamp_lock = { .fd = fd_tstamp };

    int fd_db_lock = safe_open(db_lock_file, O_CREAT|O_RDWR);
    if (fd_db_lock == -1)
    {
        Log(LOG_LEVEL_CRIT, "Failed to open the '%s' DB lock file",
            lmdb_file);
        ThreadUnlock(cft_db_corruption_lock);
        close(fd_tstamp);
        free(db_lock_file);
        free(tstamp_file);

        exit(EC_CORRUPTION_REPAIR_FAILED);
    }
    FileLock db_lock = { .fd = fd_db_lock };

    int ret;
    bool handle_corruption = true;

    /* Make sure we are not holding the DB's lock (potentially needed by some
     * other process for the repair) to avoid deadlocks. */
    Log(LOG_LEVEL_DEBUG, "Releasing lock on the '%s' DB", lmdb_file);
    ExclusiveFileUnlock(&db_lock, false); /* close=false */

    ret = SharedFileLock(&tstamp_lock, true);
    if (ret == 0)
    {
        if (RepairedAfterOpen(lmdb_file, fd_tstamp))
        {
            /* The corruption has already been handled. This process should
             * just die because we have no way to return to the point where
             * it would just open the new (repaired) LMDB file. */
            handle_corruption = false;
        }
        SharedFileUnlock(&tstamp_lock, false);
    }
    else
    {
        /* should never happen (we tried to wait), but if it does, just log an
         * error and keep going */
        Log(LOG_LEVEL_ERR,
            "Failed to get shared lock for the repair timestamp of the '%s' DB",
            lmdb_file);
    }

    if (!handle_corruption)
    {
        /* Just clean after ourselves and terminate the process. */
        ThreadUnlock(cft_db_corruption_lock);
        close(fd_db_lock);
        close(fd_tstamp);
        free(db_lock_file);
        free(tstamp_file);

        exit(EC_CORRUPTION_REPAIRED);
    }

    /* HERE is a window for some other process to do the repair between when we
     * checked the timestamp using the shared lock above and the attempt to get
     * the exclusive lock right below. However, this is detected by checking the
     * contents of the timestamp file again below, while holding the EXCLUSIVE
     * lock. */

    ret = ExclusiveFileLock(&tstamp_lock, true);
    if (ret != 0)
    {
        /* should never happen (we tried to wait), but if it does, just
         * terminate because doing the repair without the lock could be
         * disastrous */
        Log(LOG_LEVEL_ERR,
            "Failed to get exclusive lock for the repair timestamp of the '%s' DB",
            lmdb_file);

        ThreadUnlock(cft_db_corruption_lock);
        close(fd_db_lock);
        close(fd_tstamp);
        free(db_lock_file);
        free(tstamp_file);

        exit(EC_CORRUPTION_REPAIR_FAILED);
    }

    /* Cleared to resolve the corruption. */

    /* 1. Acquire the lock for the DB to prevent more processes trying to use
     *    it while it is corrupted (wait till the lock is available). */
    while (ExclusiveFileLock(&db_lock, false) == -1)
    {
        /* busy wait to do the logging */
        Log(LOG_LEVEL_INFO, "Waiting for the lock on the '%s' DB",
            lmdb_file);
        sleep(1);
    }

    /* 2. Check the last repair timestamp again (see the big "HERE..." comment
     *    above) */
    if (RepairedAfterOpen(lmdb_file, fd_tstamp))
    {
        /* Some other process repaired the DB since we checked last time,
         * nothing more to do here. */
        ThreadUnlock(cft_db_corruption_lock);
        close(fd_db_lock);  /* releases locks */
        close(fd_tstamp);   /* releases locks */
        free(db_lock_file);
        free(tstamp_file);

        exit(EC_CORRUPTION_REPAIRED);
    }

    /* 3. Repair the DB or at least move it out of the way. */
    /* repair_lmdb_file() forks so it is safe (file locks are not
     * inherited). */
    ret = repair_lmdb_file(lmdb_file, fd_tstamp);

    /* repair_lmdb_file returns -1 in case of error, 0 in case of successful
     * repair, >0 in case of failed repair, but successful remove */
    bool repair_successful = (ret != -1);
    if (repair_successful)
    {
        Log(LOG_LEVEL_NOTICE, "DB '%s' successfully repaired",
            lmdb_file);
    }
    else
    {
        Log(LOG_LEVEL_CRIT, "Failed to repair DB '%s'", lmdb_file);
    }

    /* 4. Make the repaired DB available for others. Also release the locks
     *    in the opposite order in which they were acquired to avoid
     *    deadlocks. */
    if (ExclusiveFileUnlock(&db_lock, true) != 0)
    {
        Log(LOG_LEVEL_ERR, "Failed to release the acquired lock for '%s'",
            db_lock_file);
    }

    /* 5. Signal that the repair is done (also closes fd_tstamp). */
    if (ExclusiveFileUnlock(&tstamp_lock, true) != 0)
    {
        Log(LOG_LEVEL_ERR, "Failed to release the acquired lock for '%s'",
            tstamp_file);
    }

    ThreadUnlock(cft_db_corruption_lock);
    free(db_lock_file);
    free(tstamp_file);
    /* fd_db_lock and fd_tstamp are already closed by the calls to
     * ExclusiveFileUnlock above. */

    if (repair_successful)
    {
        exit(EC_CORRUPTION_REPAIRED);
    }
    else
    {
        exit(EC_CORRUPTION_REPAIR_FAILED);
    }
#endif  /* __MINGW32__ */
}
492
DBPrivOpenDB(const char * const dbpath,const dbid id)493 DBPriv *DBPrivOpenDB(const char *const dbpath, const dbid id)
494 {
495 DBPriv *const db = xcalloc(1, sizeof(DBPriv));
496 MDB_txn *txn = NULL;
497
498 int rc = pthread_key_create(&db->txn_key, &DestroyTransaction);
499 if (rc)
500 {
501 Log(LOG_LEVEL_ERR, "Could not create transaction key. (pthread_key_create: '%s')",
502 GetErrorStrFromCode(rc));
503 free(db);
504 return NULL;
505 }
506
507 rc = mdb_env_create(&db->env);
508 if (rc)
509 {
510 Log(LOG_LEVEL_ERR, "Could not create handle for database %s: %s",
511 dbpath, mdb_strerror(rc));
512 goto err;
513 }
514 rc = mdb_env_set_userctx(db->env, xstrdup(dbpath));
515 if (rc != MDB_SUCCESS)
516 {
517 Log(LOG_LEVEL_WARNING, "Could not store DB file path (%s) in the DB context",
518 dbpath);
519 }
520 rc = mdb_env_set_assert(db->env, (MDB_assert_func*) HandleLMDBCorruption);
521 if (rc != MDB_SUCCESS)
522 {
523 Log(LOG_LEVEL_WARNING, "Could not set the corruption handler for '%s'",
524 dbpath);
525 }
526 rc = mdb_env_set_mapsize(db->env, LMDB_MAXSIZE);
527 if (rc)
528 {
529 Log(LOG_LEVEL_ERR, "Could not set mapsize for database %s: %s",
530 dbpath, mdb_strerror(rc));
531 goto err;
532 }
533 if (DB_MAX_READERS > 0)
534 {
535 rc = mdb_env_set_maxreaders(db->env, DB_MAX_READERS);
536 if (rc)
537 {
538 Log(LOG_LEVEL_ERR, "Could not set maxreaders for database %s: %s",
539 dbpath, mdb_strerror(rc));
540 goto err;
541 }
542 }
543
544 unsigned int open_flags = MDB_NOSUBDIR;
545 #if !defined(_AIX) && !defined(__sun)
546 /* The locks and lastseen (on hubs) DBs are heavily used and using
547 * MDB_NOSYNC increases performance. However, AIX and Solaris often suffer
548 * from some serious issues with consistency (ENT-4002) so it's better to
549 * sacrifice some performance there in favor of stability. */
550 if (id == dbid_locks || (GetAmPolicyHub() && id == dbid_lastseen))
551 {
552 open_flags |= MDB_NOSYNC;
553 }
554 #endif
555
556 #ifdef __hpux
557 /*
558 * On HP-UX, a unified file cache was not introduced until version 11.31.
559 * This means that on 11.23 there are separate file caches for mmap()'ed
560 * files and open()'ed files. When these two are mixed, changes made using
561 * one mode won't be immediately seen by the other mode, which is an
562 * assumption LMDB is relying on. The MDB_WRITEMAP flag causes LMDB to use
563 * mmap() only, so that we stay within one file cache.
564 */
565 open_flags |= MDB_WRITEMAP;
566 #endif
567
568 rc = LmdbEnvOpen(db->env, dbpath, open_flags, CF_PERMS_DEFAULT);
569 if (rc)
570 {
571 Log(LOG_LEVEL_ERR, "Could not open database %s: %s",
572 dbpath, mdb_strerror(rc));
573 if (rc == MDB_CORRUPTED || rc == MDB_INVALID)
574 {
575 HandleLMDBCorruption(db->env, mdb_strerror(rc));
576 }
577 goto err;
578 }
579 if (DB_MAX_READERS > 0)
580 {
581 int max_readers;
582 rc = mdb_env_get_maxreaders(db->env, &max_readers);
583 if (rc)
584 {
585 Log(LOG_LEVEL_ERR, "Could not get maxreaders for database %s: %s",
586 dbpath, mdb_strerror(rc));
587 goto err;
588 }
589 if (max_readers < DB_MAX_READERS)
590 {
591 // LMDB will only reinitialize maxreaders if no database handles are
592 // open, including in other processes, which is how we might end up
593 // here.
594 Log(LOG_LEVEL_VERBOSE, "Failed to set LMDB max reader limit on database '%s', "
595 "consider restarting CFEngine",
596 dbpath);
597 }
598 }
599
600 /* There seems to be a race condition causing mdb_txn_begin() return
601 * EINVAL. We do a couple retries before giving up. */
602 rc = mdb_txn_begin(db->env, NULL, MDB_RDONLY, &txn);
603 int attempts = N_LMDB_EINVAL_RETRIES;
604 while ((rc != 0) && (attempts-- > 0))
605 {
606 CheckLMDBCorrupted(rc, db->env);
607 if (rc != EINVAL)
608 {
609 Log(LOG_LEVEL_ERR, "Could not open database txn %s: %s",
610 dbpath, mdb_strerror(rc));
611 goto err;
612 }
613 #if HAVE_DECL_SCHED_YIELD && defined(HAVE_SCHED_YIELD)
614 // Not required for this to work, but makes it less likely that the race
615 // condition will persist.
616 sched_yield();
617 #endif
618 rc = mdb_txn_begin(db->env, NULL, MDB_RDONLY, &txn);
619 }
620 if (rc != 0)
621 {
622 Log(LOG_LEVEL_ERR, "Could not open database txn %s: %s",
623 dbpath, mdb_strerror(rc));
624 goto err;
625 }
626 rc = mdb_open(txn, NULL, 0, &db->dbi);
627 CheckLMDBCorrupted(rc, db->env);
628 if (rc)
629 {
630 Log(LOG_LEVEL_ERR, "Could not open database dbi %s: %s",
631 dbpath, mdb_strerror(rc));
632 mdb_txn_abort(txn);
633 goto err;
634 }
635 rc = mdb_txn_commit(txn);
636 CheckLMDBCorrupted(rc, db->env);
637 if (rc)
638 {
639 Log(LOG_LEVEL_ERR, "Could not commit database dbi %s: %s",
640 dbpath, mdb_strerror(rc));
641 goto err;
642 }
643
644 return db;
645
646 err:
647 if (db->env)
648 {
649 mdb_env_close(db->env);
650 }
651 pthread_key_delete(db->txn_key);
652 free(db);
653 if (rc == MDB_INVALID)
654 {
655 return DB_PRIV_DATABASE_BROKEN;
656 }
657 return NULL;
658 }
659
/**
 * Close @db and release everything DBPrivOpenDB() allocated for it.
 *
 * @param db  DB to close; must not be used afterwards
 */
void DBPrivCloseDB(DBPriv *db)
{
    assert(db != NULL);

    /* Abort LMDB transaction of the current thread. There should only be some
     * transaction open when the signal handler or atexit() hook is called. */
    AbortTransaction(db);

    MDB_env *const env = db->env;

    /* The user context is the copy of the DB path made when opening. */
    free(mdb_env_get_userctx(env));

    if (env != NULL)
    {
        mdb_env_close(env);
    }

    pthread_key_delete(db->txn_key);
    free(db);
}
681
682 #define EMPTY_DB 0
683
DBPrivClean(DBPriv * db)684 bool DBPrivClean(DBPriv *db)
685 {
686 assert(db != NULL);
687
688 DBTxn *txn;
689 const int rc = GetWriteTransaction(db, &txn);
690
691 if (rc != MDB_SUCCESS)
692 {
693 Log(LOG_LEVEL_ERR, "Unable to get write transaction for '%s': %s",
694 (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
695 return false;
696 }
697 assert(txn != NULL);
698 assert(!txn->cursor_open);
699
700 return (mdb_drop(txn->txn, db->dbi, EMPTY_DB) != 0);
701 }
702
/**
 * Commit the calling thread's transaction on @db (if any) and release the
 * thread's transaction bookkeeping object.
 *
 * A failed commit is logged; the bookkeeping object is released regardless.
 *
 * @param db  DB whose per-thread transaction is committed
 */
void DBPrivCommit(DBPriv *db)
{
    assert(db != NULL);

    DBTxn *const thread_txn = pthread_getspecific(db->txn_key);
    const bool txn_open = (thread_txn != NULL) && (thread_txn->txn != NULL);

    if (txn_open)
    {
        assert(!thread_txn->cursor_open);

        const int rc = mdb_txn_commit(thread_txn->txn);
        CheckLMDBCorrupted(rc, db->env);
        if (rc != MDB_SUCCESS)
        {
            Log(LOG_LEVEL_ERR, "Could not commit database transaction to '%s': %s",
                (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
        }
    }

    pthread_setspecific(db->txn_key, NULL);
    free(thread_txn);
}
722
/**
 * Check whether @db contains an entry for @key.
 *
 * @param db        DB to look into
 * @param key       key to look for
 * @param key_size  size of @key in bytes
 * @return true if the entry exists; false if it does not exist or if an
 *         error occurred (errors other than "not found" are logged and the
 *         thread's transaction is aborted)
 */
bool DBPrivHasKey(DBPriv *db, const void *key, int key_size)
{
    assert(db != NULL);

    // FIXME: distinguish between "entry not found" and "error occurred"

    DBTxn *txn;
    if (GetReadTransaction(db, &txn) != MDB_SUCCESS)
    {
        return false;
    }
    assert(!txn->cursor_open);

    MDB_val lookup_key;
    lookup_key.mv_size = key_size;
    lookup_key.mv_data = (void *) key;

    MDB_val found;
    const int rc = mdb_get(txn->txn, db->dbi, &lookup_key, &found);
    CheckLMDBCorrupted(rc, db->env);

    if (rc == MDB_SUCCESS)
    {
        return true;
    }

    if (rc != MDB_NOTFOUND)
    {
        Log(LOG_LEVEL_ERR, "Could not read database entry from '%s': %s",
            (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
        AbortTransaction(db);
    }
    return false;
}
749
/**
 * Get the size of the value stored under @key.
 *
 * @param db        DB to look into
 * @param key       key to look for
 * @param key_size  size of @key in bytes (>= 0)
 * @return the size reported by mdb_get(); the size is initialized to 0
 *         before the lookup, so 0 is returned when no read transaction
 *         could be obtained
 */
int DBPrivGetValueSize(DBPriv *const db, const void *const key, const int key_size)
{
    assert(db != NULL);
    assert(key_size >= 0);

    MDB_val data;
    data.mv_size = 0;

    DBTxn *txn;
    if (GetReadTransaction(db, &txn) == MDB_SUCCESS)
    {
        assert(!txn->cursor_open);

        MDB_val mkey;
        mkey.mv_size = key_size;
        mkey.mv_data = (void *) key;

        const int rc = mdb_get(txn->txn, db->dbi, &mkey, &data);
        CheckLMDBCorrupted(rc, db->env);

        const bool real_error = (rc != MDB_SUCCESS) && (rc != MDB_NOTFOUND);
        if (real_error)
        {
            Log(LOG_LEVEL_ERR, "Could not read database entry from '%s': %s",
                (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
            AbortTransaction(db);
        }
    }

    const size_t value_size = data.mv_size;
    assert(value_size <= INT_MAX);
    return value_size;
}
780
/**
 * Read the value stored under @key into @dest.
 *
 * At most @dest_size bytes are copied; a shorter value is copied in full
 * and the rest of @dest is left untouched.
 *
 * @param db         DB to read from
 * @param key        key to look for
 * @param key_size   size of @key in bytes (>= 0)
 * @param dest       buffer receiving the value
 * @param dest_size  capacity of @dest in bytes
 * @return true if the entry was found and copied, false otherwise (errors
 *         other than "not found" are logged and the thread's transaction is
 *         aborted)
 */
bool DBPrivRead(
    DBPriv *const db,
    const void *const key,
    const int key_size,
    void *const dest,
    size_t dest_size)
{
    assert(db != NULL);
    assert(key_size >= 0);

    DBTxn *txn;
    if (GetReadTransaction(db, &txn) != MDB_SUCCESS)
    {
        return false;
    }
    assert(txn != NULL);
    assert(!txn->cursor_open);

    MDB_val mkey;
    mkey.mv_size = key_size;
    mkey.mv_data = (void *) key;

    MDB_val data;
    const int rc = mdb_get(txn->txn, db->dbi, &mkey, &data);
    CheckLMDBCorrupted(rc, db->env);

    if (rc == MDB_NOTFOUND)
    {
        return false;
    }
    if (rc != MDB_SUCCESS)
    {
        Log(LOG_LEVEL_ERR, "Could not read database entry from '%s': %s",
            (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
        AbortTransaction(db);
        return false;
    }

    /* Never copy more than what is stored. */
    const size_t n_copy = (dest_size > data.mv_size) ? data.mv_size : dest_size;
    memcpy(dest, data.mv_data, n_copy);
    return true;
}
822
/**
 * Store @value under @key within the calling thread's write transaction.
 * The transaction is not committed here.
 *
 * @param db          DB to write to
 * @param key         key of the entry
 * @param key_size    size of @key in bytes (>= 0)
 * @param value       value to store
 * @param value_size  size of @value in bytes
 * @return true on success; false if no write transaction could be obtained
 *         or mdb_put() failed (logged, transaction aborted)
 */
bool DBPrivWrite(
    DBPriv *const db,
    const void *const key,
    const int key_size,
    const void *const value,
    const int value_size)
{
    assert(db != NULL);
    assert(key_size >= 0);

    DBTxn *txn;
    if (GetWriteTransaction(db, &txn) != MDB_SUCCESS)
    {
        return false;
    }
    assert(txn != NULL);
    assert(!txn->cursor_open);

    MDB_val mkey;
    mkey.mv_size = key_size;
    mkey.mv_data = (void *) key;

    MDB_val data;
    data.mv_size = value_size;
    data.mv_data = (void *) value;

    const int rc = mdb_put(txn->txn, db->dbi, &mkey, &data, 0);
    CheckLMDBCorrupted(rc, db->env);
    if (rc == MDB_SUCCESS)
    {
        return true;
    }

    Log(LOG_LEVEL_ERR, "Could not write database entry to '%s': %s",
        (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
    AbortTransaction(db);
    return false;
}
855
/**
 * Conditionally replace the value stored under @key and commit.
 *
 * The current value (if any) is read inside a write transaction and handed
 * to @Condition; the new value is written and the transaction committed only
 * if @Condition agrees (or is NULL).
 *
 * @param db          DB to write to
 * @param key         key of the entry
 * @param key_size    size of @key in bytes (>= 0)
 * @param value       new value
 * @param value_size  size of @value in bytes
 * @param Condition   callback deciding whether to overwrite; it gets a copy
 *                    of the current value and its size, or (NULL, 0) if
 *                    there is no entry for @key yet; may be NULL to always
 *                    overwrite
 * @param data        opaque pointer passed through to @Condition
 * @return true if the new value was written and DBPrivCommit() called, false
 *         otherwise (the transaction is aborted unless it could not be
 *         obtained in the first place)
 */
bool DBPrivOverwrite(DBPriv *db, const char *key, int key_size, const void *value, size_t value_size,
                     OverwriteCondition Condition, void *data)
{
    assert(db != NULL);
    assert(key_size >= 0);
    DBTxn *txn;
    int rc = GetWriteTransaction(db, &txn);

    if (rc != MDB_SUCCESS)
    {
        return false;
    }

    assert(txn != NULL);
    assert(!txn->cursor_open);

    MDB_val mkey, orig_data;
    mkey.mv_data = (void *) key;
    mkey.mv_size = key_size;
    rc = mdb_get(txn->txn, db->dbi, &mkey, &orig_data);
    CheckLMDBCorrupted(rc, db->env);
    if ((rc != MDB_SUCCESS) && (rc != MDB_NOTFOUND))
    {
        Log(LOG_LEVEL_ERR, "Could not read database entry from '%s': %s",
            (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
        AbortTransaction(db);
        return false;
    }

    if (Condition != NULL)
    {
        bool overwrite;
        if (rc == MDB_SUCCESS)
        {
            assert(orig_data.mv_size > 0);

            /* We have to copy the data because orig_data.mv_data is a pointer to
             * the mmap()-ed area which can potentially have bad alignment causing
             * a SIGBUS on some architectures.
             *
             * The copy lives on the heap: the size comes from the DB, so a
             * stack array could overflow the stack, and heap memory is
             * suitably aligned for any type. At least one byte is allocated
             * so that the callback never sees NULL for an existing entry. */
            const size_t cur_size = orig_data.mv_size;
            unsigned char *cur_val = xmalloc((cur_size > 0) ? cur_size : 1);
            memcpy(cur_val, orig_data.mv_data, cur_size);
            overwrite = Condition(cur_val, cur_size, data);
            free(cur_val);
        }
        else
        {
            assert(rc == MDB_NOTFOUND);
            overwrite = Condition(NULL, 0, data);
        }

        if (!overwrite)
        {
            AbortTransaction(db);
            return false;
        }
    }

    MDB_val new_data;
    new_data.mv_data = (void *)value;
    new_data.mv_size = value_size;
    rc = mdb_put(txn->txn, db->dbi, &mkey, &new_data, 0);
    CheckLMDBCorrupted(rc, db->env);
    if (rc != MDB_SUCCESS)
    {
        Log(LOG_LEVEL_ERR, "Could not write database entry to '%s': %s",
            (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
        AbortTransaction(db);
        return false;
    }
    DBPrivCommit(db);
    return true;
}
928
/**
 * Delete the entry stored under @key within the calling thread's write
 * transaction. The transaction is not committed here.
 *
 * @param db        DB to delete from
 * @param key       key of the entry
 * @param key_size  size of @key in bytes (>= 0)
 * @return true only if an entry was deleted; false if there was no such
 *         entry (logged at debug level), if no write transaction could be
 *         obtained or if mdb_del() failed (logged, transaction aborted)
 */
bool DBPrivDelete(DBPriv *const db, const void *const key, const int key_size)
{
    assert(key_size >= 0);
    assert(db != NULL);

    DBTxn *txn;
    if (GetWriteTransaction(db, &txn) != MDB_SUCCESS)
    {
        return false;
    }
    assert(!txn->cursor_open);

    MDB_val mkey;
    mkey.mv_size = key_size;
    mkey.mv_data = (void *) key;

    const int rc = mdb_del(txn->txn, db->dbi, &mkey, NULL);
    CheckLMDBCorrupted(rc, db->env);

    if (rc == MDB_SUCCESS)
    {
        return true;
    }

    if (rc == MDB_NOTFOUND)
    {
        Log(LOG_LEVEL_DEBUG, "Entry not found in '%s': %s",
            (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
    }
    else
    {
        Log(LOG_LEVEL_ERR, "Could not delete from '%s': %s",
            (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
        AbortTransaction(db);
    }
    return false;
}
958
DBPrivOpenCursor(DBPriv * const db)959 DBCursorPriv *DBPrivOpenCursor(DBPriv *const db)
960 {
961 assert(db != NULL);
962
963 DBCursorPriv *cursor = NULL;
964 DBTxn *txn;
965 MDB_cursor *mc;
966
967 int rc = GetWriteTransaction(db, &txn);
968 if (rc == MDB_SUCCESS)
969 {
970 assert(!txn->cursor_open);
971 rc = mdb_cursor_open(txn->txn, db->dbi, &mc);
972 CheckLMDBCorrupted(rc, db->env);
973 if (rc == MDB_SUCCESS)
974 {
975 cursor = xcalloc(1, sizeof(DBCursorPriv));
976 cursor->db = db;
977 cursor->mc = mc;
978 txn->cursor_open = true;
979 }
980 else
981 {
982 Log(LOG_LEVEL_ERR, "Could not open cursor in '%s': %s",
983 (char *) mdb_env_get_userctx(db->env), mdb_strerror(rc));
984 AbortTransaction(db);
985 }
986 /* txn remains with cursor */
987 }
988
989 return cursor;
990 }
991
/**
 * Move @cursor to the next entry and hand out a copy of it.
 *
 * The copy is owned by the cursor and stays valid until the next advance or
 * until the cursor is closed. A deletion requested with
 * DBPrivDeleteCursorEntry() is carried out here, after moving on.
 *
 * @param cursor      cursor to advance
 * @param key         (out) pointer to the copy of the key
 * @param key_size    (out) size of the key in bytes
 * @param value       (out) pointer to the copy of the value
 * @param value_size  (out) size of the value in bytes
 * @return true if there was a next entry (the out parameters are set), false
 *         at the end of the DB or on error (the out parameters are left
 *         untouched)
 */
bool DBPrivAdvanceCursor(
    DBCursorPriv *const cursor,
    void **const key,
    int *const key_size,
    void **const value,
    int *const value_size)
{
    assert(cursor != NULL);
    assert(cursor->db != NULL);

    /* The copy of the previous entry is no longer needed. */
    free(cursor->curkv);
    cursor->curkv = NULL;

    MDB_val mkey, data;
    const int next_rc = mdb_cursor_get(cursor->mc, &mkey, &data, MDB_NEXT);
    CheckLMDBCorrupted(next_rc, cursor->db->env);

    const bool advanced = (next_rc == MDB_SUCCESS);
    if (advanced)
    {
        // The value goes to the next 64-bit boundary after the key, to avoid
        // alignment errors on certain platforms.
        const size_t value_offset = (mkey.mv_size + 7) & ~((size_t) 7);

        char *const kv = xmalloc(value_offset + data.mv_size);
        memcpy(kv, mkey.mv_data, mkey.mv_size);
        memcpy(kv + value_offset, data.mv_data, data.mv_size);
        cursor->curkv = kv;

        *key = kv;
        *key_size = mkey.mv_size;
        *value = kv + value_offset;
        *value_size = data.mv_size;
    }
    else if (next_rc != MDB_NOTFOUND)
    {
        Log(LOG_LEVEL_ERR, "Could not advance cursor in '%s': %s",
            (char *) mdb_env_get_userctx(cursor->db->env), mdb_strerror(next_rc));
    }

    if (cursor->pending_delete)
    {
        /* Go back to the key to delete... */
        if (mdb_cursor_get(cursor->mc, &cursor->delkey, NULL, MDB_SET) == MDB_SUCCESS)
        {
            /* ...and remove it. */
            // TODO: Should the return value be checked?
            (void) mdb_cursor_del(cursor->mc, 0);
        }

        /* If the advance succeeded, return to the entry just handed out. */
        if (advanced)
        {
            mkey.mv_data = *key;
            const int seek_rc = mdb_cursor_get(cursor->mc, &mkey, NULL, MDB_SET);
            CheckLMDBCorrupted(seek_rc, cursor->db->env);
            // TODO: Should the return value be checked?
        }
        cursor->pending_delete = false;
    }

    return advanced;
}
1058
/**
 * Mark the entry @cursor currently points at for deletion.
 *
 * Only the key is recorded here; the entry is actually removed by the next
 * DBPrivAdvanceCursor() or by DBPrivCloseCursor().
 *
 * @param cursor  cursor positioned on the entry to delete
 * @return true if the current key could be fetched and the deletion was
 *         scheduled, false otherwise
 */
bool DBPrivDeleteCursorEntry(DBCursorPriv *const cursor)
{
    assert(cursor != NULL);

    const int rc = mdb_cursor_get(cursor->mc, &cursor->delkey, NULL, MDB_GET_CURRENT);
    CheckLMDBCorrupted(rc, cursor->db->env);

    const bool scheduled = (rc == MDB_SUCCESS);
    if (scheduled)
    {
        cursor->pending_delete = true;
    }
    return scheduled;
}
1071
/**
 * Replace the value of the entry @cursor currently points at.
 *
 * Any deletion scheduled with DBPrivDeleteCursorEntry() is cancelled.
 *
 * @param cursor      cursor positioned on the entry (by a successful
 *                    DBPrivAdvanceCursor())
 * @param value       new value
 * @param value_size  size of @value in bytes
 * @return true if the value was replaced, false if the cursor has no current
 *         entry or an LMDB call failed (logged)
 */
bool DBPrivWriteCursorEntry(
    DBCursorPriv *const cursor, const void *const value, const int value_size)
{
    assert(cursor != NULL);
    assert(cursor->db != NULL);

    cursor->pending_delete = false;

    if (cursor->curkv == NULL)
    {
        Log(LOG_LEVEL_ERR, "Could not write cursor entry to '%s': cannot find current key",
            (char *) mdb_env_get_userctx(cursor->db->env));
        return false;
    }

    /* mdb_cursor_put() with MDB_CURRENT still needs the key of the current
     * entry, with its real size. The cursor does not remember the size of
     * the key copied into curkv, so ask LMDB for the current key. */
    MDB_val db_key;
    int rc = mdb_cursor_get(cursor->mc, &db_key, NULL, MDB_GET_CURRENT);
    CheckLMDBCorrupted(rc, cursor->db->env);
    if (rc != MDB_SUCCESS)
    {
        Log(LOG_LEVEL_ERR, "Could not write cursor entry to '%s': %s",
            (char *) mdb_env_get_userctx(cursor->db->env), mdb_strerror(rc));
        return false;
    }

    /* db_key points into the DB's own pages, which the put below may modify,
     * so hand LMDB a private copy of the key. */
    MDB_val curkey;
    curkey.mv_size = db_key.mv_size;
    curkey.mv_data = xmalloc((db_key.mv_size > 0) ? db_key.mv_size : 1);
    memcpy(curkey.mv_data, db_key.mv_data, db_key.mv_size);

    MDB_val data;
    data.mv_data = (void *) value;
    data.mv_size = value_size;

    rc = mdb_cursor_put(cursor->mc, &curkey, &data, MDB_CURRENT);
    free(curkey.mv_data);
    CheckLMDBCorrupted(rc, cursor->db->env);
    if (rc != MDB_SUCCESS)
    {
        Log(LOG_LEVEL_ERR, "Could not write cursor entry to '%s': %s",
            (char *) mdb_env_get_userctx(cursor->db->env), mdb_strerror(rc));
    }

    return (rc == MDB_SUCCESS);
}
1107
/**
 * Close @cursor and free it.
 *
 * A deletion still pending from DBPrivDeleteCursorEntry() is carried out
 * first. The write transaction the cursor lived in stays open.
 *
 * @param cursor  cursor to close; must not be used afterwards
 */
void DBPrivCloseCursor(DBCursorPriv *const cursor)
{
    assert(cursor != NULL);
    assert(cursor->db != NULL);

    DBTxn *txn;
    const int rc = GetWriteTransaction(cursor->db, &txn);
    CF_ASSERT(rc == MDB_SUCCESS, "Could not get write transaction");
    CF_ASSERT(txn->cursor_open, "Transaction not open");
    txn->cursor_open = false;

    /* Release the copy of the last entry handed out (if any). */
    free(cursor->curkv);

    if (cursor->pending_delete)
    {
        /* The cursor has not moved since the deletion was requested. */
        mdb_cursor_del(cursor->mc, 0);
    }

    mdb_cursor_close(cursor->mc);
    free(cursor);
}
1132
/**
 * Diagnosing DB files is not implemented for the LMDB backend.
 *
 * @param dbpath  path of the DB file
 * @return a newly formatted message saying so
 */
char *DBPrivDiagnose(const char *const dbpath)
{
    char *const report =
        StringFormat("Unable to diagnose LMDB file (not implemented) for '%s'", dbpath);

    return report;
}
1137 #endif
1138