1 /* cyrusdb_twoskip.c - brand new twoskip implementation, not backwards anything
2  *
3  * Copyright (c) 1994-2008 Carnegie Mellon University.  All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in
14  *    the documentation and/or other materials provided with the
15  *    distribution.
16  *
17  * 3. The name "Carnegie Mellon University" must not be used to
18  *    endorse or promote products derived from this software without
19  *    prior written permission. For permission or any legal
20  *    details, please contact
21  *      Carnegie Mellon University
22  *      Center for Technology Transfer and Enterprise Creation
23  *      4615 Forbes Avenue
24  *      Suite 302
25  *      Pittsburgh, PA  15213
26  *      (412) 268-7393, fax: (412) 268-7395
27  *      innovation@andrew.cmu.edu
28  *
29  * 4. Redistributions of any form whatsoever must retain the following
30  *    acknowledgment:
31  *    "This product includes software developed by Computing Services
32  *     at Carnegie Mellon University (http://www.cmu.edu/computing/)."
33  *
34  * CARNEGIE MELLON UNIVERSITY DISCLAIMS ALL WARRANTIES WITH REGARD TO
35  * THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
36  * AND FITNESS, IN NO EVENT SHALL CARNEGIE MELLON UNIVERSITY BE LIABLE
37  * FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
38  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
39  * AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
40  * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
41  */
42 
43 #include <config.h>
44 
45 #include <errno.h>
46 #include <limits.h>
47 #include <stdlib.h>
48 #include <string.h>
49 #include <syslog.h>
50 #include <sys/types.h>
51 #ifdef HAVE_UNISTD_H
52 #include <unistd.h>
53 #endif
54 
55 #include "assert.h"
56 #include "bsearch.h"
57 #include "byteorder64.h"
58 #include "cyrusdb.h"
59 #include "crc32.h"
60 #include "libcyr_cfg.h"
61 #include "mappedfile.h"
62 #include "util.h"
63 #include "xmalloc.h"
64 
65 /*
66  * twoskip disk format.
67  *
68  * GOALS:
69  *  a) 64 bit through
70  *  b) Fast recovery after crashes
71  *  c) integrity checks throughout
72  *  d) simple format
73  *
74  * ACHIEVED BY:
75  *  a)
76  *   - 64 bit offsets for all values
 *   - smaller initial keylen and vallen, but they can
 *     be extended up to 64 bits as well.
79  *   - no timestamps stored in the file.
80  *   - XXX - may behave strangely with large files on
81  *     32 bit architectures, particularly if size_t is
82  *     not 64 bit.
83  *
84  *  b)
85  *   - "dirty flag" is always set in the header and
86  *     fsynced BEFORE writing anything else.
87  *   - a header field for "current size", after which
88  *     all changes are considered suspect until commit.
89  *   - two "lowest level" offsets, used in alternating
90  *     order, so the highest value less than "current_size"
91  *     is always the correct pointer - this means we
92  *     never lose linkage, so never need to rewrite more
93  *     than the affected records during a recovery.
94  *   - all data is fsynced BEFORE rewriting the header to
95  *     remove the dirty flag.
96  *   - As long as the first 64 bytes of the file are
97  *     guaranteed to write all together or not at all,
98  *     we're crash-safe.
99  *
100  *  c)
101  *   - every byte in the file is covered by one of the
102  *     crc32 values stored throughout.
103  *   - header CRC is checked on every header read (open/lock)
104  *   - record head CRCs are checked on every record read,
105  *     including skiplist traverse.
 *   - record tail CRCs (key/value) are checked on every exact
 *     key match result, during traverse for read or write.
108  *
109  *  d)
110  *   - there are no special commit, inorder, etc records.
111  *     just add records and ghost "delete" records to give
112  *     somewhere to point to on deletes.  These are only
113  *     at the lowest level, so don't have a significant
114  *     seek impact.
115  *   - modular code makes the logic much clearer.
116  */
117 
118 /*
119  * FORMAT:
120  *
121  * HEADER: 64 bytes
122  *  magic: 20 bytes: "4 bytes same as skiplist" "twoskip file\0\0\0\0"
123  *  version: 4 bytes
124  *  generation: 8 bytes
125  *  num_records: 8 bytes
126  *  repack_size: 8 bytes
127  *  current_size: 8 bytes
128  *  flags: 4 bytes
129  *  crc32: 4 bytes
130  *
131  * RECORDS:
132  *  type 1 byte
133  *  level: 1 byte
134  *  keylen: 2 bytes
135  *  vallen: 4 bytes
136  *  <optionally: 64 bit keylen if keylen == UINT16_MAX>
137  *  <optionally: 64 bit vallen if vallen == UINT32_MAX>
138  *  ptrs: 8 bytes * (level+1)
139  *  crc32_head: 4 bytes
140  *  crc32_tail: 4 bytes
141  *  key: (keylen bytes)
142  *  val: (vallen bytes)
143  *  padding: enough zeros to round up to an 8 byte multiple
144  *
145  * defined types, in skiplist language are:
146  * '=' -> DUMMY
147  * '+' -> ADD/INORDER
148  * '-' -> DELETE (kind of)
149  * '$' -> COMMIT
150  * but note that delete records behave differently - they're
151  * part of the pointer hierarchy, so that back offsets will
152  * always point somewhere past the 'end' until commit.
153  *
154  * The DUMMY is always MAXLEVEL level, with zero keylen and vallen
155  * The DELETE is always zero level, with zero keylen and vallen
156  * crc32_head is calculated on all bytes before it in the record
157  * crc32_tail is calculated on all bytes after, INCLUDING padding
158  *
159  * The COMMIT is inserted at the end of each transaction, and its
160  * single pointer points back to the start of the transaction.
161  */
162 
163 /* OPERATION:
164  *
165  * Finding a record works very much like skiplist, but we have
166  * a datastructure, 'struct skiploc', to help find it.  There
167  * is one of these embedded directly in the 'struct db', and
168  * it's the only one we ever use.
169  *
170  * skiploc contains two complete sets of offsets - at every
171  * level the offset of the previous record, and the offset of
172  * the next record, in relation to the requested key.  If the
173  * key is an exact match, it also contains a copy of the
174  * struct skiprecord.  If not, it contains the struct
175  * skiprecord for the previous record at level zero.
176  *
177  * It also contains a 'struct buf' with a copy of the requested
178  * key, which allows for efficient relocation of the position in
179  * the file when nothing is changed.
180  *
181  * So nothing is really changed with finding, except the special
182  * "level zero" alternative pointer.  We'll see that in action
183  * later.
184  *
185  * TRANSACTIONS:
186  * 1) before writing anything else, the header is updated with the
187  *    DIRTY flag set, and then fdatasync is run.
188  * 2) after all changes, fdatasync is run again.
189  * 3) finally, the header is updated with a new current_size and
190  *    the DIRTY flag clear, then fdatasync is run for a third time.
191  *
192  * ADDING A NEW RECORD:
193  * a new record is created with forward locations pointing to the
194  * next pointers in the skiploc.  This is appended to the file.
195  * This works for either a create OR replace, since in both cases
196  * the nextlocs are correct.  Level zero is set to zero on a new
197  * record.
198  *
199  * If it's not a replace, a "random" level will be chosen for the
200  * record.  All update operations below apply only up to this level,
201  * pointers above are unaffected - and continue over the location
202  * of this change.
203  *
204  * For each backloc, the record at that offset is read, and the
205  * forward pointers at each level are replaced with the offset
206  * of the new record.  NOTE: see special level zero logic below.
207  *
208  * Again, if this was a replace, the backlocs don't point to the
209  * current record, so it just silently disappears from the lists.
210  *
211  * DELETING A RECORD:
212  * The logic is almost identical to adding, but a delete record
213  * always has a level of zero, with only a single pointer forward
214  * to the next record.
215  *
216  * Because of this, the updates are done up to the level of the
217  * old record instead.
218  *
219  * THE SPECIAL "level zero":
220  * To allow a "fast fsck" after an aborted transaction, rather
221  * than having only a single level 1 pointer, we have two.  The
222  * following algorithm is used to select which one to change.
223  *
224  * The definition of "current_size" is the size of the database
225  * at last commit, it does not get updated during a transaction.
226  *
227  * So: when updating level 1 - if either level 1 or level 0 has
228  * a value >= current_size, then that value gets updated again.
229  * otherwise, the lowest value gets replaced with the new value.
230  *
231  * when reading, the highest value is used - except during
232  * recovery when it's the highest value less than current_size,
233  * since any "future" offsets are bogus.
234  *
235  * This means that there is always at least one offset which
236  * points to the "next" record as if the current transaction
 * had never occurred - allowing recovery to find all alive
238  * records without scanning and updating the rest of the file.
239  * This guarantee exists regardless of any ordering of writes
240  * within the transaction, any page could be inconsistent and
241  * the result is still a clean recovery.
242  *
243  * CHECKPOINT:
244  * Over time, a twoskip database accumulates cruft - replaced
245  * records and delete records.  Records out of order, slowing
246  * down sequential access.  When the size at last repack
247  * is sufficiently smaller than the current size (see the
248  * TUNING constants below) then the file is checkpointed.
249  * A checkpoint is achieved by creating a new file, and
250  * copying all the current records, in order, into it, then
251  * renaming the new file over the old.  The "generation"
252  * counter in the header is incremented to tell other users
253  * that offsets into the file are no longer valid.  This is
254  * more reliable than just using the inode, because inodes
255  * can be reused.
256  *
257  * LOCATION OPTIMISATION:
258  * If the generation is unchanged AND the size of the file
259  * is unchanged, then all offsets stored in the skiploc are
260  * still valid.  This is used to optimise finding the current
261  * key, advancing to the "next" key, and also to optimise
262  * regular fetches that happen to hit either the current key,
263  * the gap immediately after, or the next key.  All other
264  * locations cause a full relocate.
265  */
266 
267 
268 /********** TUNING *************/
269 
/* don't bother rewriting if the database has less than this much data.
 * NOTE(review): 16834 looks like a transposition of 16384 (16 KiB) -
 * confirm before changing; the value is left exactly as it was. */
#define MINREWRITE 16834
/* number of skiplist levels - 31 gives us binary search to 2^32 records.
 * limited to 255 by file format, but skiplist had 20, and that was enough
 * for most real uses.  31 is heaps.
 * This also sizes the nextloc/backloc/forwardloc arrays (MAXLEVEL+1
 * entries each) and the scratch buffer for record heads. */
#define MAXLEVEL 31
/* probability of promoting a new record by one more level in randlvl();
 * should be 0.5 for binary search semantics */
#define PROB 0.5

/* release lock in foreach at least every N records */
#define FOREACH_LOCK_RELEASE 256

/* format specifics: the on-disk format version written to, and the
 * highest version accepted from, the file header */
#undef VERSION /* defined in config.h */
#define VERSION 1

/* type aliases, used as casts when passing offsets to printf-style
 * "%llX" format strings */
#define LLU long long unsigned int
#define LU long unsigned int

