1 /*
2  *  Beansdb - A high available distributed key-value storage system:
3  *
4  *      http://beansdb.googlecode.com
5  *
6  *  Copyright 2010 Douban Inc.  All rights reserved.
7  *
8  *  Use and distribution licensed under the BSD license.  See
9  *  the LICENSE file for full text.
10  *
11  *  Authors:
12  *      Davies Liu <davies.liu@gmail.com>
13  *      Hurricane Lee <hurricane1026@gmail.com>
14  *
15  */
16 
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
22 
23 #include "hint.h"
24 #include "quicklz.h"
25 #include "diskmgr.h"
26 #include "fnv1a.h"
27 #include "const.h"
28 
29 #ifdef HAVE_CONFIG_H
30 #   include "config.h"
31 #endif
32 
33 
34 #if HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37 
38 #include "mfile.h"
39 #include "log.h"
40 
/*
 * Growable byte buffer used while building a hint file: build_hint() passes
 * one of these to ht_visit(), and collect_items() appends one packed hint
 * record to it per visited item.
 */
struct param
{
    int size;   /* number of bytes currently allocated for buf */
    int curr;   /* number of bytes of buf already filled (offset of the next record) */
    char *buf;  /* heap buffer holding the packed records */
};
48 
/*
 * ht_visit() callback: pack one tree item as a HintRecord and append it to
 * the growable buffer passed in through `param`.
 *
 * it    - item being visited; it->key must be a NUL-terminated string.
 * param - actually a `struct param *`; its buffer is enlarged when needed and
 *         `curr` is advanced past the record written by this call.
 *
 * Terminates the process if the buffer would have to grow beyond INT_MAX.
 */
void collect_items(Item *it, void *param)
{
    struct param *p = (struct param *)param;
    int ksize = strlen(it->key);
    // Packed record = fixed HintRecord fields + key + terminating NUL.
    // NAME_IN_RECORD bytes of key storage are already part of sizeof(HintRecord).
    int length = sizeof(HintRecord) + ksize + 1 - NAME_IN_RECORD;

    if (p->size - p->curr < length)
    {
        // Keep doubling until the record fits: one doubling alone is not
        // guaranteed to make enough room.
        int new_size = p->size > 0 ? p->size : length;
        while (new_size - p->curr < length)
        {
            if (new_size > INT_MAX / 2)
            {
                // Doubling again would overflow the int size fields.
                log_fatal("hint buffer too large: %d bytes used, %d more needed", p->curr, length);
                exit(1);
            }
            new_size *= 2;
        }
        p->buf = (char*)safe_realloc(p->buf, new_size);
        p->size = new_size;
    }

    HintRecord *r = (HintRecord*)(p->buf + p->curr);
    r->ksize = ksize;
    // The low 8 bits of the position are not stored; scanHintFile() ORs a
    // bucket number back into them when the record is read.
    r->pos = it->pos >> 8;
    r->version = it->ver;
    r->hash = it->hash;
    // NOTE(review): the copy length is taken from r->ksize, not ksize; if that
    // field is narrower than int, an over-long key would be truncated here
    // while `length` still uses the full size -- confirm keys are length-limited.
    safe_memcpy(r->key, p->size - p->curr - sizeof(HintRecord) + NAME_IN_RECORD, it->key, r->ksize + 1);

    p->curr += length;
}
70 
write_hint_file(char * buf,int size,const char * path)71 void write_hint_file(char *buf, int size, const char *path)
72 {
73     // compress
74     char *dst = buf;
75     if (strcmp(path + strlen(path) - 4, ".qlz") == 0)
76     {
77         char *wbuf = (char*)safe_malloc(QLZ_SCRATCH_COMPRESS);
78         dst = (char*)safe_malloc(size + 400);
79         size = qlz_compress(buf, dst, size, wbuf);
80         free(wbuf);
81     }
82 
83     char tmp[MAX_PATH_LEN];
84     safe_snprintf(tmp, MAX_PATH_LEN, "%s.tmp", path);
85     FILE *hf = fopen(tmp, "wb");
86     if (NULL == hf)
87     {
88         log_error("open %s failed", tmp);
89         return;
90     }
91     int n = fwrite(dst, 1, size, hf);
92     fclose(hf);
93     if (dst != buf) free(dst);
94 
95     if (n == size)
96     {
97         mgr_unlink(path);
98         mgr_rename(tmp, path);
99     }
100     else
101     {
102         log_error("write to %s failed", tmp);
103     }
104 }
105 
/*
 * Dump every item of `tree` into a hint file at `hintpath`.
 *
 * The items are packed into a temporary heap buffer by collect_items() and
 * then handed to write_hint_file().
 *
 * Note: `tree` is destroyed by this call (ht_destroy), so the caller must not
 * use it afterwards.
 */
