1 /* cyrusdb_twoskip.c - brand new twoskip implementation, not backwards anything
2 *
3 * Copyright (c) 1994-2008 Carnegie Mellon University. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in
14 * the documentation and/or other materials provided with the
15 * distribution.
16 *
17 * 3. The name "Carnegie Mellon University" must not be used to
18 * endorse or promote products derived from this software without
19 * prior written permission. For permission or any legal
20 * details, please contact
21 * Carnegie Mellon University
22 * Center for Technology Transfer and Enterprise Creation
23 * 4615 Forbes Avenue
24 * Suite 302
25 * Pittsburgh, PA 15213
26 * (412) 268-7393, fax: (412) 268-7395
27 * innovation@andrew.cmu.edu
28 *
29 * 4. Redistributions of any form whatsoever must retain the following
30 * acknowledgment:
31 * "This product includes software developed by Computing Services
32 * at Carnegie Mellon University (http://www.cmu.edu/computing/)."
33 *
34 * CARNEGIE MELLON UNIVERSITY DISCLAIMS ALL WARRANTIES WITH REGARD TO
35 * THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
36 * AND FITNESS, IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY BE LIABLE
37 * FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
38 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
39 * AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
40 * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
41 */
42
43 #include <config.h>
44
45 #include <errno.h>
46 #include <limits.h>
47 #include <stdlib.h>
48 #include <string.h>
49 #include <syslog.h>
50 #include <sys/types.h>
51 #ifdef HAVE_UNISTD_H
52 #include <unistd.h>
53 #endif
54
55 #include "assert.h"
56 #include "bsearch.h"
57 #include "byteorder64.h"
58 #include "cyrusdb.h"
59 #include "crc32.h"
60 #include "libcyr_cfg.h"
61 #include "mappedfile.h"
62 #include "util.h"
63 #include "xmalloc.h"
64
65 /*
66 * twoskip disk format.
67 *
68 * GOALS:
 *  a) 64 bit throughout
70 * b) Fast recovery after crashes
71 * c) integrity checks throughout
72 * d) simple format
73 *
74 * ACHIEVED BY:
75 * a)
76 * - 64 bit offsets for all values
 *   - smaller initial keylen and vallen, but they
 *     can be extended up to 64 bits as well.
79 * - no timestamps stored in the file.
80 * - XXX - may behave strangely with large files on
81 * 32 bit architectures, particularly if size_t is
82 * not 64 bit.
83 *
84 * b)
85 * - "dirty flag" is always set in the header and
86 * fsynced BEFORE writing anything else.
87 * - a header field for "current size", after which
88 * all changes are considered suspect until commit.
89 * - two "lowest level" offsets, used in alternating
90 * order, so the highest value less than "current_size"
91 * is always the correct pointer - this means we
92 * never lose linkage, so never need to rewrite more
93 * than the affected records during a recovery.
94 * - all data is fsynced BEFORE rewriting the header to
95 * remove the dirty flag.
96 * - As long as the first 64 bytes of the file are
97 * guaranteed to write all together or not at all,
98 * we're crash-safe.
99 *
100 * c)
101 * - every byte in the file is covered by one of the
102 * crc32 values stored throughout.
103 * - header CRC is checked on every header read (open/lock)
104 * - record head CRCs are checked on every record read,
105 * including skiplist traverse.
 *   - record tail CRCs (key/value) are checked on every exact
107 * key match result, during traverse for read or write.
108 *
109 * d)
110 * - there are no special commit, inorder, etc records.
111 * just add records and ghost "delete" records to give
112 * somewhere to point to on deletes. These are only
113 * at the lowest level, so don't have a significant
114 * seek impact.
115 * - modular code makes the logic much clearer.
116 */
117
118 /*
119 * FORMAT:
120 *
121 * HEADER: 64 bytes
122 * magic: 20 bytes: "4 bytes same as skiplist" "twoskip file\0\0\0\0"
123 * version: 4 bytes
124 * generation: 8 bytes
125 * num_records: 8 bytes
126 * repack_size: 8 bytes
127 * current_size: 8 bytes
128 * flags: 4 bytes
129 * crc32: 4 bytes
130 *
131 * RECORDS:
132 * type 1 byte
133 * level: 1 byte
134 * keylen: 2 bytes
135 * vallen: 4 bytes
136 * <optionally: 64 bit keylen if keylen == UINT16_MAX>
137 * <optionally: 64 bit vallen if vallen == UINT32_MAX>
138 * ptrs: 8 bytes * (level+1)
139 * crc32_head: 4 bytes
140 * crc32_tail: 4 bytes
141 * key: (keylen bytes)
142 * val: (vallen bytes)
143 * padding: enough zeros to round up to an 8 byte multiple
144 *
145 * defined types, in skiplist language are:
146 * '=' -> DUMMY
147 * '+' -> ADD/INORDER
148 * '-' -> DELETE (kind of)
149 * '$' -> COMMIT
150 * but note that delete records behave differently - they're
151 * part of the pointer hierarchy, so that back offsets will
152 * always point somewhere past the 'end' until commit.
153 *
154 * The DUMMY is always MAXLEVEL level, with zero keylen and vallen
155 * The DELETE is always zero level, with zero keylen and vallen
156 * crc32_head is calculated on all bytes before it in the record
157 * crc32_tail is calculated on all bytes after, INCLUDING padding
158 *
159 * The COMMIT is inserted at the end of each transaction, and its
160 * single pointer points back to the start of the transaction.
161 */
162
163 /* OPERATION:
164 *
165 * Finding a record works very much like skiplist, but we have
166 * a datastructure, 'struct skiploc', to help find it. There
167 * is one of these embedded directly in the 'struct db', and
168 * it's the only one we ever use.
169 *
170 * skiploc contains two complete sets of offsets - at every
171 * level the offset of the previous record, and the offset of
172 * the next record, in relation to the requested key. If the
173 * key is an exact match, it also contains a copy of the
174 * struct skiprecord. If not, it contains the struct
175 * skiprecord for the previous record at level zero.
176 *
177 * It also contains a 'struct buf' with a copy of the requested
178 * key, which allows for efficient relocation of the position in
179 * the file when nothing is changed.
180 *
181 * So nothing is really changed with finding, except the special
182 * "level zero" alternative pointer. We'll see that in action
183 * later.
184 *
185 * TRANSACTIONS:
186 * 1) before writing anything else, the header is updated with the
187 * DIRTY flag set, and then fdatasync is run.
188 * 2) after all changes, fdatasync is run again.
189 * 3) finally, the header is updated with a new current_size and
190 * the DIRTY flag clear, then fdatasync is run for a third time.
191 *
192 * ADDING A NEW RECORD:
193 * a new record is created with forward locations pointing to the
194 * next pointers in the skiploc. This is appended to the file.
195 * This works for either a create OR replace, since in both cases
196 * the nextlocs are correct. Level zero is set to zero on a new
197 * record.
198 *
199 * If it's not a replace, a "random" level will be chosen for the
200 * record. All update operations below apply only up to this level,
201 * pointers above are unaffected - and continue over the location
202 * of this change.
203 *
204 * For each backloc, the record at that offset is read, and the
205 * forward pointers at each level are replaced with the offset
206 * of the new record. NOTE: see special level zero logic below.
207 *
208 * Again, if this was a replace, the backlocs don't point to the
209 * current record, so it just silently disappears from the lists.
210 *
211 * DELETING A RECORD:
212 * The logic is almost identical to adding, but a delete record
213 * always has a level of zero, with only a single pointer forward
214 * to the next record.
215 *
216 * Because of this, the updates are done up to the level of the
217 * old record instead.
218 *
219 * THE SPECIAL "level zero":
220 * To allow a "fast fsck" after an aborted transaction, rather
221 * than having only a single level 1 pointer, we have two. The
222 * following algorithm is used to select which one to change.
223 *
224 * The definition of "current_size" is the size of the database
225 * at last commit, it does not get updated during a transaction.
226 *
227 * So: when updating level 1 - if either level 1 or level 0 has
228 * a value >= current_size, then that value gets updated again.
229 * otherwise, the lowest value gets replaced with the new value.
230 *
231 * when reading, the highest value is used - except during
232 * recovery when it's the highest value less than current_size,
233 * since any "future" offsets are bogus.
234 *
235 * This means that there is always at least one offset which
236 * points to the "next" record as if the current transaction
 * had never occurred - allowing recovery to find all alive
238 * records without scanning and updating the rest of the file.
239 * This guarantee exists regardless of any ordering of writes
240 * within the transaction, any page could be inconsistent and
241 * the result is still a clean recovery.
242 *
243 * CHECKPOINT:
244 * Over time, a twoskip database accumulates cruft - replaced
245 * records and delete records. Records out of order, slowing
246 * down sequential access. When the size at last repack
247 * is sufficiently smaller than the current size (see the
248 * TUNING constants below) then the file is checkpointed.
249 * A checkpoint is achieved by creating a new file, and
250 * copying all the current records, in order, into it, then
251 * renaming the new file over the old. The "generation"
252 * counter in the header is incremented to tell other users
253 * that offsets into the file are no longer valid. This is
254 * more reliable than just using the inode, because inodes
255 * can be reused.
256 *
257 * LOCATION OPTIMISATION:
258 * If the generation is unchanged AND the size of the file
259 * is unchanged, then all offsets stored in the skiploc are
260 * still valid. This is used to optimise finding the current
261 * key, advancing to the "next" key, and also to optimise
262 * regular fetches that happen to hit either the current key,
263 * the gap immediately after, or the next key. All other
264 * locations cause a full relocate.
265 */
266
267
268 /********** TUNING *************/
269
/* don't bother rewriting if the database has less than this much data.
 * NOTE(review): 16834 looks like a transposition of 16384 (16 KiB) -
 * confirm before relying on the exact value. */
#define MINREWRITE 16834
/* number of skiplist levels - 31 gives us binary search to 2^32 records.
 * limited to 255 by file format, but skiplist had 20, and that was enough
 * for most real uses.  31 is heaps. */
#define MAXLEVEL 31
/* should be 0.5 for binary search semantics */
#define PROB 0.5

/* release lock in foreach at least every N records */
#define FOREACH_LOCK_RELEASE 256

/* format specifics: the highest on-disk version this code accepts
 * (read_header rejects anything greater) */
#undef VERSION /* defined in config.h */
#define VERSION 1

/* type aliases, used as casts for printf-style arguments */
#define LLU long long unsigned int
#define LU long unsigned int

/* record types: the first byte of every on-disk record */
#define DUMMY '='
#define RECORD '+'
#define DELETE '-'
#define COMMIT '$'
295
296 /********** DATA STRUCTURES *************/
297
/* A single "record" in the twoskip file, decoded into host byte order.
 * This could be a DUMMY, a RECORD, a DELETE or a COMMIT - they
 * all read and write with the same functions */