/* record types: the first byte of every on-disk record */
#define DUMMY '='
#define RECORD '+'
#define DELETE '-'
#define COMMIT '$'
295 
296 /********** DATA STRUCTURES *************/
297 
/* A single "record" in the twoskip file, decoded into host byte order.
 * Whatever the record type is (see the record type defines: DUMMY,
 * RECORD, DELETE, COMMIT), it is read and written with the same
 * functions. */
struct skiprecord {
    /* location on disk (not part of the on-disk format as such) */
    size_t offset;      /* file offset of the first byte of the record;
                         * zero means "no record" */
    size_t len;         /* total length on disk: head, pointers, CRCs,
                         * key, value and padding */

    /* what are our header fields */
    uint8_t type;       /* one of the record type characters */
    uint8_t level;      /* must be <= MAXLEVEL */
    size_t keylen;
    size_t vallen;

    /* where do we go from here?  Slots 0 and 1 are the two alternating
     * "level zero" pointers; slot n+1 holds the pointer for level n > 0
     * (see _getloc/_setloc) */
    size_t nextloc[MAXLEVEL+1];

    /* what do our integrity checks say? */
    uint32_t crc32_head;    /* covers every byte of the record before it */
    uint32_t crc32_tail;    /* covers key, value and padding */

    /* our key and value, as offsets from the start of the file */
    size_t keyoffset;
    size_t valoffset;
};
323 
/* a location in the twoskip file.  We always have:
 * record: if "is_exactmatch" this is the record with the
 *         matching key, otherwise it is the record that
 *         sorts immediately before the key in 'compar'
 *         order (the DUMMY if nothing sorts before it).
 * backloc: at each level, the offset of the record whose
 *          forward pointer has to be rewritten to insert
 *          or remove something at this location.
 * forwardloc: at each level, the offset that the record at
 *             'backloc' points to at that level.  Kept
 *             here for efficiency.
 * keybuf: a copy of the requested key - we always keep
 *         this so we can re-seek after the file has been
 *         checkpointed under us (say a read-only foreach)
 *
 * generation and end can be used to see if anything in
 * the file may have changed and needs re-reading.
 */
struct skiploc {
    /* requested, may not match actual record */
    struct buf keybuf;
    int is_exactmatch;

    /* the matching record, or else the one ordered just before the key */
    struct skiprecord record;

    /* we need both sets of offsets to cheaply insert */
    size_t backloc[MAXLEVEL+1];
    size_t forwardloc[MAXLEVEL+1];

    /* need a generation so we know if the location is still valid;
     * both values are snapshots taken from the db when the location was
     * last calculated by relocate() */
    uint64_t generation;
    size_t end;
};
358 
/* lock states.  NOTE(review): these are not referenced in this part of
 * the file - presumably they describe the state of the lock held on the
 * mapped file; confirm against the locking code. */
enum {
    UNLOCKED = 0,
    READLOCKED = 1,
    WRITELOCKED = 2,
};

/* header flag bit: set (and committed to disk) before the first record of
 * a transaction is appended, and required to be set before any existing
 * record is rewritten */
#define DIRTY (1<<0)
366 
/* A transaction handle.
 * NOTE(review): the comment that used to live here described a 'logstart'
 * field (where changes start on commit / where to truncate on abort), but
 * the struct has no such member - it only carries a number. */
struct txn {
    /* transaction number - presumably taken from the txn_num counter in
     * struct dbengine; TODO confirm against the code that creates txns */
    int num;
};
372 
/* In-memory copy of the 64 byte file header, in host byte order.
 * Filled in by read_header() and serialised by write_header(). */
struct db_header {
    /* header info */
    uint32_t version;       /* on-disk format version; files newer than
                             * VERSION are refused */
    uint32_t flags;         /* bit field, see DIRTY */
    uint64_t generation;    /* changes when the file is checkpointed, so
                             * cached offsets can be invalidated */
    uint64_t num_records;
    size_t repack_size;     /* size of the file at the last repack */
    size_t current_size;    /* size of the file at the last commit */
};
382 
/* Everything we know about one open twoskip database. */
struct dbengine {
    /* file data */
    struct mappedfile *mf;

    struct db_header header;    /* decoded copy of the on-disk header */
    struct skiploc loc;         /* the one and only cached location */

    /* tracking info */
    int is_open;
    size_t end;                 /* offset at which the next record will be
                                 * appended: set to header.current_size by
                                 * read_header() and advanced by every
                                 * write_record() */
    int txn_num;
    struct txn *current_txn;    /* must be set before records are appended */

    /* comparator function to use for sorting */
    int open_flags;
    /* called as compar(key1, keylen1, key2, keylen2); relocate() treats a
     * negative result as "key1 sorts first" and zero as an exact match */
    int (*compar) (const char *s1, int l1, const char *s2, int l2);
};
400 
/* Singly linked list node for tracking open databases (see the
 * 'open_twoskip' list head).
 * NOTE(review): refcount presumably counts how many opens share the same
 * dbengine - confirm against the open/close code. */
struct db_list {
    struct dbengine *db;
    struct db_list *next;
    int refcount;
};
406 
/* the first HEADER_MAGIC_SIZE bytes of every twoskip file: 4 binary bytes,
 * the text "twoskip file" and 4 NUL bytes */
#define HEADER_MAGIC ("\241\002\213\015twoskip file\0\0\0\0")
#define HEADER_MAGIC_SIZE (20)

/* offsets of header fields, in bytes from the start of the file */
enum {
    OFFSET_HEADER = 0,
    OFFSET_VERSION = 20,
    OFFSET_GENERATION = 24,
    OFFSET_NUM_RECORDS = 32,
    OFFSET_REPACK_SIZE = 40,
    OFFSET_CURRENT_SIZE = 48,
    OFFSET_FLAGS = 56,
    OFFSET_CRC32 = 60,      /* the CRC covers bytes [0, OFFSET_CRC32) */
};

#define HEADER_SIZE 64
/* the DUMMY record sits immediately after the header */
#define DUMMY_OFFSET HEADER_SIZE
/* largest possible record head in bytes: 8 fixed + 8 long keylen
 * + 8 long vallen + 8 * (MAXLEVEL + 1) pointers + 8 for the two CRCs */
#define MAXRECORDHEAD ((MAXLEVEL + 5)*8)
425 
/* mount a scratch monkey: one static buffer in which write_header(),
 * rewrite_record() and write_record() build the bytes they are about to
 * write.  It is sized for the largest record head, which is also big
 * enough for the file header.  The uint64_t member gives the union (and so
 * 's') the alignment of a 64 bit integer, since values are stored into the
 * buffer through uint64_t/uint32_t pointer casts.  Being a single shared
 * buffer, its contents only live until the next function that uses it. */
static union skipwritebuf {
    uint64_t align;
    char s[MAXRECORDHEAD];
} scratchspace;

/* head of the list of currently open databases */
static struct db_list *open_twoskip = NULL;

/* forward declarations for functions defined later in the file */
static int mycommit(struct dbengine *db, struct txn *tid);
static int myabort(struct dbengine *db, struct txn *tid);
static int mycheckpoint(struct dbengine *db);
static int myconsistent(struct dbengine *db, struct txn *tid);
static int recovery(struct dbengine *db);
static int recovery1(struct dbengine *db, int *count);
static int recovery2(struct dbengine *db, int *count);
441 
442 /************** HELPER FUNCTIONS ****************/
443 
/* shorthand accessors for the mapped file behind a struct dbengine */
/* start of the mapped file contents */
#define BASE(db) mappedfile_base((db)->mf)
/* pointer to the key / value bytes of a decoded record */
#define KEY(db, rec) (BASE(db) + (rec)->keyoffset)
#define VAL(db, rec) (BASE(db) + (rec)->valoffset)
/* size of the mapped file, used as the upper bound for every read */
#define SIZE(db) mappedfile_size((db)->mf)
/* file name, used in log messages */
#define FNAME(db) mappedfile_fname((db)->mf)
449 
450 /* calculate padding size */
451 #ifdef __DragonFly__
452 #undef roundup
453 #endif
/* Round record_size up to the next multiple of howfar; a value that is
 * already a multiple is returned unchanged.  howfar must not be zero. */
