1 /*
2 * Beansdb - A high available distributed key-value storage system:
3 *
4 * http://beansdb.googlecode.com
5 *
6 * Copyright 2010 Douban Inc. All rights reserved.
7 *
8 * Use and distribution licensed under the BSD license. See
9 * the LICENSE file for full text.
10 *
11 * Authors:
12 * Davies Liu <davies.liu@gmail.com>
13 * Hurricane Lee <hurricane1026@gmail.com>
14 *
15 */
16
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <stdint.h>
20 #include <string.h>
21 #include <time.h>
22
23 #include "hint.h"
24 #include "quicklz.h"
25 #include "diskmgr.h"
26 #include "fnv1a.h"
27 #include "const.h"
28
29 #ifdef HAVE_CONFIG_H
30 # include "config.h"
31 #endif
32
33
34 #if HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37
38 #include "mfile.h"
39 #include "log.h"
40
41 // for build hint
42 struct param
43 {
44 int size;
45 int curr;
46 char *buf;
47 };
48
collect_items(Item * it,void * param)49 void collect_items(Item *it, void *param)
50 {
51 int ksize = strlen(it->key);
52 int length = sizeof(HintRecord) + ksize + 1 - NAME_IN_RECORD;
53 struct param *p = (struct param *)param;
54 if (p->size - p->curr < length)
55 {
56 p->size *= 2;
57 p->buf = (char*)safe_realloc(p->buf, p->size);
58 }
59
60 HintRecord *r = (HintRecord*)(p->buf + p->curr);
61 r->ksize = ksize;
62 r->pos = it->pos >> 8;
63 r->version = it->ver;
64 r->hash = it->hash;
65 // TODO: check ir
66 safe_memcpy(r->key, p->size - p->curr - sizeof(HintRecord) + NAME_IN_RECORD, it->key, r->ksize + 1);
67
68 p->curr += length;
69 }
70
write_hint_file(char * buf,int size,const char * path)71 void write_hint_file(char *buf, int size, const char *path)
72 {
73 // compress
74 char *dst = buf;
75 if (strcmp(path + strlen(path) - 4, ".qlz") == 0)
76 {
77 char *wbuf = (char*)safe_malloc(QLZ_SCRATCH_COMPRESS);
78 dst = (char*)safe_malloc(size + 400);
79 size = qlz_compress(buf, dst, size, wbuf);
80 free(wbuf);
81 }
82
83 char tmp[MAX_PATH_LEN];
84 safe_snprintf(tmp, MAX_PATH_LEN, "%s.tmp", path);
85 FILE *hf = fopen(tmp, "wb");
86 if (NULL == hf)
87 {
88 log_error("open %s failed", tmp);
89 return;
90 }
91 int n = fwrite(dst, 1, size, hf);
92 fclose(hf);
93 if (dst != buf) free(dst);
94
95 if (n == size)
96 {
97 mgr_unlink(path);
98 mgr_rename(tmp, path);
99 }
100 else
101 {
102 log_error("write to %s failed", tmp);
103 }
104 }
105
build_hint(HTree * tree,const char * hintpath)106 void build_hint(HTree *tree, const char *hintpath)
107 {
108 struct param p;
109 p.size = 1024 * 1024;
110 p.curr = 0;
111 p.buf = (char*)safe_malloc(p.size);
112
113 ht_visit(tree, collect_items, &p);
114 ht_destroy(tree);
115
116 write_hint_file(p.buf, p.curr, hintpath);
117 free(p.buf);
118 }
119
open_hint(const char * path,const char * new_path)120 HintFile *open_hint(const char *path, const char *new_path)
121 {
122 MFile *f = open_mfile(path);
123 if (f == NULL)
124 {
125 return NULL;
126 }
127
128 HintFile *hint = (HintFile*) safe_malloc(sizeof(HintFile));
129 hint->f = f;
130 hint->buf = f->addr;
131 hint->size = f->size;
132
133 if (strcmp(path + strlen(path) - 4, ".qlz") == 0 && hint->size > 0)
134 {
135 char wbuf[QLZ_SCRATCH_DECOMPRESS];
136 int size = qlz_size_decompressed(hint->buf);
137 char *buf = (char*)safe_malloc(size);
138 int vsize = qlz_decompress(hint->buf, buf, wbuf);
139 if (vsize != size)
140 {
141 log_fatal("decompress %s failed: %d < %d, remove it\n", path, vsize, size);
142 mgr_unlink(path);
143 exit(1);
144 }
145 hint->size = size;
146 hint->buf = buf;
147 }
148
149 if (new_path != NULL)
150 {
151 write_hint_file(hint->buf, hint->size, new_path);
152 }
153
154 return hint;
155 }
156
close_hint(HintFile * hint)157 void close_hint(HintFile *hint)
158 {
159 if (hint->buf != hint->f->addr && hint->buf != NULL)
160 {
161 free(hint->buf);
162 }
163 close_mfile(hint->f);
164 free(hint);
165 }
166
scanHintFile(HTree * tree,int bucket,const char * path,const char * new_path)167 void scanHintFile(HTree *tree, int bucket, const char *path, const char *new_path)
168 {
169 HintFile *hint = open_hint(path, new_path);
170 if (hint == NULL) return;
171
172 log_notice("scan hint: %s", path);
173
174 char *p = hint->buf, *end = hint->buf + hint->size;
175 while (p < end)
176 {
177 HintRecord *r = (HintRecord*) p;
178 p += sizeof(HintRecord) - NAME_IN_RECORD + r->ksize + 1;
179 if (p > end)
180 {
181 log_error("scan %s: unexpected end, need %ld byte", path, p - end);
182 break;
183 }
184 uint32_t pos = (r->pos << 8) | (bucket & 0xff);
185 if (check_key(r->key, r->ksize))
186 {
187 if (r->version > 0)
188 ht_add2(tree, r->key, r->ksize, pos, r->hash, r->version);
189 else
190 ht_remove2(tree, r->key, r->ksize);
191 }
192 }
193
194 close_hint(hint);
195 }
196
count_deleted_record(HTree * tree,int bucket,const char * path,int * total,bool skipped)197 int count_deleted_record(HTree *tree, int bucket, const char *path, int *total, bool skipped)
198 {
199 *total = 0;
200 HintFile *hint = open_hint(path, NULL);
201 if (hint == NULL)
202 return 0;
203
204 char *p = hint->buf, *end = hint->buf + hint->size;
205 int deleted = 0;
206 while (p < end)
207 {
208 HintRecord *r = (HintRecord*) p;
209 p += sizeof(HintRecord) - NAME_IN_RECORD + r->ksize + 1;
210 if (p > end)
211 {
212 log_error("scan %s: unexpected end, need %ld byte", path, p - end);
213 break;
214 }
215 (*total)++;
216 Item *it = ht_get2(tree, r->key, r->ksize);
217 //key not exist || not used || (used && deleted && not skipped)
218 if (it == NULL || it->pos != ((r->pos << 8) | (unsigned int)bucket) || (it->ver <= 0 && !skipped))
219 {
220 deleted++;
221 }
222 if (it) free(it);
223 }
224
225 close_hint(hint);
226 return deleted;
227 }
228