struct skiprecord {
    /* location on disk (not part of the on-disk format as such):
     * offset of the first byte of the record and its total length,
     * including the padding after the value */
    size_t offset;
    size_t len;

    /* what are our header fields */
    uint8_t type;       /* one of DUMMY, RECORD, DELETE, COMMIT */
    uint8_t level;      /* the record carries level+1 pointers on disk */
    size_t keylen;
    size_t vallen;

    /* where do we go from here?  Slots 0 and 1 are the two alternative
     * level zero pointers; slot n+1 is the pointer for level n */
    size_t nextloc[MAXLEVEL+1];

    /* what do our integrity checks say?  crc32_head covers the record
     * bytes before it, crc32_tail covers key, value and padding */
    uint32_t crc32_head;
    uint32_t crc32_tail;

    /* our key and value, as offsets into the mapped file */
    size_t keyoffset;
    size_t valoffset;
};
323
/* a location in the twoskip file.  We always have:
 *   record: if "is_exactmatch" this points to the record
 *           with the matching key, otherwise it points to
 *           the 'compar' order previous record.
 *   backloc: the records that point TO this location
 *            at each level.  If is_exactmatch, they
 *            point to the record, otherwise they are
 *            the record.
 *   forwardloc: the records pointed to by the record
 *               at 'backloc' at the same level.  Kept
 *               here for efficiency
 *   keybuf: a copy of the requested key - we always keep
 *           this so we can re-seek after the file has been
 *           checkpointed under us (say a read-only foreach)
 *
 * generation and end can be used to see if anything in
 * the file may have changed and needs re-reading.
 */
struct skiploc {
    /* requested, may not match actual record */
    struct buf keybuf;
    int is_exactmatch;

    /* the matching record if is_exactmatch, otherwise the record
     * immediately before the requested key */
    struct skiprecord record;

    /* we need both sets of offsets to cheaply insert */
    size_t backloc[MAXLEVEL+1];
    size_t forwardloc[MAXLEVEL+1];

    /* need a generation so we know if the location is still valid;
     * both are snapshots of the db values taken by relocate() */
    uint64_t generation;
    size_t end;
};
358
/* lock states.  NOTE(review): nothing in this part of the file uses
 * these values - presumably they describe the lock held on a database;
 * confirm against the locking code. */
enum {
    UNLOCKED = 0,
    READLOCKED = 1,
    WRITELOCKED = 2,
};
364
/* header flag bit: set (and committed) before the first record of a
 * transaction is appended, see append_record() */
#define DIRTY (1<<0)
366
/* transaction handle.
 * NOTE(review): the comment that used to live here described a
 * "logstart" field which this struct does not have. */
struct txn {
    /* presumably a sequence number tied to dbengine.txn_num - confirm
     * against the transaction code */
    int num;
};
372
/* in-memory copy of the file header in host byte order; filled in by
 * read_header() and serialised by write_header() */
struct db_header {
    /* header info */
    uint32_t version;
    uint32_t flags;             /* bit field, see DIRTY */
    uint64_t generation;
    uint64_t num_records;
    size_t repack_size;
    size_t current_size;        /* read_header() copies this to db->end */
};
382
/* state of one twoskip database */
struct dbengine {
    /* file data */
    struct mappedfile *mf;

    struct db_header header;    /* last header read or about to be written */
    struct skiploc loc;         /* the single cursor used by all lookups */

    /* tracking info */
    int is_open;
    size_t end;                 /* where the next record is appended; set from
                                 * header.current_size on read and advanced
                                 * by write_record() */
    int txn_num;
    struct txn *current_txn;    /* must be set before append_record() */

    /* comparator function to use for sorting */
    int open_flags;
    int (*compar) (const char *s1, int l1, const char *s2, int l2);
};
400
/* singly linked list node pairing a database with a reference count.
 * NOTE(review): presumably tracks already-open databases so they can be
 * shared - confirm against the open/close code. */
struct db_list {
    struct dbengine *db;
    struct db_list *next;
    int refcount;
};
406
/* the first HEADER_MAGIC_SIZE bytes of every twoskip file: 4 binary
 * bytes followed by "twoskip file" and four NULs */
#define HEADER_MAGIC ("\241\002\213\015twoskip file\0\0\0\0")
#define HEADER_MAGIC_SIZE (20)
409
/* byte offsets of the header fields within the 64 byte header */
enum {
    OFFSET_HEADER = 0,
    OFFSET_VERSION = 20,
    OFFSET_GENERATION = 24,
    OFFSET_NUM_RECORDS = 32,
    OFFSET_REPACK_SIZE = 40,
    OFFSET_CURRENT_SIZE = 48,
    OFFSET_FLAGS = 56,
    OFFSET_CRC32 = 60,          /* the CRC covers bytes 0..59 */
};
421
/* total size of the file header in bytes */
#define HEADER_SIZE 64
/* the DUMMY record sits immediately after the header */
#define DUMMY_OFFSET HEADER_SIZE
/* largest possible record head: 8 bytes fixed + 8 long keylen +
 * 8 long vallen + 8 * (MAXLEVEL+1) pointers + 8 bytes of CRCs */
#define MAXRECORDHEAD ((MAXLEVEL + 5)*8)
425
/* mount a scratch monkey: one static buffer in which the file header
 * and record heads are assembled before being written.  The uint64_t
 * member gives the buffer 64 bit alignment for the casted stores done
 * into it.  It is shared by every writer, so its contents are only
 * meaningful until the next header or record head is prepared. */
static union skipwritebuf {
    uint64_t align;
    char s[MAXRECORDHEAD];
} scratchspace;
431
/* head of the db_list chain.  NOTE(review): presumably the databases
 * currently open in this process - confirm against the open/close code */
static struct db_list *open_twoskip = NULL;

/* forward declarations for functions defined later in the file */
static int mycommit(struct dbengine *db, struct txn *tid);
static int myabort(struct dbengine *db, struct txn *tid);
static int mycheckpoint(struct dbengine *db);
static int myconsistent(struct dbengine *db, struct txn *tid);
static int recovery(struct dbengine *db);
static int recovery1(struct dbengine *db, int *count);
static int recovery2(struct dbengine *db, int *count);
441
442 /************** HELPER FUNCTIONS ****************/
443
/* shorthand accessors for the mapped file behind a db */
#define BASE(db) mappedfile_base((db)->mf)
/* pointers to a record's key and value bytes inside the mapping */
#define KEY(db, rec) (BASE(db) + (rec)->keyoffset)
#define VAL(db, rec) (BASE(db) + (rec)->valoffset)
#define SIZE(db) mappedfile_size((db)->mf)
#define FNAME(db) mappedfile_fname((db)->mf)
449
/* calculate padding size */
#ifdef __DragonFly__
#undef roundup
#endif
/* Round record_size up to the next multiple of howfar.  A value that is
 * already a multiple comes back unchanged. */