static size_t roundup(size_t record_size, int howfar)
{
    size_t excess = record_size % howfar;

    if (!excess)
        return record_size;

    return record_size + (howfar - excess);
}
460 
461 /* choose a level appropriately randomly */
randlvl(uint8_t lvl,uint8_t maxlvl)462 static uint8_t randlvl(uint8_t lvl, uint8_t maxlvl)
463 {
464     while (((float) rand() / (float) (RAND_MAX)) < PROB) {
465         lvl++;
466         if (lvl == maxlvl) break;
467     }
468     return lvl;
469 }
470 
471 /************** HEADER ****************/
472 
473 /* given an open, mapped db, read in the header information */
read_header(struct dbengine * db)474 static int read_header(struct dbengine *db)
475 {
476     uint32_t crc;
477 
478     assert(db && db->mf && db->is_open);
479 
480     if (SIZE(db) < HEADER_SIZE) {
481         syslog(LOG_ERR,
482                "twoskip: file not large enough for header: %s", FNAME(db));
483         return CYRUSDB_IOERROR;
484     }
485 
486     if (memcmp(BASE(db), HEADER_MAGIC, HEADER_MAGIC_SIZE)) {
487         syslog(LOG_ERR, "twoskip: invalid magic header: %s", FNAME(db));
488         return CYRUSDB_IOERROR;
489     }
490 
491     db->header.version
492         = ntohl(*((uint32_t *)(BASE(db) + OFFSET_VERSION)));
493 
494     if (db->header.version > VERSION) {
495         syslog(LOG_ERR, "twoskip: version mismatch: %s has version %d",
496                FNAME(db), db->header.version);
497         return CYRUSDB_IOERROR;
498     }
499 
500     db->header.generation
501         = ntohll(*((uint64_t *)(BASE(db) + OFFSET_GENERATION)));
502 
503     db->header.num_records
504         = ntohll(*((uint64_t *)(BASE(db) + OFFSET_NUM_RECORDS)));
505 
506     db->header.repack_size
507         = ntohll(*((uint64_t *)(BASE(db) + OFFSET_REPACK_SIZE)));
508 
509     db->header.current_size
510         = ntohll(*((uint64_t *)(BASE(db) + OFFSET_CURRENT_SIZE)));
511 
512     db->header.flags
513         = ntohl(*((uint32_t *)(BASE(db) + OFFSET_FLAGS)));
514 
515     crc = ntohl(*((uint32_t *)(BASE(db) + OFFSET_CRC32)));
516 
517     if (crc32_map(BASE(db), OFFSET_CRC32) != crc) {
518         syslog(LOG_ERR, "DBERROR: %s: twoskip header CRC failure",
519                FNAME(db));
520         return CYRUSDB_IOERROR;
521     }
522 
523     db->end = db->header.current_size;
524 
525     return 0;
526 }
527 
528 /* given an open, mapped, locked db, write the header information */
write_header(struct dbengine * db)529 static int write_header(struct dbengine *db)
530 {
531     char *buf = scratchspace.s;
532     int n;
533 
534     /* format one buffer */
535     memcpy(buf, HEADER_MAGIC, HEADER_MAGIC_SIZE);
536     *((uint32_t *)(buf + OFFSET_VERSION)) = htonl(db->header.version);
537     *((uint64_t *)(buf + OFFSET_GENERATION)) = htonll(db->header.generation);
538     *((uint64_t *)(buf + OFFSET_NUM_RECORDS)) = htonll(db->header.num_records);
539     *((uint64_t *)(buf + OFFSET_REPACK_SIZE)) = htonll(db->header.repack_size);
540     *((uint64_t *)(buf + OFFSET_CURRENT_SIZE)) = htonll(db->header.current_size);
541     *((uint32_t *)(buf + OFFSET_FLAGS)) = htonl(db->header.flags);
542     *((uint32_t *)(buf + OFFSET_CRC32)) = htonl(crc32_map(buf, OFFSET_CRC32));
543 
544     /* write it out */
545     n = mappedfile_pwrite(db->mf, buf, HEADER_SIZE, 0);
546     if (n < 0) return CYRUSDB_IOERROR;
547 
548     return 0;
549 }
550 
551 /* simple wrapper to write with an fsync */
commit_header(struct dbengine * db)552 static int commit_header(struct dbengine *db)
553 {
554     int r = write_header(db);
555     if (!r) r = mappedfile_commit(db->mf);
556     return r;
557 }
558 
559 /******************** RECORD *********************/
560 
check_tailcrc(struct dbengine * db,struct skiprecord * record)561 static int check_tailcrc(struct dbengine *db, struct skiprecord *record)
562 {
563     uint32_t crc;
564 
565     crc = crc32_map(BASE(db) + record->keyoffset,
566                     roundup(record->keylen + record->vallen, 8));
567     if (crc != record->crc32_tail) {
568         syslog(LOG_ERR, "DBERROR: invalid tail crc %s at %llX",
569                FNAME(db), (LLU)record->offset);
570         return CYRUSDB_IOERROR;
571     }
572 
573     return 0;
574 }
575 
576 /* read a single skiprecord at the given offset */
read_onerecord(struct dbengine * db,size_t offset,struct skiprecord * record)577 static int read_onerecord(struct dbengine *db, size_t offset,
578                           struct skiprecord *record)
579 {
580     const char *base;
581     int i;
582 
583     memset(record, 0, sizeof(struct skiprecord));
584 
585     if (!offset) return 0;
586 
587     record->offset = offset;
588     record->len = 24; /* absolute minimum */
589 
590     /* need space for at least the header plus some details */
591     if (record->offset + record->len > SIZE(db))
592         goto badsize;
593 
594     base = BASE(db) + offset;
595 
596     /* read in the record header */
597     record->type = base[0];
598     record->level = base[1];
599     record->keylen = ntohs(*((uint16_t *)(base + 2)));
600     record->vallen = ntohl(*((uint32_t *)(base + 4)));
601     offset += 8;
602 
603     /* make sure we fit */
604     if (record->level > MAXLEVEL) {
605         syslog(LOG_ERR, "DBERROR: twoskip invalid level %d for %s at %08llX",
606                record->level, FNAME(db), (LLU)offset);
607         return CYRUSDB_IOERROR;
608     }
609 
610     /* long key */
611     if (record->keylen == UINT16_MAX) {
612         base = BASE(db) + offset;
613         record->keylen = ntohll(*((uint64_t *)base));
614         offset += 8;
615     }
616 
617     /* long value */
618     if (record->vallen == UINT32_MAX) {
619         base = BASE(db) + offset;
620         record->vallen = ntohll(*((uint64_t *)base));
621         offset += 8;
622     }
623 
624     /* we know the length now */
625     record->len = (offset - record->offset) /* header including lengths */
626                 + 8 * (1 + record->level)   /* ptrs */
627                 + 8                         /* crc32s */
628                 + roundup(record->keylen + record->vallen, 8);  /* keyval */
629 
630     if (record->offset + record->len > SIZE(db))
631         goto badsize;
632 
633     for (i = 0; i <= record->level; i++) {
634         base = BASE(db) + offset;
635         record->nextloc[i] = ntohll(*((uint64_t *)base));
636         offset += 8;
637     }
638 
639     base = BASE(db) + offset;
640     record->crc32_head = ntohl(*((uint32_t *)base));
641     if (crc32_map(BASE(db) + record->offset, (offset - record->offset))
642         != record->crc32_head) {
643         syslog(LOG_ERR, "DBERROR: twoskip checksum head error for %s at %08llX",
644                FNAME(db), (LLU)offset);
645         return CYRUSDB_IOERROR;
646     }
647 
648     record->crc32_tail = ntohl(*((uint32_t *)(base+4)));
649 
650     record->keyoffset = offset + 8;
651     record->valoffset = record->keyoffset + record->keylen;
652 
653     return 0;
654 
655 badsize:
656     syslog(LOG_ERR, "twoskip: attempt to read past end of file %s: %08llX > %08llX",
657            FNAME(db), (LLU)record->offset + record->len, (LLU)SIZE(db));
658     return CYRUSDB_IOERROR;
659 }
660 
read_skipdelete(struct dbengine * db,size_t offset,struct skiprecord * record)661 static int read_skipdelete(struct dbengine *db, size_t offset,
662                            struct skiprecord *record)
663 {
664     int r;
665 
666     r = read_onerecord(db, offset, record);
667     if (r) return r;
668 
669     if (record->type == DELETE)
670         r = read_onerecord(db, record->nextloc[0], record);
671 
672     return r;
673 }
674 
675 /* prepare the header part of the record (everything except the key, value
676  * and padding).  Used for both writes and rewrites. */
prepare_record(struct skiprecord * record,char * buf,size_t * sizep)677 static void prepare_record(struct skiprecord *record, char *buf, size_t *sizep)
678 {
679     int len = 8;
680     int i;
681 
682     assert(record->level <= MAXLEVEL);
683 
684     buf[0] = record->type;
685     buf[1] = record->level;
686     if (record->keylen < UINT16_MAX) {
687         *((uint16_t *)(buf+2)) = htons(record->keylen);
688     }
689     else {
690         *((uint16_t *)(buf+2)) = htons(UINT16_MAX);
691         *((uint64_t *)(buf+len)) = htonll(record->keylen);
692         len += 8;
693     }
694 
695     if (record->vallen < UINT32_MAX) {
696         *((uint32_t *)(buf+4)) = htonl(record->vallen);
697     }
698     else {
699         *((uint32_t *)(buf+4)) = htonl(UINT32_MAX);
700         *((uint64_t *)(buf+len)) = htonll(record->vallen);
701         len += 8;
702     }
703 
704     /* got pointers? */
705     for (i = 0; i <= record->level; i++) {
706         *((uint64_t *)(buf+len)) = htonll(record->nextloc[i]);
707         len += 8;
708     }
709 
710     /* NOTE: crc32_tail does not change */
711     record->crc32_head = crc32_map(buf, len);
712     *((uint32_t *)(buf+len)) = htonl(record->crc32_head);
713     *((uint32_t *)(buf+len+4)) = htonl(record->crc32_tail);
714     len += 8;
715 
716     *sizep = len;
717 }
718 
719 /* only changing the record head, so only rewrite that much */
rewrite_record(struct dbengine * db,struct skiprecord * record)720 static int rewrite_record(struct dbengine *db, struct skiprecord *record)
721 {
722     char *buf = scratchspace.s;
723     size_t len;
724     int n;
725 
726     /* we must already be in a transaction before updating records */
727     assert(db->header.flags & DIRTY);
728     assert(record->offset);
729 
730     prepare_record(record, buf, &len);
731 
732     n = mappedfile_pwrite(db->mf, buf, len, record->offset);
733     if (n < 0) return CYRUSDB_IOERROR;
734 
735     return 0;
736 }
737 
738 /* you can only write records at the end */
write_record(struct dbengine * db,struct skiprecord * record,const char * key,const char * val)739 static int write_record(struct dbengine *db, struct skiprecord *record,
740                         const char *key, const char *val)
741 {
742     char zeros[8] = {0, 0, 0, 0, 0, 0, 0, 0};
743     uint64_t len;
744     size_t iolen = 0;
745     struct iovec io[4];
746     int n;
747 
748     assert(!record->offset);
749 
750     /* we'll put the HEAD on later */
751     io[0].iov_base = scratchspace.s;
752     io[0].iov_len = 0;
753 
754     io[1].iov_base = (char *)key;
755     io[1].iov_len = record->keylen;
756 
757     io[2].iov_base = (char *)val;
758     io[2].iov_len = record->vallen;
759 
760     /* pad to 8 bytes */
761     len = record->vallen + record->keylen;
762     io[3].iov_base = zeros;
763     io[3].iov_len = roundup(len, 8) - len;
764 
765     /* calculate the CRC32 of the tail first */
766     record->crc32_tail = crc32_iovec(io+1, 3);
767 
768     /* prepare the record once we know the crc32 of the tail */
769     prepare_record(record, scratchspace.s, &iolen);
770     io[0].iov_base = scratchspace.s;
771     io[0].iov_len = iolen;
772 
773     /* write to the mapped file, getting the offset updated */
774     n = mappedfile_pwritev(db->mf, io, 4, db->end);
775     if (n < 0) return CYRUSDB_IOERROR;
776 
777     /* locate the record */
778     record->offset = db->end;
779     record->keyoffset = db->end + io[0].iov_len;
780     record->valoffset = record->keyoffset + record->keylen;
781     record->len = n;
782 
783     /* and advance the known file size */
784     db->end += n;
785 
786     return 0;
787 }
788 
789 /* helper to append a record, starting the transaction by dirtying the
790  * header first if required */
append_record(struct dbengine * db,struct skiprecord * record,const char * key,const char * val)791 static int append_record(struct dbengine *db, struct skiprecord *record,
792                          const char *key, const char *val)
793 {
794     int r;
795 
796     assert(db->current_txn);
797 
798     /* dirty the header if not already dirty */
799     if (!(db->header.flags & DIRTY)) {
800         db->header.flags |= DIRTY;
801         r = commit_header(db);
802         if (r) return r;
803     }
804 
805     return write_record(db, record, key, val);
806 }
807 
808 /************************** LOCATION MANAGEMENT ***************************/
809 
810 /* find the next record at a given level, encapsulating the
811  * level 0 magic */
_getloc(struct dbengine * db,struct skiprecord * record,uint8_t level)812 static size_t _getloc(struct dbengine *db, struct skiprecord *record,
813                       uint8_t level)
814 {
815     if (level)
816         return record->nextloc[level + 1];
817 
818     /* if one is past, must be the other */
819     if (record->nextloc[0] >= db->end)
820         return record->nextloc[1];
821     else if (record->nextloc[1] >= db->end)
822         return record->nextloc[0];
823 
824     /* highest remaining */
825     else if (record->nextloc[0] > record->nextloc[1])
826         return record->nextloc[0];
827     else
828         return record->nextloc[1];
829 }
830 
831 /* set the next record at a given level, encapsulating the
832  * level 0 magic */
_setloc(struct dbengine * db,struct skiprecord * record,uint8_t level,size_t offset)833 static void _setloc(struct dbengine *db, struct skiprecord *record,
834                     uint8_t level, size_t offset)
835 {
836     if (level) {
837         record->nextloc[level+1] = offset;
838         return;
839     }
840 
841     /* level zero is special */
842     /* already this transaction, update this one */
843     if (record->nextloc[0] >= db->header.current_size)
844         record->nextloc[0] = offset;
845     else if (record->nextloc[1] >= db->header.current_size)
846         record->nextloc[1] = offset;
847     /* otherwise, update older one */
848     else if (record->nextloc[1] > record->nextloc[0])
849         record->nextloc[0] = offset;
850     else
851         record->nextloc[1] = offset;
852 }
853 
854 /* finds a record, either an exact match or the record
855  * immediately before */
relocate(struct dbengine * db)856 static int relocate(struct dbengine *db)
857 {
858     struct skiploc *loc = &db->loc;
859     struct skiprecord newrecord;
860     size_t offset;
861     size_t oldoffset = 0;
862     uint8_t level;
863     uint8_t i;
864     int cmp = -1; /* never found a thing! */
865     int r;
866 
867     /* pointer validity */
868     loc->generation = db->header.generation;
869     loc->end = db->end;
870 
871     /* start with the dummy */
872     r = read_onerecord(db, DUMMY_OFFSET, &loc->record);
873     loc->is_exactmatch = 0;
874 
875     /* initialise pointers */
876     level = loc->record.level;
877     newrecord.offset = 0;
878     loc->backloc[level] = loc->record.offset;
879     loc->forwardloc[level] = 0;
880 
881     /* special case start pointer for efficiency */
882     if (!loc->keybuf.len) {
883         for (i = 0; i < loc->record.level; i++) {
884             loc->backloc[i] = loc->record.offset;
885             loc->forwardloc[i] = _getloc(db, &loc->record, i);
886         }
887         return 0;
888     }
889 
890     while (level) {
891         offset = _getloc(db, &loc->record, level-1);
892 
893         loc->backloc[level-1] = loc->record.offset;
894         loc->forwardloc[level-1] = offset;
895 
896         if (offset != oldoffset) {
897             oldoffset = offset;
898             r = read_skipdelete(db, offset, &newrecord);
899             if (r) return r;
900 
901             if (newrecord.offset) {
902                 assert(newrecord.level >= level);
903 
904                 cmp = db->compar(KEY(db, &newrecord), newrecord.keylen,
905                                  loc->keybuf.s, loc->keybuf.len);
906 
907                 /* not there?  stay at this level */
908                 if (cmp < 0) {
909                     /* move the offset range along */
910                     loc->record = newrecord;
911                     continue;
912                 }
913             }
914         }
915 
916         level--;
917     }
918 
919     if (cmp == 0) { /* we found it exactly */
920         loc->is_exactmatch = 1;
921         loc->record = newrecord;
922 
923         for (i = 0; i < loc->record.level; i++)
924             loc->forwardloc[i] = _getloc(db, &loc->record, i);
925 
926         /* make sure this record is complete */
927         r = check_tailcrc(db, &loc->record);
928 
929         if (r) return r;
930     }
931 
932     return 0;
933 }
934 
935 /* helper function to find a location, either by using the existing
936  * location if it's close enough, or using the full relocate above */
find_loc(struct dbengine * db,const char * key,size_t keylen)937 static int find_loc(struct dbengine *db, const char *key, size_t keylen)
938 {
939     struct skiprecord newrecord;
940     struct skiploc *loc = &db->loc;
941     int cmp, i, r;
942 
943     if (key != loc->keybuf.s)
944         buf_setmap(&loc->keybuf, key, keylen);
945     else if (keylen != loc->keybuf.len)
946         buf_truncate(&loc->keybuf, keylen);
947 
948     /* can we special case advance? */
949     if (keylen && loc->end == db->end
950                && loc->generation == db->header.generation) {
951         cmp = db->compar(KEY(db, &loc->record), loc->record.keylen,
952                          loc->keybuf.s, loc->keybuf.len);
953         /* same place, and was exact.  Otherwise we're going back,
954          * and the reverse pointers are no longer valid... */
955         if (db->loc.is_exactmatch && cmp == 0) {
956             return 0;
957         }
958 
959         /* we're looking after this record */
960         if (cmp < 0) {
961             for (i = 0; i < db->loc.record.level; i++)
962                 loc->backloc[i] = db->loc.record.offset;
963 
964             /* read the next record */
965             r = read_skipdelete(db, loc->forwardloc[0], &newrecord);
966             if (r) return r;
967 
968             /* nothing afterwards? */
969             if (!newrecord.offset) {
970                 db->loc.is_exactmatch = 0;
971                 return 0;
972             }
973 
974             /* now where is THIS record? */
975             cmp = db->compar(KEY(db, &newrecord), newrecord.keylen,
976                              loc->keybuf.s, loc->keybuf.len);
977 
978             /* exact match? */
979             if (cmp == 0) {
980                 db->loc.is_exactmatch = 1;
981                 db->loc.record = newrecord;
982 
983                 for (i = 0; i < newrecord.level; i++)
984                     loc->forwardloc[i] = _getloc(db, &newrecord, i);
985 
986                 /* make sure this record is complete */
987                 r = check_tailcrc(db, &loc->record);
988                 if (r) return r;
989 
990                 return 0;
991             }
992 
993             /* or in the gap */
994             if (cmp > 0) {
995                 db->loc.is_exactmatch = 0;
996                 return 0;
997             }
998         }
999         /* if we fell out here, it's not a "local" record, just search */
1000     }
1001 
1002     return relocate(db);
1003 }
1004 
1005 /* helper function to advance to the "next" record.  Used by foreach,
1006  * fetchnext, and internal functions */
advance_loc(struct dbengine * db)1007 static int advance_loc(struct dbengine *db)
1008 {
1009     struct skiploc *loc = &db->loc;
1010     uint8_t i;
1011     int r;
1012 
1013     /* has another session made changes?  Need to re-find the location */
1014     if (loc->end != db->end || loc->generation != db->header.generation) {
1015         r = relocate(db);
1016         if (r) return r;
1017     }
1018 
1019     /* update back pointers */
1020     for (i = 0; i < loc->record.level; i++)
1021         loc->backloc[i] = loc->record.offset;
1022 
1023     /* ADVANCE */
1024     r = read_skipdelete(db, loc->forwardloc[0], &loc->record);
1025     if (r) return r;
1026 
1027     /* reached the end? */
1028     if (!loc->record.offset) {
1029         buf_reset(&loc->keybuf);
1030         return relocate(db);
1031     }
1032 
1033     /* update forward pointers */
1034     for (i = 0; i < loc->record.level; i++)
1035         loc->forwardloc[i] = _getloc(db, &loc->record, i);
1036 
1037     /* keep our location */
1038     buf_setmap(&loc->keybuf, KEY(db, &loc->record), loc->record.keylen);
1039     loc->is_exactmatch = 1;
1040 
1041     /* make sure this record is complete */
1042     r = check_tailcrc(db, &loc->record);
1043     if (r) return r;
1044 
1045     return 0;
1046 }
1047 
1048 /* helper function to update all the back records efficiently
1049  * after appending a new record, either create or delete.  The
1050  * caller must set forwardloc[] correctly for each level it has
1051  * changed */
stitch(struct dbengine * db,uint8_t maxlevel,size_t newoffset)1052 static int stitch(struct dbengine *db, uint8_t maxlevel, size_t newoffset)
1053 {
1054     struct skiploc *loc = &db->loc;
1055     struct skiprecord oldrecord;
1056     uint8_t i;
1057     int r;
1058 
1059     oldrecord.level = 0;
1060     while (oldrecord.level < maxlevel) {
1061         uint8_t level = oldrecord.level;
1062 
1063         r = read_onerecord(db, loc->backloc[level], &oldrecord);
1064         if (r) return r;
1065 
1066         /* always getting higher */
1067         assert(oldrecord.level > level);
1068 
1069         for (i = level; i < maxlevel; i++)
1070             _setloc(db, &oldrecord, i, loc->forwardloc[i]);
1071 
1072         r = rewrite_record(db, &oldrecord);
1073         if (r) return r;
1074     }
1075 
1076     /* re-read the "current record" */
1077     r = read_onerecord(db, newoffset, &loc->record);
1078     if (r) return r;
1079 
1080     /* and update the forward locations */
1081     for (i = 0; i < loc->record.level; i++)
1082         loc->forwardloc[i] = _getloc(db, &loc->record, i);
1083 
1084     return 0;
1085 }
1086 
1087 /* overall "store" function - update the value in the current loc.
1088    All new values funnel through here.  Check delete_here for
1089    deletion.   Force is implied here, it gets checked higher. */
store_here(struct dbengine * db,const char * val,size_t vallen)1090 static int store_here(struct dbengine *db, const char *val, size_t vallen)
1091 {
1092     struct skiploc *loc = &db->loc;
1093     struct skiprecord newrecord;
1094     uint8_t level = 0;
1095     uint8_t i;
1096     int r;
1097 
1098     if (loc->is_exactmatch) {
1099         level = loc->record.level;
1100         db->header.num_records--;
1101     }
1102 
1103     /* build a new record */
1104     memset(&newrecord, 0, sizeof(struct skiprecord));
1105     newrecord.type = RECORD;
1106     newrecord.level = randlvl(1, MAXLEVEL);
1107     newrecord.keylen = loc->keybuf.len;
1108     newrecord.vallen = vallen;
1109     for (i = 0; i < newrecord.level; i++)
1110         newrecord.nextloc[i+1] = loc->forwardloc[i];
1111     if (newrecord.level > level)
1112         level = newrecord.level;
1113 
1114     /* append to the file */
1115     r = append_record(db, &newrecord, loc->keybuf.s, val);
1116     if (r) return r;
1117 
1118     /* get the nextlevel to point here for all this record's levels */
1119     for (i = 0; i < newrecord.level; i++)
1120         loc->forwardloc[i] = newrecord.offset;
1121 
1122     /* update all backpointers */
1123     r = stitch(db, level, newrecord.offset);
1124     if (r) return r;
1125 
1126     /* update header to know details of new record */
1127     db->header.num_records++;
1128 
1129     loc->is_exactmatch = 1;
1130     loc->end = db->end;
1131 
1132     return 0;
1133 }
1134 
1135 /* delete a record */
delete_here(struct dbengine * db)1136 static int delete_here(struct dbengine *db)
1137 {
1138     struct skiploc *loc = &db->loc;
1139     struct skiprecord newrecord;
1140     struct skiprecord nextrecord;
1141     int r;
1142 
1143     if (!loc->is_exactmatch)
1144         return CYRUSDB_NOTFOUND;
1145 
1146     db->header.num_records--;
1147 
1148     /* by the magic of zeroing, this even works for zero */
1149     r = read_skipdelete(db, loc->forwardloc[0], &nextrecord);
1150     if (r) return r;
1151 
1152     /* build a delete record */
1153     memset(&newrecord, 0, sizeof(struct skiprecord));
1154     newrecord.type = DELETE;
1155     newrecord.nextloc[0] = nextrecord.offset;
1156 
1157     /* append to the file */
1158     r = append_record(db, &newrecord, NULL, NULL);
1159     if (r) return r;
1160 
1161     /* get the nextlevel to point here */
1162     loc->forwardloc[0] = newrecord.offset;
1163 
1164     /* update all backpointers right up to the old record's
1165      * level, so that they all point past */
1166     r = stitch(db, loc->record.level, loc->backloc[0]);
1167     if (r) return r;
1168 
1169     /* update location */
1170     loc->is_exactmatch = 0;
1171     loc->end = db->end;
1172 
1173     return 0;
1174 }
1175 
1176 /************ DATABASE STRUCT AND TRANSACTION MANAGEMENT **************/
1177 
db_is_clean(struct dbengine * db)1178 static int db_is_clean(struct dbengine *db)
1179 {
1180     if (db->header.current_size != SIZE(db))
1181         return 0;
1182 
1183     if (db->header.flags & DIRTY)
1184         return 0;
1185 
1186     return 1;
1187 }
1188 
unlock(struct dbengine * db)1189 static int unlock(struct dbengine *db)
1190 {
1191     return mappedfile_unlock(db->mf);
1192 }
1193 
write_lock(struct dbengine * db)1194 static int write_lock(struct dbengine *db)
1195 {
1196     int r = mappedfile_writelock(db->mf);
1197     if (r) return r;
1198 
1199     /* reread header */
1200     if (db->is_open) {
1201         r = read_header(db);
1202         if (r) return r;
1203 
1204         /* recovery checks for consistency */
1205         r = recovery(db);
1206         if (r) return r;
1207     }
1208 
1209     return 0;
1210 }
1211 
read_lock(struct dbengine * db)1212 static int read_lock(struct dbengine *db)
1213 {
1214     int r = mappedfile_readlock(db->mf);
1215     if (r) return r;
1216 
1217     /* reread header */
1218     if (db->is_open) {
1219         r = read_header(db);
1220         if (r) return r;
1221 
1222         /* we just take and keep a write lock if inconsistent,
1223          * the write lock will fix it up */
1224         if (!db_is_clean(db)) {
1225             unlock(db);
1226             r = write_lock(db);
1227             if (r) return r;
1228             /* downgrade to a read lock again, since that what
1229              * was requested */
1230             unlock(db);
1231             return read_lock(db);
1232         }
1233     }
1234 
1235     return 0;
1236 }
1237 
newtxn(struct dbengine * db,struct txn ** tidptr)1238 static int newtxn(struct dbengine *db, struct txn **tidptr)
1239 {
1240     int r;
1241 
1242     assert(!db->current_txn);
1243     assert(!*tidptr);
1244 
1245     /* grab a r/w lock */
1246     r = write_lock(db);
1247     if (r) return r;
1248 
1249     /* create the transaction */
1250     db->txn_num++;
1251     db->current_txn = xmalloc(sizeof(struct txn));
1252     db->current_txn->num = db->txn_num;
1253 
1254     /* pass it back out */
1255     *tidptr = db->current_txn;
1256 
1257     return 0;
1258 }
1259 
dispose_db(struct dbengine * db)1260 static void dispose_db(struct dbengine *db)
1261 {
1262     if (!db) return;
1263 
1264     if (db->mf) {
1265         if (mappedfile_islocked(db->mf))
1266             unlock(db);
1267         mappedfile_close(&db->mf);
1268     }
1269 
1270     buf_free(&db->loc.keybuf);
1271 
1272     free(db);
1273 }
1274 
1275 /************************************************************/
1276 
opendb(const char * fname,int flags,struct dbengine ** ret,struct txn ** mytid)1277 static int opendb(const char *fname, int flags, struct dbengine **ret, struct txn **mytid)
1278 {
1279     struct dbengine *db;
1280     int r;
1281     int mappedfile_flags = MAPPEDFILE_RW;
1282 
1283     assert(fname);
1284     assert(ret);
1285 
1286     db = (struct dbengine *) xzmalloc(sizeof(struct dbengine));
1287 
1288     if (flags & CYRUSDB_CREATE)
1289         mappedfile_flags |= MAPPEDFILE_CREATE;
1290 
1291     db->open_flags = flags & ~CYRUSDB_CREATE;
1292     db->compar = (flags & CYRUSDB_MBOXSORT) ? bsearch_ncompare_mbox
1293                                             : bsearch_ncompare_raw;
1294 
1295     r = mappedfile_open(&db->mf, fname, mappedfile_flags);
1296     if (r) {
1297         /* convert to CYRUSDB errors*/
1298         if (r == -ENOENT) r = CYRUSDB_NOTFOUND;
1299         else r = CYRUSDB_IOERROR;
1300         goto done;
1301     }
1302 
1303     db->is_open = 0;
1304 
1305     /* grab a read lock, only reading the header */
1306     r = read_lock(db);
1307     if (r) goto done;
1308 
1309     /* if there's any issue which requires fixing, get a write lock */
1310     if (0) {
1311     retry_write:
1312         unlock(db);
1313         db->is_open = 0;
1314         r = write_lock(db);
1315         if (r) goto done;
1316     }
1317 
1318     /* if the map size is zero, it's a new file - we need to create an
1319      * initial header */
1320     if (mappedfile_size(db->mf) == 0) {
1321         struct skiprecord dummy;
1322 
1323         if (!mappedfile_iswritelocked(db->mf))
1324             goto retry_write;
1325 
1326         /* create the dummy! */
1327         memset(&dummy, 0, sizeof(struct skiprecord));
1328         dummy.type = DUMMY;
1329         dummy.level = MAXLEVEL;
1330 
1331         /* append dummy after header location */
1332         db->end = DUMMY_OFFSET;
1333         r = write_record(db, &dummy, NULL, NULL);
1334         if (r) {
1335             syslog(LOG_ERR, "DBERROR: writing dummy node for %s: %m",
1336                    fname);
1337             goto done;
1338         }
1339 
1340         /* create the header */
1341         db->header.version = VERSION;
1342         db->header.generation = 1;
1343         db->header.repack_size = db->end;
1344         db->header.current_size = db->end;
1345         r = commit_header(db);
1346         if (r) {
1347             syslog(LOG_ERR, "DBERROR: writing header for %s: %m",
1348                    fname);
1349             goto done;
1350         }
1351     }
1352 
1353     db->is_open = 1;
1354 
1355     r = read_header(db);
1356     if (r) goto done;
1357 
1358     if (!db_is_clean(db)) {
1359         if (!mappedfile_iswritelocked(db->mf))
1360             goto retry_write;
1361 
1362         /* recovery will clean the flag once it's committed the fixes */
1363         r = recovery(db);
1364         if (r) goto done;
1365     }
1366 
1367     /* unlock the DB */
1368     unlock(db);
1369 
1370     *ret = db;
1371 
1372     if (mytid) {
1373         r = newtxn(db, mytid);
1374         if (r) goto done;
1375     }
1376 
1377 done:
1378     if (r) dispose_db(db);
1379     return r;
1380 }
1381 
myopen(const char * fname,int flags,struct dbengine ** ret,struct txn ** mytid)1382 static int myopen(const char *fname, int flags, struct dbengine **ret, struct txn **mytid)
1383 {
1384     struct db_list *ent;
1385     struct dbengine *mydb;
1386     int r = 0;
1387 
1388     /* do we already have this DB open? */
1389     for (ent = open_twoskip; ent; ent = ent->next) {
1390         if (strcmp(FNAME(ent->db), fname)) continue;
1391         if (ent->db->current_txn)
1392             return CYRUSDB_LOCKED;
1393         if (mytid) {
1394             r = newtxn(ent->db, mytid);
1395             if (r) return r;
1396         }
1397         ent->refcount++;
1398         *ret = ent->db;
1399         return 0;
1400     }
1401 
1402     r = opendb(fname, flags, &mydb, mytid);
1403     if (r) return r;
1404 
1405     /* track this database in the open list */
1406     ent = (struct db_list *) xzmalloc(sizeof(struct db_list));
1407     ent->db = mydb;
1408     ent->refcount = 1;
1409     ent->next = open_twoskip;
1410     open_twoskip = ent;
1411 
1412     /* return the open DB */
1413     *ret = mydb;
1414 
1415     return 0;
1416 }
1417 
myclose(struct dbengine * db)1418 static int myclose(struct dbengine *db)
1419 {
1420     struct db_list *ent = open_twoskip;
1421     struct db_list *prev = NULL;
1422 
1423     assert(db);
1424 
1425     /* remove this DB from the open list */
1426     while (ent && ent->db != db) {
1427         prev = ent;
1428         ent = ent->next;
1429     }
1430     assert(ent);
1431 
1432     if (--ent->refcount <= 0) {
1433         if (prev) prev->next = ent->next;
1434         else open_twoskip = ent->next;
1435         free(ent);
1436         if (mappedfile_islocked(db->mf))
1437             syslog(LOG_ERR, "twoskip: %s closed while still locked", FNAME(db));
1438         dispose_db(db);
1439     }
1440 
1441     return 0;
1442 }
1443 
1444 /*************** EXTERNAL APIS ***********************/
1445 
myfetch(struct dbengine * db,const char * key,size_t keylen,const char ** foundkey,size_t * foundkeylen,const char ** data,size_t * datalen,struct txn ** tidptr,int fetchnext)1446 static int myfetch(struct dbengine *db,
1447             const char *key, size_t keylen,
1448             const char **foundkey, size_t *foundkeylen,
1449             const char **data, size_t *datalen,
1450             struct txn **tidptr, int fetchnext)
1451 {
1452     int r = 0;
1453 
1454     assert(db);
1455     if (datalen) assert(data);
1456 
1457     if (data) *data = NULL;
1458     if (datalen) *datalen = 0;
1459 
1460     /* Hacky workaround:
1461      *
1462      * If no transaction was passed, but we're in a transaction,
1463      * then just do the read within that transaction.
1464      */
1465     if (!tidptr && db->current_txn)
1466         tidptr = &db->current_txn;
1467 
1468     if (tidptr) {
1469         if (!*tidptr) {
1470             r = newtxn(db, tidptr);
1471             if (r) return r;
1472         }
1473     } else {
1474         /* grab a r lock */
1475         r = read_lock(db);
1476         if (r) return r;
1477     }
1478 
1479     r = find_loc(db, key, keylen);
1480     if (r) goto done;
1481 
1482     if (fetchnext) {
1483         r = advance_loc(db);
1484         if (r) goto done;
1485     }
1486 
1487     if (foundkey) *foundkey = db->loc.keybuf.s;
1488     if (foundkeylen) *foundkeylen = db->loc.keybuf.len;
1489 
1490     if (!r && db->loc.is_exactmatch) {
1491         if (data) *data = VAL(db, &db->loc.record);
1492         if (datalen) *datalen = db->loc.record.vallen;
1493     }
1494     else {
1495         /* we didn't get an exact match */
1496         r = CYRUSDB_NOTFOUND;
1497     }
1498 
1499 done:
1500     if (!tidptr) {
1501         /* release read lock */
1502         int r1;
1503         if ((r1 = unlock(db)) < 0) {
1504             return r1;
1505         }
1506     }
1507 
1508     return r;
1509 }
1510 
1511 /* foreach allows for subsidary mailbox operations in 'cb'.
1512    if there is a txn, 'cb' must make use of it.
1513 */
myforeach(struct dbengine * db,const char * prefix,size_t prefixlen,foreach_p * goodp,foreach_cb * cb,void * rock,struct txn ** tidptr)1514 static int myforeach(struct dbengine *db,
1515                      const char *prefix, size_t prefixlen,
1516                      foreach_p *goodp,
1517                      foreach_cb *cb, void *rock,
1518                      struct txn **tidptr)
1519 {
1520     int r = 0, cb_r = 0;
1521     int num_misses = 0;
1522     int need_unlock = 0;
1523     const char *val;
1524     size_t vallen;
1525     struct buf keybuf = BUF_INITIALIZER;
1526 
1527     assert(db);
1528     assert(cb);
1529     if (prefixlen) assert(prefix);
1530 
1531     /* Hacky workaround:
1532      *
1533      * If no transaction was passed, but we're in a transaction,
1534      * then just do the read within that transaction.
1535      */
1536     if (!tidptr && db->current_txn)
1537         tidptr = &db->current_txn;
1538     if (tidptr) {
1539         if (!*tidptr) {
1540             r = newtxn(db, tidptr);
1541             if (r) return r;
1542         }
1543     } else {
1544         /* grab a r lock */
1545         r = read_lock(db);
1546         if (r) return r;
1547         need_unlock = 1;
1548     }
1549 
1550     r = find_loc(db, prefix, prefixlen);
1551     if (r) goto done;
1552 
1553     if (!db->loc.is_exactmatch) {
1554         /* advance to the first match */
1555         r = advance_loc(db);
1556         if (r) goto done;
1557     }
1558 
1559     while (db->loc.is_exactmatch) {
1560         /* does it match prefix? */
1561         if (prefixlen) {
1562             if (db->loc.record.keylen < prefixlen) break;
1563             if (db->compar(KEY(db, &db->loc.record), prefixlen, prefix, prefixlen)) break;
1564         }
1565 
1566         val = VAL(db, &db->loc.record);
1567         vallen = db->loc.record.vallen;
1568 
1569         if (!goodp || goodp(rock, db->loc.keybuf.s, db->loc.keybuf.len,
1570                                   val, vallen)) {
1571             /* take a copy of they key - just in case cb does actions on this database
1572              * and clobbers loc */
1573             buf_copy(&keybuf, &db->loc.keybuf);
1574 
1575             if (!tidptr) {
1576                 /* release read lock */
1577                 r = unlock(db);
1578                 if (r) goto done;
1579                 need_unlock = 0;
1580             }
1581 
1582             /* make callback */
1583             cb_r = cb(rock, db->loc.keybuf.s, db->loc.keybuf.len,
1584                             val, vallen);
1585             if (cb_r) break;
1586 
1587             if (!tidptr) {
1588                 /* grab a r lock */
1589                 r = read_lock(db);
1590                 if (r) goto done;
1591                 need_unlock = 1;
1592 
1593                 num_misses = 0;
1594             }
1595 
1596             /* should be cheap if we're already here */
1597             r = find_loc(db, keybuf.s, keybuf.len);
1598             if (r) goto done;
1599         }
1600         else if (!tidptr) {
1601             num_misses++;
1602             if (num_misses > FOREACH_LOCK_RELEASE) {
1603                 /* take a copy of they key - just in case cb does actions on this database
1604                  * and clobbers loc */
1605                 buf_copy(&keybuf, &db->loc.keybuf);
1606 
1607                 /* release read lock */
1608                 r = unlock(db);
1609                 if (r) goto done;
1610                 need_unlock = 0;
1611 
1612                 /* grab a r lock */
1613                 r = read_lock(db);
1614                 if (r) goto done;
1615                 need_unlock = 1;
1616 
1617                 /* should be cheap if we're already here */
1618                 r = find_loc(db, keybuf.s, keybuf.len);
1619                 if (r) goto done;
1620 
1621                 num_misses = 0;
1622             }
1623         }
1624 
1625         /* move to the next one */
1626         r = advance_loc(db);
1627         if (r) goto done;
1628     }
1629 
1630  done:
1631 
1632     buf_free(&keybuf);
1633 
1634     if (need_unlock) {
1635         /* release read lock */
1636         int r1 = unlock(db);
1637         if (r1) return r1;
1638     }
1639 
1640     return r ? r : cb_r;
1641 }
1642 
1643 /* helper function for all writes - wraps create and delete and the FORCE
1644  * logic for each */
skipwrite(struct dbengine * db,const char * key,size_t keylen,const char * data,size_t datalen,int force)1645 static int skipwrite(struct dbengine *db,
1646                      const char *key, size_t keylen,
1647                      const char *data, size_t datalen,
1648                      int force)
1649 {
1650     int r = find_loc(db, key, keylen);
1651     if (r) return r;
1652 
1653     /* could be a delete or a replace */
1654     if (db->loc.is_exactmatch) {
1655         if (!data) return delete_here(db);
1656         if (!force) return CYRUSDB_EXISTS;
1657         /* unchanged?  Save the IO */
1658         if (!db->compar(data, datalen,
1659                         VAL(db, &db->loc.record),
1660                         db->loc.record.vallen))
1661             return 0;
1662         return store_here(db, data, datalen);
1663     }
1664 
1665     /* only create if it's not a delete, obviously */
1666     if (data) return store_here(db, data, datalen);
1667 
1668     /* must be a delete - are we forcing? */
1669     if (!force) return CYRUSDB_NOTFOUND;
1670 
1671     return 0;
1672 }
1673 
mycommit(struct dbengine * db,struct txn * tid)1674 static int mycommit(struct dbengine *db, struct txn *tid)
1675 {
1676     struct skiprecord newrecord;
1677     int r = 0;
1678 
1679     assert(db);
1680     assert(tid == db->current_txn);
1681 
1682     /* no need to abort if we're not dirty */
1683     if (!(db->header.flags & DIRTY))
1684         goto done;
1685 
1686     /* build a commit record */
1687     memset(&newrecord, 0, sizeof(struct skiprecord));
1688     newrecord.type = COMMIT;
1689     newrecord.nextloc[0] = db->header.current_size;
1690 
1691     /* append to the file */
1692     r = append_record(db, &newrecord, NULL, NULL);
1693     if (r) goto done;
1694 
1695     /* commit ALL outstanding changes first, before
1696      * rewriting the header */
1697     r = mappedfile_commit(db->mf);
1698     if (r) goto done;
1699 
1700     /* finally, update the header and commit again */
1701     db->header.current_size = db->end;
1702     db->header.flags &= ~DIRTY;
1703     r = commit_header(db);
1704 
1705  done:
1706     if (r) {
1707         int r2;
1708 
1709         /* error during commit; we must abort */
1710         r2 = myabort(db, tid);
1711         if (r2) {
1712             syslog(LOG_ERR, "DBERROR: twoskip %s: commit AND abort failed",
1713                    FNAME(db));
1714         }
1715     }
1716     else {
1717         if (!(db->open_flags & CYRUSDB_NOCOMPACT)
1718             && db->header.current_size > MINREWRITE
1719             && db->header.current_size > 2 * db->header.repack_size) {
1720             int r2 = mycheckpoint(db);
1721             if (r2) {
1722                 syslog(LOG_NOTICE, "twoskip: failed to checkpoint %s: %m",
1723                        FNAME(db));
1724             }
1725         }
1726         else {
1727             unlock(db);
1728         }
1729 
1730         free(tid);
1731         db->current_txn = NULL;
1732     }
1733 
1734     return r;
1735 }
1736 
myabort(struct dbengine * db,struct txn * tid)1737 static int myabort(struct dbengine *db, struct txn *tid)
1738 {
1739     int r;
1740 
1741     assert(db);
1742     assert(tid == db->current_txn);
1743 
1744     /* free the tid */
1745     free(tid);
1746     db->current_txn = NULL;
1747     db->end = db->header.current_size;
1748 
1749     /* recovery will clean up */
1750     r = recovery1(db, NULL);
1751 
1752     buf_free(&db->loc.keybuf);
1753     memset(&db->loc, 0, sizeof(struct skiploc));
1754 
1755     unlock(db);
1756 
1757     return r;
1758 }
1759 
/* Store, replace or (with NULL 'data') delete 'key'; see skipwrite()
 * for how 'force' applies.  'key' must be non-NULL and non-empty
 * (asserted).
 *
 * With a 'tidptr' the write joins that transaction, starting one if
 * '*tidptr' is NULL, and leaves it open on success.  Without one, a
 * transaction private to this call is created and committed.
 *
 * If the write fails the transaction is aborted and '*tidptr' is set to
 * NULL; the abort's error, if any, is returned in preference to the
 * write's.  Otherwise returns 0, or the result of committing the
 * private transaction. */