void build_hint(HTree *tree, const char *hintpath)
{
    enum { INITIAL_BUF_SIZE = 1024 * 1024 };

    struct param records = { .size = INITIAL_BUF_SIZE, .curr = 0, .buf = NULL };
    records.buf = (char*)safe_malloc(records.size);

    // Pack all items, then release the tree before the file is written.
    ht_visit(tree, collect_items, &records);
    ht_destroy(tree);

    write_hint_file(records.buf, records.curr, hintpath);
    free(records.buf);
}
119 
open_hint(const char * path,const char * new_path)120 HintFile *open_hint(const char *path, const char *new_path)
121 {
122     MFile *f = open_mfile(path);
123     if (f == NULL)
124     {
125         return NULL;
126     }
127 
128     HintFile *hint = (HintFile*) safe_malloc(sizeof(HintFile));
129     hint->f = f;
130     hint->buf = f->addr;
131     hint->size = f->size;
132 
133     if (strcmp(path + strlen(path) - 4, ".qlz") == 0 && hint->size > 0)
134     {
135         char wbuf[QLZ_SCRATCH_DECOMPRESS];
136         int size = qlz_size_decompressed(hint->buf);
137         char *buf = (char*)safe_malloc(size);
138         int vsize = qlz_decompress(hint->buf, buf, wbuf);
139         if (vsize != size)
140         {
141             log_fatal("decompress %s failed: %d < %d, remove it\n", path, vsize, size);
142             mgr_unlink(path);
143             exit(1);
144         }
145         hint->size = size;
146         hint->buf = buf;
147     }
148 
149     if (new_path != NULL)
150     {
151         write_hint_file(hint->buf, hint->size, new_path);
152     }
153 
154     return hint;
155 }
156 
/*
 * Release a HintFile obtained from open_hint().
 *
 * hint->buf is either the MFile's own data (hint->f->addr), which is released
 * together with the MFile, or a separate heap buffer that is freed here.
 */
void close_hint(HintFile *hint)
{
    MFile *backing = hint->f;

    if (hint->buf != backing->addr)
    {
        // Separate heap buffer; free(NULL) is a no-op, so no NULL test is needed.
        free(hint->buf);
    }

    close_mfile(backing);
    free(hint);
}
166 
/*
 * Load the records of the hint file at `path` into `tree`.
 *
 * tree     - tree to update: records with version > 0 are added, all others
 *            are removed from it. Records whose key fails check_key() are skipped.
 * bucket   - its low 8 bits become the low 8 bits of every stored position.
 * path     - hint file to read (see open_hint()).
 * new_path - passed through to open_hint().
 *
 * Returns silently when the file cannot be opened. A truncated trailing
 * record is logged and ends the scan.
 */