static size_t roundup(size_t record_size, int howfar)
{
    size_t excess = record_size % howfar;

    return excess ? record_size + (howfar - excess) : record_size;
}
460
461 /* choose a level appropriately randomly */
randlvl(uint8_t lvl,uint8_t maxlvl)462 static uint8_t randlvl(uint8_t lvl, uint8_t maxlvl)
463 {
464 while (((float) rand() / (float) (RAND_MAX)) < PROB) {
465 lvl++;
466 if (lvl == maxlvl) break;
467 }
468 return lvl;
469 }
470
471 /************** HEADER ****************/
472
473 /* given an open, mapped db, read in the header information */
read_header(struct dbengine * db)474 static int read_header(struct dbengine *db)
475 {
476 uint32_t crc;
477
478 assert(db && db->mf && db->is_open);
479
480 if (SIZE(db) < HEADER_SIZE) {
481 syslog(LOG_ERR,
482 "twoskip: file not large enough for header: %s", FNAME(db));
483 return CYRUSDB_IOERROR;
484 }
485
486 if (memcmp(BASE(db), HEADER_MAGIC, HEADER_MAGIC_SIZE)) {
487 syslog(LOG_ERR, "twoskip: invalid magic header: %s", FNAME(db));
488 return CYRUSDB_IOERROR;
489 }
490
491 db->header.version
492 = ntohl(*((uint32_t *)(BASE(db) + OFFSET_VERSION)));
493
494 if (db->header.version > VERSION) {
495 syslog(LOG_ERR, "twoskip: version mismatch: %s has version %d",
496 FNAME(db), db->header.version);
497 return CYRUSDB_IOERROR;
498 }
499
500 db->header.generation
501 = ntohll(*((uint64_t *)(BASE(db) + OFFSET_GENERATION)));
502
503 db->header.num_records
504 = ntohll(*((uint64_t *)(BASE(db) + OFFSET_NUM_RECORDS)));
505
506 db->header.repack_size
507 = ntohll(*((uint64_t *)(BASE(db) + OFFSET_REPACK_SIZE)));
508
509 db->header.current_size
510 = ntohll(*((uint64_t *)(BASE(db) + OFFSET_CURRENT_SIZE)));
511
512 db->header.flags
513 = ntohl(*((uint32_t *)(BASE(db) + OFFSET_FLAGS)));
514
515 crc = ntohl(*((uint32_t *)(BASE(db) + OFFSET_CRC32)));
516
517 if (crc32_map(BASE(db), OFFSET_CRC32) != crc) {
518 syslog(LOG_ERR, "DBERROR: %s: twoskip header CRC failure",
519 FNAME(db));
520 return CYRUSDB_IOERROR;
521 }
522
523 db->end = db->header.current_size;
524
525 return 0;
526 }
527
528 /* given an open, mapped, locked db, write the header information */
write_header(struct dbengine * db)529 static int write_header(struct dbengine *db)
530 {
531 char *buf = scratchspace.s;
532 int n;
533
534 /* format one buffer */
535 memcpy(buf, HEADER_MAGIC, HEADER_MAGIC_SIZE);
536 *((uint32_t *)(buf + OFFSET_VERSION)) = htonl(db->header.version);
537 *((uint64_t *)(buf + OFFSET_GENERATION)) = htonll(db->header.generation);
538 *((uint64_t *)(buf + OFFSET_NUM_RECORDS)) = htonll(db->header.num_records);
539 *((uint64_t *)(buf + OFFSET_REPACK_SIZE)) = htonll(db->header.repack_size);
540 *((uint64_t *)(buf + OFFSET_CURRENT_SIZE)) = htonll(db->header.current_size);
541 *((uint32_t *)(buf + OFFSET_FLAGS)) = htonl(db->header.flags);
542 *((uint32_t *)(buf + OFFSET_CRC32)) = htonl(crc32_map(buf, OFFSET_CRC32));
543
544 /* write it out */
545 n = mappedfile_pwrite(db->mf, buf, HEADER_SIZE, 0);
546 if (n < 0) return CYRUSDB_IOERROR;
547
548 return 0;
549 }
550
551 /* simple wrapper to write with an fsync */
commit_header(struct dbengine * db)552 static int commit_header(struct dbengine *db)
553 {
554 int r = write_header(db);
555 if (!r) r = mappedfile_commit(db->mf);
556 return r;
557 }
558
559 /******************** RECORD *********************/
560
check_tailcrc(struct dbengine * db,struct skiprecord * record)561 static int check_tailcrc(struct dbengine *db, struct skiprecord *record)
562 {
563 uint32_t crc;
564
565 crc = crc32_map(BASE(db) + record->keyoffset,
566 roundup(record->keylen + record->vallen, 8));
567 if (crc != record->crc32_tail) {
568 syslog(LOG_ERR, "DBERROR: invalid tail crc %s at %llX",
569 FNAME(db), (LLU)record->offset);
570 return CYRUSDB_IOERROR;
571 }
572
573 return 0;
574 }
575
576 /* read a single skiprecord at the given offset */
read_onerecord(struct dbengine * db,size_t offset,struct skiprecord * record)577 static int read_onerecord(struct dbengine *db, size_t offset,
578 struct skiprecord *record)
579 {
580 const char *base;
581 int i;
582
583 memset(record, 0, sizeof(struct skiprecord));
584
585 if (!offset) return 0;
586
587 record->offset = offset;
588 record->len = 24; /* absolute minimum */
589
590 /* need space for at least the header plus some details */
591 if (record->offset + record->len > SIZE(db))
592 goto badsize;
593
594 base = BASE(db) + offset;
595
596 /* read in the record header */
597 record->type = base[0];
598 record->level = base[1];
599 record->keylen = ntohs(*((uint16_t *)(base + 2)));
600 record->vallen = ntohl(*((uint32_t *)(base + 4)));
601 offset += 8;
602
603 /* make sure we fit */
604 if (record->level > MAXLEVEL) {
605 syslog(LOG_ERR, "DBERROR: twoskip invalid level %d for %s at %08llX",
606 record->level, FNAME(db), (LLU)offset);
607 return CYRUSDB_IOERROR;
608 }
609
610 /* long key */
611 if (record->keylen == UINT16_MAX) {
612 base = BASE(db) + offset;
613 record->keylen = ntohll(*((uint64_t *)base));
614 offset += 8;
615 }
616
617 /* long value */
618 if (record->vallen == UINT32_MAX) {
619 base = BASE(db) + offset;
620 record->vallen = ntohll(*((uint64_t *)base));
621 offset += 8;
622 }
623
624 /* we know the length now */
625 record->len = (offset - record->offset) /* header including lengths */
626 + 8 * (1 + record->level) /* ptrs */
627 + 8 /* crc32s */
628 + roundup(record->keylen + record->vallen, 8); /* keyval */
629
630 if (record->offset + record->len > SIZE(db))
631 goto badsize;
632
633 for (i = 0; i <= record->level; i++) {
634 base = BASE(db) + offset;
635 record->nextloc[i] = ntohll(*((uint64_t *)base));
636 offset += 8;
637 }
638
639 base = BASE(db) + offset;
640 record->crc32_head = ntohl(*((uint32_t *)base));
641 if (crc32_map(BASE(db) + record->offset, (offset - record->offset))
642 != record->crc32_head) {
643 syslog(LOG_ERR, "DBERROR: twoskip checksum head error for %s at %08llX",
644 FNAME(db), (LLU)offset);
645 return CYRUSDB_IOERROR;
646 }
647
648 record->crc32_tail = ntohl(*((uint32_t *)(base+4)));
649
650 record->keyoffset = offset + 8;
651 record->valoffset = record->keyoffset + record->keylen;
652
653 return 0;
654
655 badsize:
656 syslog(LOG_ERR, "twoskip: attempt to read past end of file %s: %08llX > %08llX",
657 FNAME(db), (LLU)record->offset + record->len, (LLU)SIZE(db));
658 return CYRUSDB_IOERROR;
659 }
660
read_skipdelete(struct dbengine * db,size_t offset,struct skiprecord * record)661 static int read_skipdelete(struct dbengine *db, size_t offset,
662 struct skiprecord *record)
663 {
664 int r;
665
666 r = read_onerecord(db, offset, record);
667 if (r) return r;
668
669 if (record->type == DELETE)
670 r = read_onerecord(db, record->nextloc[0], record);
671
672 return r;
673 }
674
675 /* prepare the header part of the record (everything except the key, value
676 * and padding). Used for both writes and rewrites. */
prepare_record(struct skiprecord * record,char * buf,size_t * sizep)677 static void prepare_record(struct skiprecord *record, char *buf, size_t *sizep)
678 {
679 int len = 8;
680 int i;
681
682 assert(record->level <= MAXLEVEL);
683
684 buf[0] = record->type;
685 buf[1] = record->level;
686 if (record->keylen < UINT16_MAX) {
687 *((uint16_t *)(buf+2)) = htons(record->keylen);
688 }
689 else {
690 *((uint16_t *)(buf+2)) = htons(UINT16_MAX);
691 *((uint64_t *)(buf+len)) = htonll(record->keylen);
692 len += 8;
693 }
694
695 if (record->vallen < UINT32_MAX) {
696 *((uint32_t *)(buf+4)) = htonl(record->vallen);
697 }
698 else {
699 *((uint32_t *)(buf+4)) = htonl(UINT32_MAX);
700 *((uint64_t *)(buf+len)) = htonll(record->vallen);
701 len += 8;
702 }
703
704 /* got pointers? */
705 for (i = 0; i <= record->level; i++) {
706 *((uint64_t *)(buf+len)) = htonll(record->nextloc[i]);
707 len += 8;
708 }
709
710 /* NOTE: crc32_tail does not change */
711 record->crc32_head = crc32_map(buf, len);
712 *((uint32_t *)(buf+len)) = htonl(record->crc32_head);
713 *((uint32_t *)(buf+len+4)) = htonl(record->crc32_tail);
714 len += 8;
715
716 *sizep = len;
717 }
718
719 /* only changing the record head, so only rewrite that much */
rewrite_record(struct dbengine * db,struct skiprecord * record)720 static int rewrite_record(struct dbengine *db, struct skiprecord *record)
721 {
722 char *buf = scratchspace.s;
723 size_t len;
724 int n;
725
726 /* we must already be in a transaction before updating records */
727 assert(db->header.flags & DIRTY);
728 assert(record->offset);
729
730 prepare_record(record, buf, &len);
731
732 n = mappedfile_pwrite(db->mf, buf, len, record->offset);
733 if (n < 0) return CYRUSDB_IOERROR;
734
735 return 0;
736 }
737
738 /* you can only write records at the end */
write_record(struct dbengine * db,struct skiprecord * record,const char * key,const char * val)739 static int write_record(struct dbengine *db, struct skiprecord *record,
740 const char *key, const char *val)
741 {
742 char zeros[8] = {0, 0, 0, 0, 0, 0, 0, 0};
743 uint64_t len;
744 size_t iolen = 0;
745 struct iovec io[4];
746 int n;
747
748 assert(!record->offset);
749
750 /* we'll put the HEAD on later */
751 io[0].iov_base = scratchspace.s;
752 io[0].iov_len = 0;
753
754 io[1].iov_base = (char *)key;
755 io[1].iov_len = record->keylen;
756
757 io[2].iov_base = (char *)val;
758 io[2].iov_len = record->vallen;
759
760 /* pad to 8 bytes */
761 len = record->vallen + record->keylen;
762 io[3].iov_base = zeros;
763 io[3].iov_len = roundup(len, 8) - len;
764
765 /* calculate the CRC32 of the tail first */
766 record->crc32_tail = crc32_iovec(io+1, 3);
767
768 /* prepare the record once we know the crc32 of the tail */
769 prepare_record(record, scratchspace.s, &iolen);
770 io[0].iov_base = scratchspace.s;
771 io[0].iov_len = iolen;
772
773 /* write to the mapped file, getting the offset updated */
774 n = mappedfile_pwritev(db->mf, io, 4, db->end);
775 if (n < 0) return CYRUSDB_IOERROR;
776
777 /* locate the record */
778 record->offset = db->end;
779 record->keyoffset = db->end + io[0].iov_len;
780 record->valoffset = record->keyoffset + record->keylen;
781 record->len = n;
782
783 /* and advance the known file size */
784 db->end += n;
785
786 return 0;
787 }
788
789 /* helper to append a record, starting the transaction by dirtying the
790 * header first if required */
append_record(struct dbengine * db,struct skiprecord * record,const char * key,const char * val)791 static int append_record(struct dbengine *db, struct skiprecord *record,
792 const char *key, const char *val)
793 {
794 int r;
795
796 assert(db->current_txn);
797
798 /* dirty the header if not already dirty */
799 if (!(db->header.flags & DIRTY)) {
800 db->header.flags |= DIRTY;
801 r = commit_header(db);
802 if (r) return r;
803 }
804
805 return write_record(db, record, key, val);
806 }
807
808 /************************** LOCATION MANAGEMENT ***************************/
809
810 /* find the next record at a given level, encapsulating the
811 * level 0 magic */
_getloc(struct dbengine * db,struct skiprecord * record,uint8_t level)812 static size_t _getloc(struct dbengine *db, struct skiprecord *record,
813 uint8_t level)
814 {
815 if (level)
816 return record->nextloc[level + 1];
817
818 /* if one is past, must be the other */
819 if (record->nextloc[0] >= db->end)
820 return record->nextloc[1];
821 else if (record->nextloc[1] >= db->end)
822 return record->nextloc[0];
823
824 /* highest remaining */
825 else if (record->nextloc[0] > record->nextloc[1])
826 return record->nextloc[0];
827 else
828 return record->nextloc[1];
829 }
830
831 /* set the next record at a given level, encapsulating the
832 * level 0 magic */
_setloc(struct dbengine * db,struct skiprecord * record,uint8_t level,size_t offset)833 static void _setloc(struct dbengine *db, struct skiprecord *record,
834 uint8_t level, size_t offset)
835 {
836 if (level) {
837 record->nextloc[level+1] = offset;
838 return;
839 }
840
841 /* level zero is special */
842 /* already this transaction, update this one */
843 if (record->nextloc[0] >= db->header.current_size)
844 record->nextloc[0] = offset;
845 else if (record->nextloc[1] >= db->header.current_size)
846 record->nextloc[1] = offset;
847 /* otherwise, update older one */
848 else if (record->nextloc[1] > record->nextloc[0])
849 record->nextloc[0] = offset;
850 else
851 record->nextloc[1] = offset;
852 }
853
854 /* finds a record, either an exact match or the record
855 * immediately before */
relocate(struct dbengine * db)856 static int relocate(struct dbengine *db)
857 {
858 struct skiploc *loc = &db->loc;
859 struct skiprecord newrecord;
860 size_t offset;
861 size_t oldoffset = 0;
862 uint8_t level;
863 uint8_t i;
864 int cmp = -1; /* never found a thing! */
865 int r;
866
867 /* pointer validity */
868 loc->generation = db->header.generation;
869 loc->end = db->end;
870
871 /* start with the dummy */
872 r = read_onerecord(db, DUMMY_OFFSET, &loc->record);
873 loc->is_exactmatch = 0;
874
875 /* initialise pointers */
876 level = loc->record.level;
877 newrecord.offset = 0;
878 loc->backloc[level] = loc->record.offset;
879 loc->forwardloc[level] = 0;
880
881 /* special case start pointer for efficiency */
882 if (!loc->keybuf.len) {
883 for (i = 0; i < loc->record.level; i++) {
884 loc->backloc[i] = loc->record.offset;
885 loc->forwardloc[i] = _getloc(db, &loc->record, i);
886 }
887 return 0;
888 }
889
890 while (level) {
891 offset = _getloc(db, &loc->record, level-1);
892
893 loc->backloc[level-1] = loc->record.offset;
894 loc->forwardloc[level-1] = offset;
895
896 if (offset != oldoffset) {
897 oldoffset = offset;
898 r = read_skipdelete(db, offset, &newrecord);
899 if (r) return r;
900
901 if (newrecord.offset) {
902 assert(newrecord.level >= level);
903
904 cmp = db->compar(KEY(db, &newrecord), newrecord.keylen,
905 loc->keybuf.s, loc->keybuf.len);
906
907 /* not there? stay at this level */
908 if (cmp < 0) {
909 /* move the offset range along */
910 loc->record = newrecord;
911 continue;
912 }
913 }
914 }
915
916 level--;
917 }
918
919 if (cmp == 0) { /* we found it exactly */
920 loc->is_exactmatch = 1;
921 loc->record = newrecord;
922
923 for (i = 0; i < loc->record.level; i++)
924 loc->forwardloc[i] = _getloc(db, &loc->record, i);
925
926 /* make sure this record is complete */
927 r = check_tailcrc(db, &loc->record);
928
929 if (r) return r;
930 }
931
932 return 0;
933 }
934
/* helper function to find a location, either by using the existing
 * location if it's close enough, or using the full relocate above.
 *
 * key, keylen: the key to look for; key may be the location's own
 *              keybuf storage, in which case only the length is adjusted
 *
 * The cheap paths are only tried when the key is non-empty and the
 * stored location was taken at the current db->end and generation.  They
 * cover: the same key as an existing exact match, the record directly
 * after the current one, and the gap between the two.  Everything else
 * falls through to relocate().
 *
 * Returns 0 on success (db->loc is positioned, is_exactmatch says
 * whether the key exists) or an error from reading or checking a
 * record. */