static int mystore(struct dbengine *db,
            const char *key, size_t keylen,
            const char *data, size_t datalen,
            struct txn **tidptr, int force)
{
    struct txn *localtid = NULL;
    int r;
    int abort_r;

    assert(db);
    assert(key && keylen);

    /* the caller isn't keeping a transaction: use one of our own */
    if (!tidptr)
        tidptr = &localtid;

    /* make sure we're write locked and up to date */
    if (!*tidptr) {
        r = newtxn(db, tidptr);
        if (r) return r;
    }

    r = skipwrite(db, key, keylen, data, datalen, force);
    if (r) {
        abort_r = myabort(db, *tidptr);
        *tidptr = NULL;
        return abort_r ? abort_r : r;
    }

    /* our own transaction: commit it, which also releases the lock */
    if (localtid)
        return mycommit(db, localtid);

    return 0;
}
1795 
1796 /* compress 'db', closing at the end.  Uses foreach to copy into a new
1797  * database, then rewrites over the old one */
1798 
/* rock passed to copy_cb */
struct copy_rock {
    struct dbengine *db;    /* database being copied into */
    struct txn *tid;        /* transaction used for the stores */
};

/* foreach callback: store one key/value pair into the database named by
 * the copy_rock, inside its transaction, without force.  Returns the
 * result of mystore(). */