void scanHintFile(HTree *tree, int bucket, const char *path, const char *new_path)
{
    HintFile *hint = open_hint(path, new_path);
    if (hint == NULL) return;

    log_notice("scan hint: %s", path);

    // Bytes of a record that precede the key (assumes the key array is the
    // trailing NAME_IN_RECORD bytes of HintRecord -- TODO confirm in hint.h).
    const size_t header_len = sizeof(HintRecord) - NAME_IN_RECORD;

    char *p = hint->buf, *end = hint->buf + hint->size;
    while (p < end)
    {
        size_t remain = (size_t)(end - p);
        // Make sure the fixed fields (plus the key's NUL) are inside the
        // buffer before reading r->ksize from them.
        if (remain < header_len + 1)
        {
            log_error("scan %s: unexpected end, need %ld byte", path, (long)(header_len + 1 - remain));
            break;
        }

        HintRecord *r = (HintRecord*) p;
        size_t rec_len = header_len + r->ksize + 1;
        if (rec_len > remain)
        {
            log_error("scan %s: unexpected end, need %ld byte", path, (long)(rec_len - remain));
            break;
        }
        p += rec_len;

        // Widen before shifting so the shift is done on an unsigned 32-bit value.
        uint32_t pos = ((uint32_t)r->pos << 8) | (bucket & 0xff);
        if (check_key(r->key, r->ksize))
        {
            if (r->version > 0)
                ht_add2(tree, r->key, r->ksize, pos, r->hash, r->version);
            else
                ht_remove2(tree, r->key, r->ksize);
        }
    }

    close_hint(hint);
}
196 
/*
 * Count the records of the hint file at `path` that `tree` no longer refers to.
 *
 * A record counts as deleted when
 *   - its key is not in the tree, or
 *   - the tree's entry for the key has a different position than this record
 *     (record position combined with the low 8 bits of `bucket`), or
 *   - the tree's entry has ver <= 0 and `skipped` is false.
 *
 * tree    - tree to look the keys up in.
 * bucket  - bucket number used to rebuild each record's position.
 * path    - hint file to read (see open_hint()).
 * total   - out: number of complete records found; set to 0 first.
 * skipped - when true, entries with ver <= 0 are not counted as deleted.
 *
 * Returns the number of deleted records, or 0 when the file cannot be opened.
 * A truncated trailing record is logged and ends the scan.
 */
int count_deleted_record(HTree *tree, int bucket, const char *path, int *total, bool skipped)
{
    *total = 0;
    HintFile *hint = open_hint(path, NULL);
    if (hint == NULL)
        return 0;

    // Bytes of a record that precede the key (assumes the key array is the
    // trailing NAME_IN_RECORD bytes of HintRecord -- TODO confirm in hint.h).
    const size_t header_len = sizeof(HintRecord) - NAME_IN_RECORD;

    char *p = hint->buf, *end = hint->buf + hint->size;
    int deleted = 0;
    while (p < end)
    {
        size_t remain = (size_t)(end - p);
        // Make sure the fixed fields (plus the key's NUL) are inside the
        // buffer before reading r->ksize from them.
        if (remain < header_len + 1)
        {
            log_error("scan %s: unexpected end, need %ld byte", path, (long)(header_len + 1 - remain));
            break;
        }

        HintRecord *r = (HintRecord*) p;
        size_t rec_len = header_len + r->ksize + 1;
        if (rec_len > remain)
        {
            log_error("scan %s: unexpected end, need %ld byte", path, (long)(rec_len - remain));
            break;
        }
        p += rec_len;

        (*total)++;
        // Same position encoding as scanHintFile(): only the low 8 bits of
        // the bucket number are part of a position.
        uint32_t pos = ((uint32_t)r->pos << 8) | ((unsigned int)bucket & 0xff);
        Item *it = ht_get2(tree, r->key, r->ksize);
        //key not exist || not used || (used && deleted && not skipped)
        if (it == NULL || it->pos != pos || (it->ver <= 0 && !skipped))
        {
            deleted++;
        }
        // The item returned by ht_get2() is released by the caller.
        free(it);
    }

    close_hint(hint);
    return deleted;
}
228