static int find_loc(struct dbengine *db, const char *key, size_t keylen)
{
    struct skiprecord newrecord;
    struct skiploc *loc = &db->loc;
    int cmp, i, r;

    /* remember the requested key, unless the caller handed us the
     * keybuf's own storage - then at most the length differs */
    if (key != loc->keybuf.s)
        buf_setmap(&loc->keybuf, key, keylen);
    else if (keylen != loc->keybuf.len)
        buf_truncate(&loc->keybuf, keylen);

    /* can we special case advance? */
    if (keylen && loc->end == db->end
               && loc->generation == db->header.generation) {
        /* where is the requested key relative to the current record? */
        cmp = db->compar(KEY(db, &loc->record), loc->record.keylen,
                         loc->keybuf.s, loc->keybuf.len);
        /* same place, and was exact.  Otherwise we're going back,
         * and the reverse pointers are no longer valid... */
        if (db->loc.is_exactmatch && cmp == 0) {
            return 0;
        }

        /* we're looking after this record */
        if (cmp < 0) {
            /* the current record becomes the predecessor at each of
             * its levels */
            for (i = 0; i < db->loc.record.level; i++)
                loc->backloc[i] = db->loc.record.offset;

            /* read the next record */
            r = read_skipdelete(db, loc->forwardloc[0], &newrecord);
            if (r) return r;

            /* nothing afterwards? */
            if (!newrecord.offset) {
                db->loc.is_exactmatch = 0;
                return 0;
            }

            /* now where is THIS record? */
            cmp = db->compar(KEY(db, &newrecord), newrecord.keylen,
                             loc->keybuf.s, loc->keybuf.len);

            /* exact match? */
            if (cmp == 0) {
                db->loc.is_exactmatch = 1;
                db->loc.record = newrecord;

                for (i = 0; i < newrecord.level; i++)
                    loc->forwardloc[i] = _getloc(db, &newrecord, i);

                /* make sure this record is complete */
                r = check_tailcrc(db, &loc->record);
                if (r) return r;

                return 0;
            }

            /* or in the gap */
            if (cmp > 0) {
                db->loc.is_exactmatch = 0;
                return 0;
            }
        }
        /* if we fell out here, it's not a "local" record, just search */
    }

    return relocate(db);
}
1004
1005 /* helper function to advance to the "next" record. Used by foreach,
1006 * fetchnext, and internal functions */
advance_loc(struct dbengine * db)1007 static int advance_loc(struct dbengine *db)
1008 {
1009 struct skiploc *loc = &db->loc;
1010 uint8_t i;
1011 int r;
1012
1013 /* has another session made changes? Need to re-find the location */
1014 if (loc->end != db->end || loc->generation != db->header.generation) {
1015 r = relocate(db);
1016 if (r) return r;
1017 }
1018
1019 /* update back pointers */
1020 for (i = 0; i < loc->record.level; i++)
1021 loc->backloc[i] = loc->record.offset;
1022
1023 /* ADVANCE */
1024 r = read_skipdelete(db, loc->forwardloc[0], &loc->record);
1025 if (r) return r;
1026
1027 /* reached the end? */
1028 if (!loc->record.offset) {
1029 buf_reset(&loc->keybuf);
1030 return relocate(db);
1031 }
1032
1033 /* update forward pointers */
1034 for (i = 0; i < loc->record.level; i++)
1035 loc->forwardloc[i] = _getloc(db, &loc->record, i);
1036
1037 /* keep our location */
1038 buf_setmap(&loc->keybuf, KEY(db, &loc->record), loc->record.keylen);
1039 loc->is_exactmatch = 1;
1040
1041 /* make sure this record is complete */
1042 r = check_tailcrc(db, &loc->record);
1043 if (r) return r;
1044
1045 return 0;
1046 }
1047
1048 /* helper function to update all the back records efficiently
1049 * after appending a new record, either create or delete. The
1050 * caller must set forwardloc[] correctly for each level it has
1051 * changed */
stitch(struct dbengine * db,uint8_t maxlevel,size_t newoffset)1052 static int stitch(struct dbengine *db, uint8_t maxlevel, size_t newoffset)
1053 {
1054 struct skiploc *loc = &db->loc;
1055 struct skiprecord oldrecord;
1056 uint8_t i;
1057 int r;
1058
1059 oldrecord.level = 0;
1060 while (oldrecord.level < maxlevel) {
1061 uint8_t level = oldrecord.level;
1062
1063 r = read_onerecord(db, loc->backloc[level], &oldrecord);
1064 if (r) return r;
1065
1066 /* always getting higher */
1067 assert(oldrecord.level > level);
1068
1069 for (i = level; i < maxlevel; i++)
1070 _setloc(db, &oldrecord, i, loc->forwardloc[i]);
1071
1072 r = rewrite_record(db, &oldrecord);
1073 if (r) return r;
1074 }
1075
1076 /* re-read the "current record" */
1077 r = read_onerecord(db, newoffset, &loc->record);
1078 if (r) return r;
1079
1080 /* and update the forward locations */
1081 for (i = 0; i < loc->record.level; i++)
1082 loc->forwardloc[i] = _getloc(db, &loc->record, i);
1083
1084 return 0;
1085 }
1086
1087 /* overall "store" function - update the value in the current loc.
1088 All new values funnel through here. Check delete_here for
1089 deletion. Force is implied here, it gets checked higher. */
store_here(struct dbengine * db,const char * val,size_t vallen)1090 static int store_here(struct dbengine *db, const char *val, size_t vallen)
1091 {
1092 struct skiploc *loc = &db->loc;
1093 struct skiprecord newrecord;
1094 uint8_t level = 0;
1095 uint8_t i;
1096 int r;
1097
1098 if (loc->is_exactmatch) {
1099 level = loc->record.level;
1100 db->header.num_records--;
1101 }
1102
1103 /* build a new record */
1104 memset(&newrecord, 0, sizeof(struct skiprecord));
1105 newrecord.type = RECORD;
1106 newrecord.level = randlvl(1, MAXLEVEL);
1107 newrecord.keylen = loc->keybuf.len;
1108 newrecord.vallen = vallen;
1109 for (i = 0; i < newrecord.level; i++)
1110 newrecord.nextloc[i+1] = loc->forwardloc[i];
1111 if (newrecord.level > level)
1112 level = newrecord.level;
1113
1114 /* append to the file */
1115 r = append_record(db, &newrecord, loc->keybuf.s, val);
1116 if (r) return r;
1117
1118 /* get the nextlevel to point here for all this record's levels */
1119 for (i = 0; i < newrecord.level; i++)
1120 loc->forwardloc[i] = newrecord.offset;
1121
1122 /* update all backpointers */
1123 r = stitch(db, level, newrecord.offset);
1124 if (r) return r;
1125
1126 /* update header to know details of new record */
1127 db->header.num_records++;
1128
1129 loc->is_exactmatch = 1;
1130 loc->end = db->end;
1131
1132 return 0;
1133 }
1134
1135 /* delete a record */
delete_here(struct dbengine * db)1136 static int delete_here(struct dbengine *db)
1137 {
1138 struct skiploc *loc = &db->loc;
1139 struct skiprecord newrecord;
1140 struct skiprecord nextrecord;
1141 int r;
1142
1143 if (!loc->is_exactmatch)
1144 return CYRUSDB_NOTFOUND;
1145
1146 db->header.num_records--;
1147
1148 /* by the magic of zeroing, this even works for zero */
1149 r = read_skipdelete(db, loc->forwardloc[0], &nextrecord);
1150 if (r) return r;
1151
1152 /* build a delete record */
1153 memset(&newrecord, 0, sizeof(struct skiprecord));
1154 newrecord.type = DELETE;
1155 newrecord.nextloc[0] = nextrecord.offset;
1156
1157 /* append to the file */
1158 r = append_record(db, &newrecord, NULL, NULL);
1159 if (r) return r;
1160
1161 /* get the nextlevel to point here */
1162 loc->forwardloc[0] = newrecord.offset;
1163
1164 /* update all backpointers right up to the old record's
1165 * level, so that they all point past */
1166 r = stitch(db, loc->record.level, loc->backloc[0]);
1167 if (r) return r;
1168
1169 /* update location */
1170 loc->is_exactmatch = 0;
1171 loc->end = db->end;
1172
1173 return 0;
1174 }
1175
1176 /************ DATABASE STRUCT AND TRANSACTION MANAGEMENT **************/
1177
db_is_clean(struct dbengine * db)1178 static int db_is_clean(struct dbengine *db)
1179 {
1180 if (db->header.current_size != SIZE(db))
1181 return 0;
1182
1183 if (db->header.flags & DIRTY)
1184 return 0;
1185
1186 return 1;
1187 }
1188
unlock(struct dbengine * db)1189 static int unlock(struct dbengine *db)
1190 {
1191 return mappedfile_unlock(db->mf);
1192 }
1193
write_lock(struct dbengine * db)1194 static int write_lock(struct dbengine *db)
1195 {
1196 int r = mappedfile_writelock(db->mf);
1197 if (r) return r;
1198
1199 /* reread header */
1200 if (db->is_open) {
1201 r = read_header(db);
1202 if (r) return r;
1203
1204 /* recovery checks for consistency */
1205 r = recovery(db);
1206 if (r) return r;
1207 }
1208
1209 return 0;
1210 }
1211
read_lock(struct dbengine * db)1212 static int read_lock(struct dbengine *db)
1213 {
1214 int r = mappedfile_readlock(db->mf);
1215 if (r) return r;
1216
1217 /* reread header */
1218 if (db->is_open) {
1219 r = read_header(db);
1220 if (r) return r;
1221
1222 /* we just take and keep a write lock if inconsistent,
1223 * the write lock will fix it up */
1224 if (!db_is_clean(db)) {
1225 unlock(db);
1226 r = write_lock(db);
1227 if (r) return r;
1228 /* downgrade to a read lock again, since that what
1229 * was requested */
1230 unlock(db);
1231 return read_lock(db);
1232 }
1233 }
1234
1235 return 0;
1236 }
1237
newtxn(struct dbengine * db,struct txn ** tidptr)1238 static int newtxn(struct dbengine *db, struct txn **tidptr)
1239 {
1240 int r;
1241
1242 assert(!db->current_txn);
1243 assert(!*tidptr);
1244
1245 /* grab a r/w lock */
1246 r = write_lock(db);
1247 if (r) return r;
1248
1249 /* create the transaction */
1250 db->txn_num++;
1251 db->current_txn = xmalloc(sizeof(struct txn));
1252 db->current_txn->num = db->txn_num;
1253
1254 /* pass it back out */
1255 *tidptr = db->current_txn;
1256
1257 return 0;
1258 }
1259
dispose_db(struct dbengine * db)1260 static void dispose_db(struct dbengine *db)
1261 {
1262 if (!db) return;
1263
1264 if (db->mf) {
1265 if (mappedfile_islocked(db->mf))
1266 unlock(db);
1267 mappedfile_close(&db->mf);
1268 }
1269
1270 buf_free(&db->loc.keybuf);
1271
1272 free(db);
1273 }
1274
1275 /************************************************************/
1276
opendb(const char * fname,int flags,struct dbengine ** ret,struct txn ** mytid)1277 static int opendb(const char *fname, int flags, struct dbengine **ret, struct txn **mytid)
1278 {
1279 struct dbengine *db;
1280 int r;
1281 int mappedfile_flags = MAPPEDFILE_RW;
1282
1283 assert(fname);
1284 assert(ret);
1285
1286 db = (struct dbengine *) xzmalloc(sizeof(struct dbengine));
1287
1288 if (flags & CYRUSDB_CREATE)
1289 mappedfile_flags |= MAPPEDFILE_CREATE;
1290
1291 db->open_flags = flags & ~CYRUSDB_CREATE;
1292 db->compar = (flags & CYRUSDB_MBOXSORT) ? bsearch_ncompare_mbox
1293 : bsearch_ncompare_raw;
1294
1295 r = mappedfile_open(&db->mf, fname, mappedfile_flags);
1296 if (r) {
1297 /* convert to CYRUSDB errors*/
1298 if (r == -ENOENT) r = CYRUSDB_NOTFOUND;
1299 else r = CYRUSDB_IOERROR;
1300 goto done;
1301 }
1302
1303 db->is_open = 0;
1304
1305 /* grab a read lock, only reading the header */
1306 r = read_lock(db);
1307 if (r) goto done;
1308
1309 /* if there's any issue which requires fixing, get a write lock */
1310 if (0) {
1311 retry_write:
1312 unlock(db);
1313 db->is_open = 0;
1314 r = write_lock(db);
1315 if (r) goto done;
1316 }
1317
1318 /* if the map size is zero, it's a new file - we need to create an
1319 * initial header */
1320 if (mappedfile_size(db->mf) == 0) {
1321 struct skiprecord dummy;
1322
1323 if (!mappedfile_iswritelocked(db->mf))
1324 goto retry_write;
1325
1326 /* create the dummy! */
1327 memset(&dummy, 0, sizeof(struct skiprecord));
1328 dummy.type = DUMMY;
1329 dummy.level = MAXLEVEL;
1330
1331 /* append dummy after header location */
1332 db->end = DUMMY_OFFSET;
1333 r = write_record(db, &dummy, NULL, NULL);
1334 if (r) {
1335 syslog(LOG_ERR, "DBERROR: writing dummy node for %s: %m",
1336 fname);
1337 goto done;
1338 }
1339
1340 /* create the header */
1341 db->header.version = VERSION;
1342 db->header.generation = 1;
1343 db->header.repack_size = db->end;
1344 db->header.current_size = db->end;
1345 r = commit_header(db);
1346 if (r) {
1347 syslog(LOG_ERR, "DBERROR: writing header for %s: %m",
1348 fname);
1349 goto done;
1350 }
1351 }
1352
1353 db->is_open = 1;
1354
1355 r = read_header(db);
1356 if (r) goto done;
1357
1358 if (!db_is_clean(db)) {
1359 if (!mappedfile_iswritelocked(db->mf))
1360 goto retry_write;
1361
1362 /* recovery will clean the flag once it's committed the fixes */
1363 r = recovery(db);
1364 if (r) goto done;
1365 }
1366
1367 /* unlock the DB */
1368 unlock(db);
1369
1370 *ret = db;
1371
1372 if (mytid) {
1373 r = newtxn(db, mytid);
1374 if (r) goto done;
1375 }
1376
1377 done:
1378 if (r) dispose_db(db);
1379 return r;
1380 }
1381
myopen(const char * fname,int flags,struct dbengine ** ret,struct txn ** mytid)1382 static int myopen(const char *fname, int flags, struct dbengine **ret, struct txn **mytid)
1383 {
1384 struct db_list *ent;
1385 struct dbengine *mydb;
1386 int r = 0;
1387
1388 /* do we already have this DB open? */
1389 for (ent = open_twoskip; ent; ent = ent->next) {
1390 if (strcmp(FNAME(ent->db), fname)) continue;
1391 if (ent->db->current_txn)
1392 return CYRUSDB_LOCKED;
1393 if (mytid) {
1394 r = newtxn(ent->db, mytid);
1395 if (r) return r;
1396 }
1397 ent->refcount++;
1398 *ret = ent->db;
1399 return 0;
1400 }
1401
1402 r = opendb(fname, flags, &mydb, mytid);
1403 if (r) return r;
1404
1405 /* track this database in the open list */
1406 ent = (struct db_list *) xzmalloc(sizeof(struct db_list));
1407 ent->db = mydb;
1408 ent->refcount = 1;
1409 ent->next = open_twoskip;
1410 open_twoskip = ent;
1411
1412 /* return the open DB */
1413 *ret = mydb;
1414
1415 return 0;
1416 }
1417
myclose(struct dbengine * db)1418 static int myclose(struct dbengine *db)
1419 {
1420 struct db_list *ent = open_twoskip;
1421 struct db_list *prev = NULL;
1422
1423 assert(db);
1424
1425 /* remove this DB from the open list */
1426 while (ent && ent->db != db) {
1427 prev = ent;
1428 ent = ent->next;
1429 }
1430 assert(ent);
1431
1432 if (--ent->refcount <= 0) {
1433 if (prev) prev->next = ent->next;
1434 else open_twoskip = ent->next;
1435 free(ent);
1436 if (mappedfile_islocked(db->mf))
1437 syslog(LOG_ERR, "twoskip: %s closed while still locked", FNAME(db));
1438 dispose_db(db);
1439 }
1440
1441 return 0;
1442 }
1443
1444 /*************** EXTERNAL APIS ***********************/
1445
myfetch(struct dbengine * db,const char * key,size_t keylen,const char ** foundkey,size_t * foundkeylen,const char ** data,size_t * datalen,struct txn ** tidptr,int fetchnext)1446 static int myfetch(struct dbengine *db,
1447 const char *key, size_t keylen,
1448 const char **foundkey, size_t *foundkeylen,
1449 const char **data, size_t *datalen,
1450 struct txn **tidptr, int fetchnext)
1451 {
1452 int r = 0;
1453
1454 assert(db);
1455 if (datalen) assert(data);
1456
1457 if (data) *data = NULL;
1458 if (datalen) *datalen = 0;
1459
1460 /* Hacky workaround:
1461 *
1462 * If no transaction was passed, but we're in a transaction,
1463 * then just do the read within that transaction.
1464 */
1465 if (!tidptr && db->current_txn)
1466 tidptr = &db->current_txn;
1467
1468 if (tidptr) {
1469 if (!*tidptr) {
1470 r = newtxn(db, tidptr);
1471 if (r) return r;
1472 }
1473 } else {
1474 /* grab a r lock */
1475 r = read_lock(db);
1476 if (r) return r;
1477 }
1478
1479 r = find_loc(db, key, keylen);
1480 if (r) goto done;
1481
1482 if (fetchnext) {
1483 r = advance_loc(db);
1484 if (r) goto done;
1485 }
1486
1487 if (foundkey) *foundkey = db->loc.keybuf.s;
1488 if (foundkeylen) *foundkeylen = db->loc.keybuf.len;
1489
1490 if (!r && db->loc.is_exactmatch) {
1491 if (data) *data = VAL(db, &db->loc.record);
1492 if (datalen) *datalen = db->loc.record.vallen;
1493 }
1494 else {
1495 /* we didn't get an exact match */
1496 r = CYRUSDB_NOTFOUND;
1497 }
1498
1499 done:
1500 if (!tidptr) {
1501 /* release read lock */
1502 int r1;
1503 if ((r1 = unlock(db)) < 0) {
1504 return r1;
1505 }
1506 }
1507
1508 return r;
1509 }
1510
1511 /* foreach allows for subsidary mailbox operations in 'cb'.
1512 if there is a txn, 'cb' must make use of it.
1513 */
myforeach(struct dbengine * db,const char * prefix,size_t prefixlen,foreach_p * goodp,foreach_cb * cb,void * rock,struct txn ** tidptr)1514 static int myforeach(struct dbengine *db,
1515 const char *prefix, size_t prefixlen,
1516 foreach_p *goodp,
1517 foreach_cb *cb, void *rock,
1518 struct txn **tidptr)
1519 {
1520 int r = 0, cb_r = 0;
1521 int num_misses = 0;
1522 int need_unlock = 0;
1523 const char *val;
1524 size_t vallen;
1525 struct buf keybuf = BUF_INITIALIZER;
1526
1527 assert(db);
1528 assert(cb);
1529 if (prefixlen) assert(prefix);
1530
1531 /* Hacky workaround:
1532 *
1533 * If no transaction was passed, but we're in a transaction,
1534 * then just do the read within that transaction.
1535 */
1536 if (!tidptr && db->current_txn)
1537 tidptr = &db->current_txn;
1538 if (tidptr) {
1539 if (!*tidptr) {
1540 r = newtxn(db, tidptr);
1541 if (r) return r;
1542 }
1543 } else {
1544 /* grab a r lock */
1545 r = read_lock(db);
1546 if (r) return r;
1547 need_unlock = 1;
1548 }
1549
1550 r = find_loc(db, prefix, prefixlen);
1551 if (r) goto done;
1552
1553 if (!db->loc.is_exactmatch) {
1554 /* advance to the first match */
1555 r = advance_loc(db);
1556 if (r) goto done;
1557 }
1558
1559 while (db->loc.is_exactmatch) {
1560 /* does it match prefix? */
1561 if (prefixlen) {
1562 if (db->loc.record.keylen < prefixlen) break;
1563 if (db->compar(KEY(db, &db->loc.record), prefixlen, prefix, prefixlen)) break;
1564 }
1565
1566 val = VAL(db, &db->loc.record);
1567 vallen = db->loc.record.vallen;
1568
1569 if (!goodp || goodp(rock, db->loc.keybuf.s, db->loc.keybuf.len,
1570 val, vallen)) {
1571 /* take a copy of they key - just in case cb does actions on this database
1572 * and clobbers loc */
1573 buf_copy(&keybuf, &db->loc.keybuf);
1574
1575 if (!tidptr) {
1576 /* release read lock */
1577 r = unlock(db);
1578 if (r) goto done;
1579 need_unlock = 0;
1580 }
1581
1582 /* make callback */
1583 cb_r = cb(rock, db->loc.keybuf.s, db->loc.keybuf.len,
1584 val, vallen);
1585 if (cb_r) break;
1586
1587 if (!tidptr) {
1588 /* grab a r lock */
1589 r = read_lock(db);
1590 if (r) goto done;
1591 need_unlock = 1;
1592
1593 num_misses = 0;
1594 }
1595
1596 /* should be cheap if we're already here */
1597 r = find_loc(db, keybuf.s, keybuf.len);
1598 if (r) goto done;
1599 }
1600 else if (!tidptr) {
1601 num_misses++;
1602 if (num_misses > FOREACH_LOCK_RELEASE) {
1603 /* take a copy of they key - just in case cb does actions on this database
1604 * and clobbers loc */
1605 buf_copy(&keybuf, &db->loc.keybuf);
1606
1607 /* release read lock */
1608 r = unlock(db);
1609 if (r) goto done;
1610 need_unlock = 0;
1611
1612 /* grab a r lock */
1613 r = read_lock(db);
1614 if (r) goto done;
1615 need_unlock = 1;
1616
1617 /* should be cheap if we're already here */
1618 r = find_loc(db, keybuf.s, keybuf.len);
1619 if (r) goto done;
1620
1621 num_misses = 0;
1622 }
1623 }
1624
1625 /* move to the next one */
1626 r = advance_loc(db);
1627 if (r) goto done;
1628 }
1629
1630 done:
1631
1632 buf_free(&keybuf);
1633
1634 if (need_unlock) {
1635 /* release read lock */
1636 int r1 = unlock(db);
1637 if (r1) return r1;
1638 }
1639
1640 return r ? r : cb_r;
1641 }
1642
1643 /* helper function for all writes - wraps create and delete and the FORCE
1644 * logic for each */
skipwrite(struct dbengine * db,const char * key,size_t keylen,const char * data,size_t datalen,int force)1645 static int skipwrite(struct dbengine *db,
1646 const char *key, size_t keylen,
1647 const char *data, size_t datalen,
1648 int force)
1649 {
1650 int r = find_loc(db, key, keylen);
1651 if (r) return r;
1652
1653 /* could be a delete or a replace */
1654 if (db->loc.is_exactmatch) {
1655 if (!data) return delete_here(db);
1656 if (!force) return CYRUSDB_EXISTS;
1657 /* unchanged? Save the IO */
1658 if (!db->compar(data, datalen,
1659 VAL(db, &db->loc.record),
1660 db->loc.record.vallen))
1661 return 0;
1662 return store_here(db, data, datalen);
1663 }
1664
1665 /* only create if it's not a delete, obviously */
1666 if (data) return store_here(db, data, datalen);
1667
1668 /* must be a delete - are we forcing? */
1669 if (!force) return CYRUSDB_NOTFOUND;
1670
1671 return 0;
1672 }
1673
mycommit(struct dbengine * db,struct txn * tid)1674 static int mycommit(struct dbengine *db, struct txn *tid)
1675 {
1676 struct skiprecord newrecord;
1677 int r = 0;
1678
1679 assert(db);
1680 assert(tid == db->current_txn);
1681
1682 /* no need to abort if we're not dirty */
1683 if (!(db->header.flags & DIRTY))
1684 goto done;
1685
1686 /* build a commit record */
1687 memset(&newrecord, 0, sizeof(struct skiprecord));
1688 newrecord.type = COMMIT;
1689 newrecord.nextloc[0] = db->header.current_size;
1690
1691 /* append to the file */
1692 r = append_record(db, &newrecord, NULL, NULL);
1693 if (r) goto done;
1694
1695 /* commit ALL outstanding changes first, before
1696 * rewriting the header */
1697 r = mappedfile_commit(db->mf);
1698 if (r) goto done;
1699
1700 /* finally, update the header and commit again */
1701 db->header.current_size = db->end;
1702 db->header.flags &= ~DIRTY;
1703 r = commit_header(db);
1704
1705 done:
1706 if (r) {
1707 int r2;
1708
1709 /* error during commit; we must abort */
1710 r2 = myabort(db, tid);
1711 if (r2) {
1712 syslog(LOG_ERR, "DBERROR: twoskip %s: commit AND abort failed",
1713 FNAME(db));
1714 }
1715 }
1716 else {
1717 if (!(db->open_flags & CYRUSDB_NOCOMPACT)
1718 && db->header.current_size > MINREWRITE
1719 && db->header.current_size > 2 * db->header.repack_size) {
1720 int r2 = mycheckpoint(db);
1721 if (r2) {
1722 syslog(LOG_NOTICE, "twoskip: failed to checkpoint %s: %m",
1723 FNAME(db));
1724 }
1725 }
1726 else {
1727 unlock(db);
1728 }
1729
1730 free(tid);
1731 db->current_txn = NULL;
1732 }
1733
1734 return r;
1735 }
1736
myabort(struct dbengine * db,struct txn * tid)1737 static int myabort(struct dbengine *db, struct txn *tid)
1738 {
1739 int r;
1740
1741 assert(db);
1742 assert(tid == db->current_txn);
1743
1744 /* free the tid */
1745 free(tid);
1746 db->current_txn = NULL;
1747 db->end = db->header.current_size;
1748
1749 /* recovery will clean up */
1750 r = recovery1(db, NULL);
1751
1752 buf_free(&db->loc.keybuf);
1753 memset(&db->loc, 0, sizeof(struct skiploc));
1754
1755 unlock(db);
1756
1757 return r;
1758 }
1759
/* Store 'data' under 'key' (or delete 'key' when 'data' is NULL).
 *
 * With a 'tidptr' the write joins that transaction, creating it if
 * '*tidptr' is NULL; without one, a transaction local to this call is
 * created and committed.  If the write fails the transaction is
 * aborted and '*tidptr' is cleared.
 *
 * Returns 0 on success; on failure the abort error if the abort itself
 * failed, otherwise the write (or commit) error. */