static int copy_cb(void *rock,
                   const char *key, size_t keylen,
                   const char *val, size_t vallen)
{
    struct copy_rock *dest = rock;

    return mystore(dest->db, key, keylen, val, vallen, &dest->tid, 0);
}
1812 
mycheckpoint(struct dbengine * db)1813 static int mycheckpoint(struct dbengine *db)
1814 {
1815     size_t old_size = db->header.current_size;
1816     char newfname[1024];
1817     clock_t start = sclock();
1818     struct copy_rock cr;
1819     int r = 0;
1820 
1821     r = myconsistent(db, db->current_txn);
1822     if (r) {
1823         syslog(LOG_ERR, "db %s, inconsistent pre-checkpoint, bailing out",
1824                FNAME(db));
1825         unlock(db);
1826         return r;
1827     }
1828 
1829     /* open fname.NEW */
1830     snprintf(newfname, sizeof(newfname), "%s.NEW", FNAME(db));
1831     unlink(newfname);
1832 
1833     cr.db = NULL;
1834     cr.tid = NULL;
1835     r = opendb(newfname, db->open_flags | CYRUSDB_CREATE, &cr.db, &cr.tid);
1836     if (r) return r;
1837 
1838     r = myforeach(db, NULL, 0, NULL, copy_cb, &cr, &db->current_txn);
1839     if (r) goto err;
1840 
1841     r = myconsistent(cr.db, cr.tid);
1842     if (r) {
1843         syslog(LOG_ERR, "db %s, inconsistent post-checkpoint, bailing out",
1844                FNAME(db));
1845         goto err;
1846     }
1847 
1848     /* remember the repack size */
1849     cr.db->header.repack_size = cr.db->end;
1850 
1851     /* increase the generation count */
1852     cr.db->header.generation = db->header.generation + 1;
1853 
1854     r = mycommit(cr.db, cr.tid);
1855     if (r) goto err;
1856 
1857     /* move new file to original file name */
1858     r = mappedfile_rename(cr.db->mf, FNAME(db));
1859     if (r) goto err;
1860 
1861     /* OK, we're commmitted now - clean up */
1862     unlock(db);
1863 
1864     /* gotta clean it all up */
1865     mappedfile_close(&db->mf);
1866     buf_free(&db->loc.keybuf);
1867 
1868     *db = *cr.db;
1869     free(cr.db); /* leaked? */
1870 
1871     {
1872         syslog(LOG_INFO,
1873                "twoskip: checkpointed %s (%llu record%s, %llu => %llu bytes) in %2.3f seconds",
1874                FNAME(db), (LLU)db->header.num_records,
1875                db->header.num_records == 1 ? "" : "s", (LLU)old_size,
1876                (LLU)(db->header.current_size),
1877                (sclock() - start) / (double) CLOCKS_PER_SEC);
1878     }
1879 
1880     return 0;
1881 
1882  err:
1883     if (cr.tid) myabort(cr.db, cr.tid);
1884     unlink(FNAME(cr.db));
1885     dispose_db(cr.db);
1886     unlock(db);
1887     return CYRUSDB_IOERROR;
1888 }
1889 
1890 
1891 /* dump the database.
1892    if detail == 1, dump all records.
1893    if detail == 2, also dump pointers for active records.
1894    if detail == 3, dump all records/all pointers.
1895 */
dump(struct dbengine * db,int detail)1896 static int dump(struct dbengine *db, int detail __attribute__((unused)))
1897 {
1898     struct skiprecord record;
1899     struct buf scratch = BUF_INITIALIZER;
1900     size_t offset = DUMMY_OFFSET;
1901     int r = 0;
1902     int i;
1903 
1904     printf("HEADER: v=%lu fl=%lu num=%llu sz=(%08llX/%08llX)\n",
1905           (LU)db->header.version,
1906           (LU)db->header.flags,
1907           (LLU)db->header.num_records,
1908           (LLU)db->header.current_size,
1909           (LLU)db->header.repack_size);
1910 
1911     while (offset < db->header.current_size) {
1912         printf("%08llX ", (LLU)offset);
1913 
1914         r = read_onerecord(db, offset, &record);
1915 
1916         if (r) {
1917             printf("ERROR\n");
1918             break;
1919         }
1920 
1921         switch (record.type) {
1922         case DELETE:
1923             printf("DELETE ptr=%08llX\n", (LLU)record.nextloc[0]);
1924             break;
1925 
1926         case COMMIT:
1927             printf("COMMIT start=%08llX\n", (LLU)record.nextloc[0]);
1928             break;
1929 
1930         case RECORD:
1931         case DUMMY:
1932             buf_setmap(&scratch, KEY(db, &record), record.keylen);
1933             buf_replace_char(&scratch, '\0', '-');
1934             printf("%s kl=%llu dl=%llu lvl=%d (%s)\n",
1935                    (record.type == RECORD ? "RECORD" : "DUMMY"),
1936                    (LLU)record.keylen, (LLU)record.vallen,
1937                    record.level, buf_cstring(&scratch));
1938             printf("\t");
1939             for (i = 0; i <= record.level; i++) {
1940                 printf("%08llX ", (LLU)record.nextloc[i]);
1941                 if (!(i % 8))
1942                     printf("\n\t");
1943             }
1944             printf("\n");
1945             break;
1946         }
1947 
1948         offset += record.len;
1949     }
1950 
1951     buf_free(&scratch);
1952 
1953     return r;
1954 }
1955 
/* Backend "consistent" entry point.
 *
 * Takes a read lock, runs the checks in myconsistent() with no
 * transaction, and drops the lock again.
 *
 * Returns the read_lock() error if the lock could not be taken,
 * otherwise whatever myconsistent() returned.
 */
static int consistent(struct dbengine *db)
{
    int r = read_lock(db);

    if (!r) {
        r = myconsistent(db, NULL);
        unlock(db);
    }

    return r;
}
1969 
1970 /* perform some basic consistency checks */
myconsistent(struct dbengine * db,struct txn * tid)1971 static int myconsistent(struct dbengine *db, struct txn *tid)
1972 {
1973     struct skiprecord prevrecord;
1974     struct skiprecord record;
1975     size_t fwd[MAXLEVEL];
1976     size_t num_records = 0;
1977     int r = 0;
1978     int cmp;
1979     int i;
1980 
1981     assert(db->current_txn == tid); /* could both be null */
1982 
1983     /* read in the dummy */
1984     r = read_onerecord(db, DUMMY_OFFSET, &prevrecord);
1985     if (r) return r;
1986 
1987     /* set up the location pointers */
1988     for (i = 0; i < MAXLEVEL; i++)
1989         fwd[i] = _getloc(db, &prevrecord, i);
1990 
1991     while (fwd[0]) {
1992         r = read_onerecord(db, fwd[0], &record);
1993         if (r) return r;
1994 
1995         if (record.type == DELETE) {
1996             fwd[0] = record.nextloc[0];
1997             continue;
1998         }
1999 
2000         cmp = db->compar(KEY(db, &record), record.keylen,
2001                          KEY(db, &prevrecord), prevrecord.keylen);
2002         if (cmp <= 0) {
2003             syslog(LOG_ERR, "DBERROR: twoskip out of order %s: %.*s (%08llX) <= %.*s (%08llX)",
2004                    FNAME(db), (int)record.keylen, KEY(db, &record),
2005                    (LLU)record.offset,
2006                    (int)prevrecord.keylen, KEY(db, &prevrecord),
2007                    (LLU)prevrecord.offset);
2008             return CYRUSDB_INTERNAL;
2009         }
2010 
2011         for (i = 0; i < record.level; i++) {
2012             /* check the old pointer was to here */
2013             if (fwd[i] != record.offset) {
2014                 syslog(LOG_ERR, "DBERROR: twoskip broken linkage %s: %08llX at %d, expected %08llX",
2015                        FNAME(db), (LLU)record.offset, i, (LLU)fwd[i]);
2016                 return CYRUSDB_INTERNAL;
2017             }
2018             /* and advance to the new pointer */
2019             fwd[i] = _getloc(db, &record, i);
2020         }
2021 
2022         /* keep a copy for comparison purposes */
2023         num_records++;
2024         prevrecord = record;
2025     }
2026 
2027     for (i = 0; i < MAXLEVEL; i++) {
2028         if (fwd[i]) {
2029             syslog(LOG_ERR, "DBERROR: twoskip broken tail %s: %08llX at %d",
2030                    FNAME(db), (LLU)fwd[i], i);
2031             return CYRUSDB_INTERNAL;
2032         }
2033     }
2034 
2035     /* we walked the whole file and saw every pointer */
2036 
2037     if (num_records != db->header.num_records) {
2038         syslog(LOG_ERR, "DBERROR: twoskip record count mismatch %s: %llu should be %llu",
2039                FNAME(db), (LLU)num_records, (LLU)db->header.num_records);
2040         return CYRUSDB_INTERNAL;
2041     }
2042 
2043     return 0;
2044 }
2045 
_copy_commit(struct dbengine * db,struct dbengine * newdb,struct skiprecord * commit)2046 static int _copy_commit(struct dbengine *db, struct dbengine *newdb,
2047                         struct skiprecord *commit)
2048 {
2049     struct txn *tid = NULL;
2050     struct skiprecord record;
2051     const char *val;
2052     size_t offset;
2053     int r = 0;
2054 
2055     for (offset = commit->nextloc[0]; offset < commit->offset; offset += record.len) {
2056         r = read_onerecord(db, offset, &record);
2057         if (r) goto err;
2058         switch (record.type) {
2059         case DELETE:
2060             val = NULL;
2061             break;
2062         case RECORD:
2063             val = VAL(db, &record);
2064             break;
2065         default:
2066             r = CYRUSDB_IOERROR;
2067             goto err;
2068         }
2069 
2070         /* store into the new DB */
2071         r = mystore(newdb, KEY(db, &record), record.keylen, val, record.vallen, &tid, 1);
2072         if (r) goto err;
2073     }
2074 
2075     if (tid) r = mycommit(newdb, tid);
2076     if (r) return r;
2077 
2078     return 0;
2079 
2080 err:
2081     if (tid) myabort(newdb, tid);
2082     return r;
2083 }
2084 
2085 /* recovery2 - the file is really screwed.  Basically, we
2086  * failed to run recovery.  Try reading out records from
2087  * the top and applying commits to a new file instead */
recovery2(struct dbengine * db,int * count)2088 static int recovery2(struct dbengine *db, int *count)
2089 {
2090     uint64_t oldcount = db->header.num_records;
2091     struct skiprecord record;
2092     struct dbengine *newdb = NULL;
2093     char newfname[1024];
2094     size_t offset;
2095     int r = 0;
2096 
2097     /* open fname.NEW */
2098     snprintf(newfname, sizeof(newfname), "%s.NEW", FNAME(db));
2099     unlink(newfname);
2100 
2101     r = opendb(newfname, db->open_flags | CYRUSDB_CREATE, &newdb, NULL);
2102     if (r) return r;
2103 
2104     /* increase the generation count */
2105     newdb->header.generation = db->header.generation + 1;
2106 
2107     /* start with the dummy */
2108     for (offset = DUMMY_OFFSET; offset < SIZE(db); offset += record.len) {
2109         r = read_onerecord(db, offset, &record);
2110         if (r) {
2111             syslog(LOG_ERR, "DBERROR: %s failed to read at %08llX in recovery2, truncating",
2112                    FNAME(db), (LLU)offset);
2113             break;
2114         }
2115         if (record.type == COMMIT) {
2116             r = _copy_commit(db, newdb, &record);
2117             if (r) {
2118                 syslog(LOG_ERR, "DBERROR: %s failed to apply commit at %08llX in recovery2, truncating",
2119                       FNAME(db), (LLU)offset);
2120                 break;
2121             }
2122         }
2123     }
2124 
2125     if (!newdb->header.num_records) {
2126         /* no records found - almost certainly bogus, and even if not,
2127          * there's no point recovering a zero record file */
2128         syslog(LOG_ERR, "DBERROR: %s no records found in recovery2, aborting",
2129                FNAME(db));
2130         r = CYRUSDB_NOTFOUND;
2131         goto err;
2132     }
2133 
2134     /* regardless, we had a commit during create, and in any _copy_commit, so
2135      * rename into place */
2136 
2137     /* move new file to original file name */
2138     r = mappedfile_rename(newdb->mf, FNAME(db));
2139     if (r) goto err;
2140 
2141     /* OK, we're commmitted now - clean up */
2142     unlock(db);
2143 
2144     /* gotta clean it all up */
2145     mappedfile_close(&db->mf);
2146     buf_free(&db->loc.keybuf);
2147 
2148     *db = *newdb;
2149     free(newdb); /* leaked? */
2150 
2151     syslog(LOG_NOTICE, "twoskip: recovery2 %s - rescued %llu of %llu records",
2152            FNAME(db), (LLU)db->header.num_records, (LLU)oldcount);
2153 
2154     if (count) *count = db->header.num_records;
2155 
2156     return 0;
2157 
2158  err:
2159     unlink(FNAME(newdb));
2160     myclose(newdb);
2161     return r;
2162 }
2163 
2164 /* run recovery on this file.
2165  * always called with a write lock. */
recovery1(struct dbengine * db,int * count)2166 static int recovery1(struct dbengine *db, int *count)
2167 {
2168     size_t prev[MAXLEVEL+1];
2169     size_t next[MAXLEVEL+1];
2170     struct skiprecord record;
2171     struct skiprecord prevrecord;
2172     struct skiprecord fixrecord;
2173     size_t nextoffset = 0;
2174     uint64_t num_records = 0;
2175     int changed = 0;
2176     int r = 0;
2177     int cmp;
2178     int i;
2179 
2180     assert(mappedfile_iswritelocked(db->mf));
2181 
2182     /* no need to run recovery if we're consistent */
2183     if (db_is_clean(db))
2184         return 0;
2185 
2186     /* we can't recovery a file that's not created yet */
2187     assert(db->header.current_size > HEADER_SIZE);
2188 
2189     /* dirty the header if not already dirty */
2190     if (!(db->header.flags & DIRTY)) {
2191         db->header.flags |= DIRTY;
2192         r = commit_header(db);
2193         if (r) return r;
2194     }
2195 
2196     /* start with the dummy */
2197     r = read_onerecord(db, DUMMY_OFFSET, &prevrecord);
2198     if (r) return r;
2199 
2200     /* and pointers forwards */
2201     for (i = 2; i <= MAXLEVEL; i++) {
2202         prev[i] = prevrecord.offset;
2203         next[i] = prevrecord.nextloc[i];
2204     }
2205 
2206     /* check for broken level - pointers */
2207     for (i = 0; i < 2; i++) {
2208         if (prevrecord.nextloc[i] >= db->end) {
2209             prevrecord.nextloc[i] = 0;
2210             r = rewrite_record(db, &prevrecord);
2211             changed++;
2212         }
2213     }
2214 
2215     nextoffset = _getloc(db, &prevrecord, 0);
2216 
2217     while (nextoffset) {
2218         r = read_onerecord(db, nextoffset, &record);
2219         if (r) return r;
2220 
2221         /* just skip over delele records */
2222         if (record.type == DELETE) {
2223             nextoffset = record.nextloc[0];
2224             continue;
2225         }
2226 
2227         cmp = db->compar(KEY(db, &record), record.keylen,
2228                          KEY(db, &prevrecord), prevrecord.keylen);
2229         if (cmp <= 0) {
2230             syslog(LOG_ERR, "DBERROR: twoskip out of order %s: %.*s (%08llX) <= %.*s (%08llX)",
2231                    FNAME(db), (int)record.keylen, KEY(db, &record),
2232                    (LLU)record.offset,
2233                    (int)prevrecord.keylen, KEY(db, &prevrecord),
2234                    (LLU)prevrecord.offset);
2235             return CYRUSDB_INTERNAL;
2236         }
2237 
2238         /* check for old offsets needing fixing */
2239         for (i = 2; i <= record.level; i++) {
2240             if (next[i] != record.offset) {
2241                 /* need to fix up the previous record to point here */
2242                 r = read_onerecord(db, prev[i], &fixrecord);
2243                 if (r) return r;
2244 
2245                 /* XXX - optimise adjacent same records */
2246                 fixrecord.nextloc[i] = record.offset;
2247                 r = rewrite_record(db, &fixrecord);
2248                 if (r) return r;
2249                 changed++;
2250             }
2251             prev[i] = record.offset;
2252             next[i] = record.nextloc[i];
2253         }
2254 
2255         /* check for broken level - pointers */
2256         for (i = 0; i < 2; i++) {
2257             if (record.nextloc[i] >= db->end) {
2258                 record.nextloc[i] = 0;
2259                 r = rewrite_record(db, &record);
2260                 if (r) return r;
2261                 changed++;
2262             }
2263         }
2264 
2265         num_records++;
2266 
2267         /* find the next record */
2268         nextoffset = _getloc(db, &record, 0);
2269 
2270         prevrecord = record;
2271     }
2272 
2273     /* check for remaining offsets needing fixing */
2274     for (i = 2; i <= MAXLEVEL; i++) {
2275         if (next[i]) {
2276             /* need to fix up the previous record to point to the end */
2277             r = read_onerecord(db, prev[i], &fixrecord);
2278             if (r) return r;
2279 
2280             /* XXX - optimise, same as above */
2281             fixrecord.nextloc[i] = 0;
2282             r = rewrite_record(db, &fixrecord);
2283             if (r) return r;
2284             changed++;
2285         }
2286     }
2287 
2288     r = mappedfile_truncate(db->mf, db->header.current_size);
2289     if (r) return r;
2290 
2291     r = mappedfile_commit(db->mf);
2292     if (r) return r;
2293 
2294     /* clear the dirty flag */
2295     db->header.flags &= ~DIRTY;
2296     db->header.num_records = num_records;
2297     r = commit_header(db);
2298     if (r) return r;
2299 
2300     if (count) *count = changed;
2301 
2302     return 0;
2303 }
2304 
recovery(struct dbengine * db)2305 static int recovery(struct dbengine *db)
2306 {
2307     clock_t start = sclock();
2308     int count = 0;
2309     int r;
2310 
2311     /* no need to run recovery if we're consistent */
2312     if (db_is_clean(db))
2313         return 0;
2314 
2315     r = recovery1(db, &count);
2316     if (r) {
2317         syslog(LOG_ERR, "DBERROR: recovery1 failed %s, trying recovery2", FNAME(db));
2318         count = 0;
2319         r = recovery2(db, &count);
2320         if (r) return r;
2321     }
2322 
2323     {
2324         syslog(LOG_INFO,
2325                "twoskip: recovered %s (%llu record%s, %llu bytes) in %2.3f seconds - fixed %d offset%s",
2326                FNAME(db), (LLU)db->header.num_records,
2327                db->header.num_records == 1 ? "" : "s",
2328                (LLU)(db->header.current_size),
2329                (sclock() - start) / (double) CLOCKS_PER_SEC,
2330                count, count == 1 ? "" : "s");
2331     }
2332 
2333     return 0;
2334 }
2335 
/* Backend fetch entry point.
 *
 * "key" must be non-NULL and "keylen" non-zero; both are asserted.
 * Forwards to myfetch() with no found-key outputs and 0 as the final
 * argument (fetchnext() passes 1 in that position), and returns its
 * result unchanged.
 */