static int mystore(struct dbengine *db,
                   const char *key, size_t keylen,
                   const char *data, size_t datalen,
                   struct txn **tidptr, int force)
{
    struct txn *localtid = NULL;
    int r = 0;

    assert(db);
    assert(key && keylen);

    /* not keeping the transaction, just create one local to
     * this function */
    if (!tidptr) tidptr = &localtid;

    /* make sure we're write locked and up to date */
    if (!*tidptr) {
        r = newtxn(db, tidptr);
        if (r) return r;
    }

    r = skipwrite(db, key, keylen, data, datalen, force);

    if (r) {
        /* the write failed: throw the whole transaction away */
        int abort_r = myabort(db, *tidptr);
        *tidptr = NULL;
        return abort_r ? abort_r : r;
    }

    if (localtid) {
        /* commit the store, which releases the write lock */
        r = mycommit(db, localtid);
    }

    return r;
}
1795
1796 /* compress 'db', closing at the end. Uses foreach to copy into a new
1797 * database, then rewrites over the old one */
1798
/* rock handed to copy_cb: where the copied records are stored */
struct copy_rock {
    struct dbengine *db;    /* database that copy_cb stores into */
    struct txn *tid;        /* transaction used for those stores */
};
1803
copy_cb(void * rock,const char * key,size_t keylen,const char * val,size_t vallen)1804 static int copy_cb(void *rock,
1805 const char *key, size_t keylen,
1806 const char *val, size_t vallen)
1807 {
1808 struct copy_rock *cr = (struct copy_rock *)rock;
1809
1810 return mystore(cr->db, key, keylen, val, vallen, &cr->tid, 0);
1811 }
1812
mycheckpoint(struct dbengine * db)1813 static int mycheckpoint(struct dbengine *db)
1814 {
1815 size_t old_size = db->header.current_size;
1816 char newfname[1024];
1817 clock_t start = sclock();
1818 struct copy_rock cr;
1819 int r = 0;
1820
1821 r = myconsistent(db, db->current_txn);
1822 if (r) {
1823 syslog(LOG_ERR, "db %s, inconsistent pre-checkpoint, bailing out",
1824 FNAME(db));
1825 unlock(db);
1826 return r;
1827 }
1828
1829 /* open fname.NEW */
1830 snprintf(newfname, sizeof(newfname), "%s.NEW", FNAME(db));
1831 unlink(newfname);
1832
1833 cr.db = NULL;
1834 cr.tid = NULL;
1835 r = opendb(newfname, db->open_flags | CYRUSDB_CREATE, &cr.db, &cr.tid);
1836 if (r) return r;
1837
1838 r = myforeach(db, NULL, 0, NULL, copy_cb, &cr, &db->current_txn);
1839 if (r) goto err;
1840
1841 r = myconsistent(cr.db, cr.tid);
1842 if (r) {
1843 syslog(LOG_ERR, "db %s, inconsistent post-checkpoint, bailing out",
1844 FNAME(db));
1845 goto err;
1846 }
1847
1848 /* remember the repack size */
1849 cr.db->header.repack_size = cr.db->end;
1850
1851 /* increase the generation count */
1852 cr.db->header.generation = db->header.generation + 1;
1853
1854 r = mycommit(cr.db, cr.tid);
1855 if (r) goto err;
1856
1857 /* move new file to original file name */
1858 r = mappedfile_rename(cr.db->mf, FNAME(db));
1859 if (r) goto err;
1860
1861 /* OK, we're commmitted now - clean up */
1862 unlock(db);
1863
1864 /* gotta clean it all up */
1865 mappedfile_close(&db->mf);
1866 buf_free(&db->loc.keybuf);
1867
1868 *db = *cr.db;
1869 free(cr.db); /* leaked? */
1870
1871 {
1872 syslog(LOG_INFO,
1873 "twoskip: checkpointed %s (%llu record%s, %llu => %llu bytes) in %2.3f seconds",
1874 FNAME(db), (LLU)db->header.num_records,
1875 db->header.num_records == 1 ? "" : "s", (LLU)old_size,
1876 (LLU)(db->header.current_size),
1877 (sclock() - start) / (double) CLOCKS_PER_SEC);
1878 }
1879
1880 return 0;
1881
1882 err:
1883 if (cr.tid) myabort(cr.db, cr.tid);
1884 unlink(FNAME(cr.db));
1885 dispose_db(cr.db);
1886 unlock(db);
1887 return CYRUSDB_IOERROR;
1888 }
1889
1890
1891 /* dump the database.
1892 if detail == 1, dump all records.
1893 if detail == 2, also dump pointers for active records.
1894 if detail == 3, dump all records/all pointers.
1895 */
dump(struct dbengine * db,int detail)1896 static int dump(struct dbengine *db, int detail __attribute__((unused)))
1897 {
1898 struct skiprecord record;
1899 struct buf scratch = BUF_INITIALIZER;
1900 size_t offset = DUMMY_OFFSET;
1901 int r = 0;
1902 int i;
1903
1904 printf("HEADER: v=%lu fl=%lu num=%llu sz=(%08llX/%08llX)\n",
1905 (LU)db->header.version,
1906 (LU)db->header.flags,
1907 (LLU)db->header.num_records,
1908 (LLU)db->header.current_size,
1909 (LLU)db->header.repack_size);
1910
1911 while (offset < db->header.current_size) {
1912 printf("%08llX ", (LLU)offset);
1913
1914 r = read_onerecord(db, offset, &record);
1915
1916 if (r) {
1917 printf("ERROR\n");
1918 break;
1919 }
1920
1921 switch (record.type) {
1922 case DELETE:
1923 printf("DELETE ptr=%08llX\n", (LLU)record.nextloc[0]);
1924 break;
1925
1926 case COMMIT:
1927 printf("COMMIT start=%08llX\n", (LLU)record.nextloc[0]);
1928 break;
1929
1930 case RECORD:
1931 case DUMMY:
1932 buf_setmap(&scratch, KEY(db, &record), record.keylen);
1933 buf_replace_char(&scratch, '\0', '-');
1934 printf("%s kl=%llu dl=%llu lvl=%d (%s)\n",
1935 (record.type == RECORD ? "RECORD" : "DUMMY"),
1936 (LLU)record.keylen, (LLU)record.vallen,
1937 record.level, buf_cstring(&scratch));
1938 printf("\t");
1939 for (i = 0; i <= record.level; i++) {
1940 printf("%08llX ", (LLU)record.nextloc[i]);
1941 if (!(i % 8))
1942 printf("\n\t");
1943 }
1944 printf("\n");
1945 break;
1946 }
1947
1948 offset += record.len;
1949 }
1950
1951 buf_free(&scratch);
1952
1953 return r;
1954 }
1955
/* Run the consistency checks on 'db' under a read lock, outside of any
 * transaction.  Returns the locking error, or the result of
 * myconsistent(). */