static int fetch(struct dbengine *mydb,
                 const char *key, size_t keylen,
                 const char **data, size_t *datalen,
                 struct txn **tidptr)
{
    int r;

    assert(key);
    assert(keylen);

    r = myfetch(mydb, key, keylen, NULL, NULL,
                data, datalen, tidptr, 0);

    return r;
}
2346 
/* Backend fetchnext entry point.
 *
 * Forwards to myfetch() with 1 as the final argument (fetch() passes 0
 * in that position), handing through "foundkey"/"fklen" so the key that
 * was located can be reported back.  Unlike fetch(), nothing is asserted
 * about "key" or "keylen".  The myfetch() result is returned unchanged.
 */
static int fetchnext(struct dbengine *mydb,
                 const char *key, size_t keylen,
                 const char **foundkey, size_t *fklen,
                 const char **data, size_t *datalen,
                 struct txn **tidptr)
{
    int r = myfetch(mydb, key, keylen, foundkey, fklen,
                    data, datalen, tidptr, 1);

    return r;
}
2356 
/* Backend create entry point.
 *
 * Forwards to mystore() with 0 as the final (force) argument.  A
 * non-zero "datalen" requires a non-NULL "data" (asserted).  A NULL
 * "data" is replaced by an empty string before the call, so mystore()
 * never receives NULL data from here (NULL data is what delete() passes).
 * Returns the mystore() result.
 */
static int create(struct dbengine *db,
                  const char *key, size_t keylen,
                  const char *data, size_t datalen,
                  struct txn **tid)
{
    const char *payload = "";

    if (datalen) assert(data);

    if (data)
        payload = data;

    return mystore(db, key, keylen, payload, datalen, tid, 0);
}
2365 
/* Backend store entry point.
 *
 * Same as create() except that mystore() is called with 1 as the final
 * (force) argument.  A non-zero "datalen" requires a non-NULL "data"
 * (asserted); a NULL "data" is replaced by an empty string before the
 * call.  Returns the mystore() result.
 */
static int store(struct dbengine *db,
                 const char *key, size_t keylen,
                 const char *data, size_t datalen,
                 struct txn **tid)
{
    const char *payload = "";

    if (datalen) assert(data);

    if (data)
        payload = data;

    return mystore(db, key, keylen, payload, datalen, tid, 1);
}
2374 
/* Backend delete entry point.
 *
 * Forwards to mystore() with NULL data and zero length, passing the
 * caller's "force" through as the final argument.  Returns the
 * mystore() result.
 */
static int delete(struct dbengine *db,
                 const char *key, size_t keylen,
                 struct txn **tid, int force)
{
    int r = mystore(db, key, keylen, NULL, 0, tid, force);

    return r;
}
2381 
2382 /* twoskip compar function is set at open */
mycompar(struct dbengine * db,const char * a,int alen,const char * b,int blen)2383 static int mycompar(struct dbengine *db, const char *a, int alen,
2384                     const char *b, int blen)
2385 {
2386     return db->compar(a, alen, b, blen);
2387 }
2388 
2389 HIDDEN struct cyrusdb_backend cyrusdb_twoskip =
2390 {
2391     "twoskip",                  /* name */
2392 
2393     &cyrusdb_generic_init,
2394     &cyrusdb_generic_done,
2395     &cyrusdb_generic_sync,
2396     &cyrusdb_generic_archive,
2397     &cyrusdb_generic_unlink,
2398 
2399     &myopen,
2400     &myclose,
2401 
2402     &fetch,
2403     &fetch,
2404     &fetchnext,
2405 
2406     &myforeach,
2407     &create,
2408     &store,
2409     &delete,
2410 
2411     &mycommit,
2412     &myabort,
2413 
2414     &dump,
2415     &consistent,
2416     &mycheckpoint,
2417     &mycompar
2418 };
2419