static int consistent(struct dbengine *db)
{
    int r = read_lock(db);

    if (!r) {
        r = myconsistent(db, NULL);
        unlock(db);
    }

    return r;
}
1969
1970 /* perform some basic consistency checks */
myconsistent(struct dbengine * db,struct txn * tid)1971 static int myconsistent(struct dbengine *db, struct txn *tid)
1972 {
1973 struct skiprecord prevrecord;
1974 struct skiprecord record;
1975 size_t fwd[MAXLEVEL];
1976 size_t num_records = 0;
1977 int r = 0;
1978 int cmp;
1979 int i;
1980
1981 assert(db->current_txn == tid); /* could both be null */
1982
1983 /* read in the dummy */
1984 r = read_onerecord(db, DUMMY_OFFSET, &prevrecord);
1985 if (r) return r;
1986
1987 /* set up the location pointers */
1988 for (i = 0; i < MAXLEVEL; i++)
1989 fwd[i] = _getloc(db, &prevrecord, i);
1990
1991 while (fwd[0]) {
1992 r = read_onerecord(db, fwd[0], &record);
1993 if (r) return r;
1994
1995 if (record.type == DELETE) {
1996 fwd[0] = record.nextloc[0];
1997 continue;
1998 }
1999
2000 cmp = db->compar(KEY(db, &record), record.keylen,
2001 KEY(db, &prevrecord), prevrecord.keylen);
2002 if (cmp <= 0) {
2003 syslog(LOG_ERR, "DBERROR: twoskip out of order %s: %.*s (%08llX) <= %.*s (%08llX)",
2004 FNAME(db), (int)record.keylen, KEY(db, &record),
2005 (LLU)record.offset,
2006 (int)prevrecord.keylen, KEY(db, &prevrecord),
2007 (LLU)prevrecord.offset);
2008 return CYRUSDB_INTERNAL;
2009 }
2010
2011 for (i = 0; i < record.level; i++) {
2012 /* check the old pointer was to here */
2013 if (fwd[i] != record.offset) {
2014 syslog(LOG_ERR, "DBERROR: twoskip broken linkage %s: %08llX at %d, expected %08llX",
2015 FNAME(db), (LLU)record.offset, i, (LLU)fwd[i]);
2016 return CYRUSDB_INTERNAL;
2017 }
2018 /* and advance to the new pointer */
2019 fwd[i] = _getloc(db, &record, i);
2020 }
2021
2022 /* keep a copy for comparison purposes */
2023 num_records++;
2024 prevrecord = record;
2025 }
2026
2027 for (i = 0; i < MAXLEVEL; i++) {
2028 if (fwd[i]) {
2029 syslog(LOG_ERR, "DBERROR: twoskip broken tail %s: %08llX at %d",
2030 FNAME(db), (LLU)fwd[i], i);
2031 return CYRUSDB_INTERNAL;
2032 }
2033 }
2034
2035 /* we walked the whole file and saw every pointer */
2036
2037 if (num_records != db->header.num_records) {
2038 syslog(LOG_ERR, "DBERROR: twoskip record count mismatch %s: %llu should be %llu",
2039 FNAME(db), (LLU)num_records, (LLU)db->header.num_records);
2040 return CYRUSDB_INTERNAL;
2041 }
2042
2043 return 0;
2044 }
2045
_copy_commit(struct dbengine * db,struct dbengine * newdb,struct skiprecord * commit)2046 static int _copy_commit(struct dbengine *db, struct dbengine *newdb,
2047 struct skiprecord *commit)
2048 {
2049 struct txn *tid = NULL;
2050 struct skiprecord record;
2051 const char *val;
2052 size_t offset;
2053 int r = 0;
2054
2055 for (offset = commit->nextloc[0]; offset < commit->offset; offset += record.len) {
2056 r = read_onerecord(db, offset, &record);
2057 if (r) goto err;
2058 switch (record.type) {
2059 case DELETE:
2060 val = NULL;
2061 break;
2062 case RECORD:
2063 val = VAL(db, &record);
2064 break;
2065 default:
2066 r = CYRUSDB_IOERROR;
2067 goto err;
2068 }
2069
2070 /* store into the new DB */
2071 r = mystore(newdb, KEY(db, &record), record.keylen, val, record.vallen, &tid, 1);
2072 if (r) goto err;
2073 }
2074
2075 if (tid) r = mycommit(newdb, tid);
2076 if (r) return r;
2077
2078 return 0;
2079
2080 err:
2081 if (tid) myabort(newdb, tid);
2082 return r;
2083 }
2084
2085 /* recovery2 - the file is really screwed. Basically, we
2086 * failed to run recovery. Try reading out records from
2087 * the top and applying commits to a new file instead */
recovery2(struct dbengine * db,int * count)2088 static int recovery2(struct dbengine *db, int *count)
2089 {
2090 uint64_t oldcount = db->header.num_records;
2091 struct skiprecord record;
2092 struct dbengine *newdb = NULL;
2093 char newfname[1024];
2094 size_t offset;
2095 int r = 0;
2096
2097 /* open fname.NEW */
2098 snprintf(newfname, sizeof(newfname), "%s.NEW", FNAME(db));
2099 unlink(newfname);
2100
2101 r = opendb(newfname, db->open_flags | CYRUSDB_CREATE, &newdb, NULL);
2102 if (r) return r;
2103
2104 /* increase the generation count */
2105 newdb->header.generation = db->header.generation + 1;
2106
2107 /* start with the dummy */
2108 for (offset = DUMMY_OFFSET; offset < SIZE(db); offset += record.len) {
2109 r = read_onerecord(db, offset, &record);
2110 if (r) {
2111 syslog(LOG_ERR, "DBERROR: %s failed to read at %08llX in recovery2, truncating",
2112 FNAME(db), (LLU)offset);
2113 break;
2114 }
2115 if (record.type == COMMIT) {
2116 r = _copy_commit(db, newdb, &record);
2117 if (r) {
2118 syslog(LOG_ERR, "DBERROR: %s failed to apply commit at %08llX in recovery2, truncating",
2119 FNAME(db), (LLU)offset);
2120 break;
2121 }
2122 }
2123 }
2124
2125 if (!newdb->header.num_records) {
2126 /* no records found - almost certainly bogus, and even if not,
2127 * there's no point recovering a zero record file */
2128 syslog(LOG_ERR, "DBERROR: %s no records found in recovery2, aborting",
2129 FNAME(db));
2130 r = CYRUSDB_NOTFOUND;
2131 goto err;
2132 }
2133
2134 /* regardless, we had a commit during create, and in any _copy_commit, so
2135 * rename into place */
2136
2137 /* move new file to original file name */
2138 r = mappedfile_rename(newdb->mf, FNAME(db));
2139 if (r) goto err;
2140
2141 /* OK, we're commmitted now - clean up */
2142 unlock(db);
2143
2144 /* gotta clean it all up */
2145 mappedfile_close(&db->mf);
2146 buf_free(&db->loc.keybuf);
2147
2148 *db = *newdb;
2149 free(newdb); /* leaked? */
2150
2151 syslog(LOG_NOTICE, "twoskip: recovery2 %s - rescued %llu of %llu records",
2152 FNAME(db), (LLU)db->header.num_records, (LLU)oldcount);
2153
2154 if (count) *count = db->header.num_records;
2155
2156 return 0;
2157
2158 err:
2159 unlink(FNAME(newdb));
2160 myclose(newdb);
2161 return r;
2162 }
2163
2164 /* run recovery on this file.
2165 * always called with a write lock. */
recovery1(struct dbengine * db,int * count)2166 static int recovery1(struct dbengine *db, int *count)
2167 {
2168 size_t prev[MAXLEVEL+1];
2169 size_t next[MAXLEVEL+1];
2170 struct skiprecord record;
2171 struct skiprecord prevrecord;
2172 struct skiprecord fixrecord;
2173 size_t nextoffset = 0;
2174 uint64_t num_records = 0;
2175 int changed = 0;
2176 int r = 0;
2177 int cmp;
2178 int i;
2179
2180 assert(mappedfile_iswritelocked(db->mf));
2181
2182 /* no need to run recovery if we're consistent */
2183 if (db_is_clean(db))
2184 return 0;
2185
2186 /* we can't recovery a file that's not created yet */
2187 assert(db->header.current_size > HEADER_SIZE);
2188
2189 /* dirty the header if not already dirty */
2190 if (!(db->header.flags & DIRTY)) {
2191 db->header.flags |= DIRTY;
2192 r = commit_header(db);
2193 if (r) return r;
2194 }
2195
2196 /* start with the dummy */
2197 r = read_onerecord(db, DUMMY_OFFSET, &prevrecord);
2198 if (r) return r;
2199
2200 /* and pointers forwards */
2201 for (i = 2; i <= MAXLEVEL; i++) {
2202 prev[i] = prevrecord.offset;
2203 next[i] = prevrecord.nextloc[i];
2204 }
2205
2206 /* check for broken level - pointers */
2207 for (i = 0; i < 2; i++) {
2208 if (prevrecord.nextloc[i] >= db->end) {
2209 prevrecord.nextloc[i] = 0;
2210 r = rewrite_record(db, &prevrecord);
2211 changed++;
2212 }
2213 }
2214
2215 nextoffset = _getloc(db, &prevrecord, 0);
2216
2217 while (nextoffset) {
2218 r = read_onerecord(db, nextoffset, &record);
2219 if (r) return r;
2220
2221 /* just skip over delele records */
2222 if (record.type == DELETE) {
2223 nextoffset = record.nextloc[0];
2224 continue;
2225 }
2226
2227 cmp = db->compar(KEY(db, &record), record.keylen,
2228 KEY(db, &prevrecord), prevrecord.keylen);
2229 if (cmp <= 0) {
2230 syslog(LOG_ERR, "DBERROR: twoskip out of order %s: %.*s (%08llX) <= %.*s (%08llX)",
2231 FNAME(db), (int)record.keylen, KEY(db, &record),
2232 (LLU)record.offset,
2233 (int)prevrecord.keylen, KEY(db, &prevrecord),
2234 (LLU)prevrecord.offset);
2235 return CYRUSDB_INTERNAL;
2236 }
2237
2238 /* check for old offsets needing fixing */
2239 for (i = 2; i <= record.level; i++) {
2240 if (next[i] != record.offset) {
2241 /* need to fix up the previous record to point here */
2242 r = read_onerecord(db, prev[i], &fixrecord);
2243 if (r) return r;
2244
2245 /* XXX - optimise adjacent same records */
2246 fixrecord.nextloc[i] = record.offset;
2247 r = rewrite_record(db, &fixrecord);
2248 if (r) return r;
2249 changed++;
2250 }
2251 prev[i] = record.offset;
2252 next[i] = record.nextloc[i];
2253 }
2254
2255 /* check for broken level - pointers */
2256 for (i = 0; i < 2; i++) {
2257 if (record.nextloc[i] >= db->end) {
2258 record.nextloc[i] = 0;
2259 r = rewrite_record(db, &record);
2260 if (r) return r;
2261 changed++;
2262 }
2263 }
2264
2265 num_records++;
2266
2267 /* find the next record */
2268 nextoffset = _getloc(db, &record, 0);
2269
2270 prevrecord = record;
2271 }
2272
2273 /* check for remaining offsets needing fixing */
2274 for (i = 2; i <= MAXLEVEL; i++) {
2275 if (next[i]) {
2276 /* need to fix up the previous record to point to the end */
2277 r = read_onerecord(db, prev[i], &fixrecord);
2278 if (r) return r;
2279
2280 /* XXX - optimise, same as above */
2281 fixrecord.nextloc[i] = 0;
2282 r = rewrite_record(db, &fixrecord);
2283 if (r) return r;
2284 changed++;
2285 }
2286 }
2287
2288 r = mappedfile_truncate(db->mf, db->header.current_size);
2289 if (r) return r;
2290
2291 r = mappedfile_commit(db->mf);
2292 if (r) return r;
2293
2294 /* clear the dirty flag */
2295 db->header.flags &= ~DIRTY;
2296 db->header.num_records = num_records;
2297 r = commit_header(db);
2298 if (r) return r;
2299
2300 if (count) *count = changed;
2301
2302 return 0;
2303 }
2304
recovery(struct dbengine * db)2305 static int recovery(struct dbengine *db)
2306 {
2307 clock_t start = sclock();
2308 int count = 0;
2309 int r;
2310
2311 /* no need to run recovery if we're consistent */
2312 if (db_is_clean(db))
2313 return 0;
2314
2315 r = recovery1(db, &count);
2316 if (r) {
2317 syslog(LOG_ERR, "DBERROR: recovery1 failed %s, trying recovery2", FNAME(db));
2318 count = 0;
2319 r = recovery2(db, &count);
2320 if (r) return r;
2321 }
2322
2323 {
2324 syslog(LOG_INFO,
2325 "twoskip: recovered %s (%llu record%s, %llu bytes) in %2.3f seconds - fixed %d offset%s",
2326 FNAME(db), (LLU)db->header.num_records,
2327 db->header.num_records == 1 ? "" : "s",
2328 (LLU)(db->header.current_size),
2329 (sclock() - start) / (double) CLOCKS_PER_SEC,
2330 count, count == 1 ? "" : "s");
2331 }
2332
2333 return 0;
2334 }
2335
/* Look up a key: asserts that key is non-NULL and keylen is non-zero, then
 * delegates to myfetch() with no found-key outputs and a final argument
 * of 0.  Returns whatever myfetch() returns. */
static int fetch(struct dbengine *mydb,
                 const char *key, size_t keylen,
                 const char **data, size_t *datalen,
                 struct txn **tidptr)
{
    int r;

    assert(key);
    assert(keylen);

    r = myfetch(mydb, key, keylen, NULL, NULL,
                data, datalen, tidptr, 0);
    return r;
}
2346
/* Delegates to myfetch() with the found-key outputs supplied and a final
 * argument of 1.  Unlike fetch(), no assertions are made on key/keylen.
 * Returns whatever myfetch() returns. */
static int fetchnext(struct dbengine *mydb,
                     const char *key, size_t keylen,
                     const char **foundkey, size_t *fklen,
                     const char **data, size_t *datalen,
                     struct txn **tidptr)
{
    int r;

    r = myfetch(mydb, key, keylen, foundkey, fklen,
                data, datalen, tidptr, 1);
    return r;
}
2356
/* Store via mystore() with a final argument of 0 (the slot delete() fills
 * with its `force` parameter).  A NULL data pointer is replaced by an empty
 * string; a non-zero datalen with NULL data trips the assertion. */
static int create(struct dbengine *db,
                  const char *key, size_t keylen,
                  const char *data, size_t datalen,
                  struct txn **tid)
{
    const char *payload = "";

    if (datalen) assert(data);

    if (data) {
        payload = data;
    }

    return mystore(db, key, keylen, payload, datalen, tid, 0);
}
2365
/* Store via mystore() with a final argument of 1 (the slot delete() fills
 * with its `force` parameter).  A NULL data pointer is replaced by an empty
 * string; a non-zero datalen with NULL data trips the assertion. */
static int store(struct dbengine *db,
                 const char *key, size_t keylen,
                 const char *data, size_t datalen,
                 struct txn **tid)
{
    const char *payload = "";

    if (datalen) assert(data);

    if (data) {
        payload = data;
    }

    return mystore(db, key, keylen, payload, datalen, tid, 1);
}
2374
/* Delete a key by calling mystore() with NULL data and zero length,
 * passing `force` straight through.  Returns mystore()'s result. */
static int delete(struct dbengine *db,
                  const char *key, size_t keylen,
                  struct txn **tid, int force)
{
    int r;

    r = mystore(db, key, keylen, NULL, 0, tid, force);
    return r;
}
2381
2382 /* twoskip compar function is set at open */
mycompar(struct dbengine * db,const char * a,int alen,const char * b,int blen)2383 static int mycompar(struct dbengine *db, const char *a, int alen,
2384 const char *b, int blen)
2385 {
2386 return db->compar(a, alen, b, blen);
2387 }
2388
2389 HIDDEN struct cyrusdb_backend cyrusdb_twoskip =
2390 {
2391 "twoskip", /* name */
2392
2393 &cyrusdb_generic_init,
2394 &cyrusdb_generic_done,
2395 &cyrusdb_generic_sync,
2396 &cyrusdb_generic_archive,
2397 &cyrusdb_generic_unlink,
2398
2399 &myopen,
2400 &myclose,
2401
2402 &fetch,
2403 &fetch,
2404 &fetchnext,
2405
2406 &myforeach,
2407 &create,
2408 &store,
2409 &delete,
2410
2411 &mycommit,
2412 &myabort,
2413
2414 &dump,
2415 &consistent,
2416 &mycheckpoint,
2417 &mycompar
2418 };
2419