1 /*
2 * Beansdb - A high available distributed key-value storage system:
3 *
4 * http://beansdb.googlecode.com
5 *
6 * Copyright 2009 Douban Inc. All rights reserved.
7 *
8 * Use and distribution licensed under the BSD license. See
9 * the LICENSE file for full text.
10 *
11 * Authors:
12 * Davies Liu <davies.liu@gmail.com>
13 * Hurricane Lee <hurricane1026@gmail.com>
14 */
15 #include<sys/time.h>
16 #include <stdio.h>
17 #include <errno.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <assert.h>
21 #include <pthread.h>
22 #include <fcntl.h>
23 #include <ctype.h>
24 #include <inttypes.h>
25
26 #include "fnv1a.h"
27 #include "mfile.h"
28 #include "htree.h"
29 #include "codec.h"
30 #include "const.h"
31 #include "log.h"
32 #include "diskmgr.h"
33
/* Number of children under every internal node (one per hex digit). */
const int BUCKET_SIZE = 16;

/* add_item() considers splitting a leaf once it holds more live items than this. */
const int SPLIT_LIMIT = 64;

/* add_item() only enlarges the pool while tree->height + tree->depth is below this. */
const int MAX_DEPTH = 8;

/*
 * g_index[d] is the offset of the first depth-d node inside the flat node
 * pool; g_index[h] is therefore also the pool size for a tree of height h.
 */
static const long long g_index[] =
{
    0,
    1,
    17,
    273,
    4369,
    69905,
    1118481,
    17895697,
    286331153,
    4581298449L
};

/* Magic written at the start of a saved tree file (see ht_save2 / ht_open). */
const char HTREE_VERSION[] = "HTREE001";

/* Size of the per-tree scratch buffer HTree.buf. */
#define TREE_BUF_SIZE          512

#define max(a,b)               ((a)>(b)?(a):(b))

/*
 * Child slot (0..15) selected by the key hash at the current node.
 * NOTE: the argument is unused; the macro relies on variables named
 * `keyhash`, `node` and `tree` being in scope at the point of use.
 */
#define INDEX(it)              (0x0f & (keyhash >> ((7 - node->depth - tree->depth) * 4)))

/* Bytes an item occupies inside a Data block: fixed part plus encoded key. */
#define ITEM_LENGTH(it)        ((it)->ksz + sizeof(Item) - ITEM_PADDING)

/* Item hash that counts towards a node hash; 0 unless ver > 0. */
#define HASH(it)               ((it)->hash * ((it)->ver>0))

/* First Data field that is written to / read from a tree file (skips `next`). */
#define DATA_file_start(data)  (&((data)->size))

/* Byte offset of Data.head, i.e. the size of a Data block's header. */
#define DATA_HEAD_SIZE         (int)(((char*)&(((Data*)0)->head)) - (char*)(0))

/* Values for HTree.block_size: ht_new() picks _SMALL when `tmp` is set. */
#define DATA_BLOCK_SIZE        256
#define DATA_BLOCK_SIZE_SMALL  1024
49
50 typedef struct t_data Data;
51 struct t_data
52 {
53 Data *next;
54 int size;
55 int used;
56 int count;
57 Item head[0];
58 };
59
60
/*
 * One slot of the flat node pool. The whole array is written to and read
 * from the tree file as raw bytes, so field order and widths must not change.
 */
typedef struct t_node
{
    uint16_t is_node:1;   /* 1 once split_node() turned this slot into an internal node */
    uint16_t valid:1;     /* 0 while hash/count must be recomputed by update_node() */
    uint16_t depth:4;     /* level of this slot inside the pool (root is 0) */
    uint16_t flag:9;      /* not referenced by the code in this file */
    uint16_t hash;        /* aggregated hash of the items below this node */
    uint32_t count;       /* number of items with ver > 0 below this node */
    struct t_data *data;  /* item storage chain; only leaves own one */
} Node;
72
73 struct t_hash_tree
74 {
75 int depth;
76 int pos;
77 int height;
78 int block_size;
79 Node *root;
80 Codec *dc;
81 pthread_mutex_t lock;
82 char buf[TREE_BUF_SIZE];
83
84 uint32_t updating_bucket;
85 HTree *updating_tree;
86 };
87
88
89 // forward dec
90 static void add_item(HTree *tree, Node *node, Item *it, uint32_t keyhash, bool enlarge);
91 static void remove_item(HTree *tree, Node *node, Item *it, uint32_t keyhash);
92 static void split_node(HTree *tree, Node *node);
93 static void merge_node(HTree *tree, Node *node);
94 static void update_node(HTree *tree, Node *node);
95
/*
 * Check the version of an item that is about to replace a stored one.
 * Returns true when |newit->ver| >= |oldit->ver|. Otherwise a warning that
 * includes the decoded key is logged and false is returned.
 */
static inline bool check_version(Item *oldit, Item *newit, HTree *tree, uint32_t keyhash)
{
    if (abs(newit->ver) < abs(oldit->ver))
    {
        char key[KEY_BUF_LEN];
        dc_decode(tree->dc, key, KEY_BUF_LEN, oldit->key, oldit->ksz);
        log_warn("BUG: bad version, oldv=%d, newv=%d, key=%s, keyhash = 0x%x, oldpos = %u", oldit->ver, newit->ver, key, keyhash, oldit->pos);
        return false;
    }
    return true;
}
108
/* Position of `node` among the nodes of its own depth (0-based). */
static inline uint32_t get_pos(HTree *tree, Node *node)
{
    const long long first_at_depth = g_index[(int)node->depth];
    return (node - tree->root) - first_at_depth;
}
113
/*
 * Return child number `b` of `node`. The 16 children of a node are stored
 * contiguously on the next level of the pool.
 */
static inline Node *get_child(HTree *tree, Node *node, int b)
{
    const uint32_t slot = get_pos(tree, node);
    int child_index = g_index[node->depth + 1] + (slot << 4) + b;
    return &tree->root[child_index];
}
119
/* Reset `data` to an empty block of capacity `size` that is not linked to anything. */
static inline void init_data(Data *data, int size)
{
    data->size = size;
    data->used = DATA_HEAD_SIZE;  /* only the header is in use */
    data->count = 0;
    data->next = NULL;
}
127
get_data(Node * node)128 static inline Data *get_data(Node *node)
129 {
130 return node->data;
131 }
132
/* Accessor: make `data` the first Data block of `node`. The old chain is not freed. */
static inline void set_data(Node *node, Data *data)
{
    (*node).data = data;
}
137
/* Free every block in the node's Data chain and leave node->data NULL. */
static inline void free_data(Node *node)
{
    Data *cur = node->data;
    while (cur != NULL)
    {
        Data *following = cur->next;  /* read before the block goes away */
        free(cur);
        cur = following;
    }
    node->data = NULL;
}
149
150
151
/* Decode the stored key of `it` and return the fnv1a hash of the decoded bytes. */
static inline uint32_t key_hash(HTree *tree, Item *it)
{
    char decoded[KEY_BUF_LEN];
    const int decoded_len = dc_decode(tree->dc, decoded, KEY_BUF_LEN, it->key, it->ksz);
    return fnv1a(decoded, decoded_len);
}
158
create_item(HTree * tree,const char * key,int len,uint32_t pos,uint16_t hash,int32_t ver)159 static Item *create_item(HTree *tree, const char *key, int len, uint32_t pos, uint16_t hash, int32_t ver)
160 {
161 Item *it = (Item*)tree->buf;
162 it->pos = pos;
163 it->ver = ver;
164 it->hash = hash;
165 it->ksz = dc_encode(tree->dc, it->key, TREE_BUF_SIZE - (sizeof(Item) - ITEM_PADDING), key, len);
166 return it;
167 }
168
/*
 * Grow the node pool by one level. The new nodes are zeroed and tagged with
 * their depth. tree->root may move, so callers must re-derive node pointers.
 */
static void enlarge_pool(HTree *tree)
{
    const int prev_nodes = g_index[tree->height];
    const int grown_nodes = g_index[tree->height + 1];

    log_notice("enlarge pool %d -> %d, new_height = %d", prev_nodes, grown_nodes, tree->height + 1);

    tree->root = (Node*)safe_realloc(tree->root, sizeof(Node) * grown_nodes);

    Node *fresh = tree->root + prev_nodes;
    Node *const pool_end = tree->root + grown_nodes;
    memset(fresh, 0, sizeof(Node) * (grown_nodes - prev_nodes));
    while (fresh < pool_end)
    {
        fresh->depth = tree->height;  /* the new level's depth equals the old height */
        ++fresh;
    }

    tree->height++;
}
184
/*
 * Turn `node` into an empty, valid leaf with a fresh 64-byte Data block.
 * Any Data chain the node already had is NOT freed here.
 */
static void clear(Node *node)
{
    const int initial_size = 64;
    Data *block = (Data*)safe_malloc(initial_size);
    init_data(block, initial_size);

    node->hash = 0;
    node->count = 0;
    node->valid = 1;
    node->is_node = 0;

    set_data(node, block);
}
196
/*
 * Insert `it` below `node`, or overwrite the stored item with the same key.
 *
 * tree    - owning tree
 * node    - node to start the descent from
 * it      - item to store (copied; the caller keeps ownership)
 * keyhash - hash of the decoded key; selects the child at every level
 * enlarge - when true the node pool may be grown to split a bottom-level leaf
 *
 * Every internal node passed on the way down is marked invalid so that
 * update_node() recomputes it later.
 */
static void add_item(HTree *tree, Node *node, Item *it, uint32_t keyhash, bool enlarge)
{
    const int need = ITEM_LENGTH(it);

    while (node->is_node)
    {
        node->valid = 0;
        node = get_child(tree, node, INDEX(it));
    }

    Data *const head = get_data(node);
    Data *blk;

    /* Pass 1: a stored item with the same encoded key is overwritten in place. */
    for (blk = head; blk != NULL; blk = blk->next)
    {
        Item *cur = blk->head;
        int k = 0;
        while (k < blk->count)
        {
            if (cur->ksz == it->ksz &&
                    memcmp(it->key, cur->key, it->ksz) == 0)
            {
                check_version(cur, it, tree, keyhash);
                node->hash += (HASH(it) - HASH(cur)) * keyhash;
                node->count += (it->ver > 0);
                node->count -= (cur->ver > 0);
                memcpy(cur, it, sizeof(Item)); // safe
                return;
            }
            cur = (Item*)((char*)cur + ITEM_LENGTH(cur));
            ++k;
        }
    }

    /* Pass 2: find the first block with enough free room for the item. */
    Data *prev = NULL;
    Data *prev_prev = NULL;
    blk = head;
    while (blk != NULL && blk->size < blk->used + need)
    {
        prev_prev = prev;
        prev = blk;
        blk = blk->next;
    }

    if (blk == NULL)
    {
        if (prev->used + need <= tree->block_size)
        {
            /* the last block can still grow within block_size: realloc it */
            int grown = max(prev->used + need, prev->size);
            grown = min(grown, tree->block_size);
            blk = (Data*)safe_realloc(prev, grown);
            blk->size = grown;

            /* the block may have moved; fix whoever pointed at it */
            if (prev_prev != NULL)
                prev_prev->next = blk;
            else
                set_data(node, blk);
        }
        else
        {
            /* otherwise chain a new block sized exactly for this item */
            int fresh_size = DATA_HEAD_SIZE + need;
            blk = (Data*)safe_malloc(fresh_size);
            init_data(blk, fresh_size);
            prev->next = blk;
        }
    }

    Item *slot = (Item*)(((char*)blk) + blk->used);
    safe_memcpy(slot, blk->size - blk->used, it, need);
    blk->count++;
    blk->used += need;
    node->count += (it->ver > 0);
    node->hash += keyhash * HASH(it);

    if (node->count <= SPLIT_LIMIT)
        return;

    if (node->depth != tree->height - 1)
    {
        /* a child level already exists in the pool */
        split_node(tree, node);
        return;
    }

    /* bottom level: splitting needs a bigger pool, only done when allowed */
    if (enlarge && (tree->height + tree->depth < MAX_DEPTH) && (node->count > SPLIT_LIMIT * 4))
    {
        int pos = node - tree->root;
        enlarge_pool(tree);
        node = tree->root + pos; // reload
        split_node(tree, node);
    }
}
280
/*
 * Turn the leaf `node` into an internal node: initialise its 16 children as
 * empty leaves, move every stored item into the child selected by its key
 * hash, then release the node's own Data chain.
 *
 * The child level must already exist in the node pool.
 */
static void split_node(HTree *tree, Node *node)
{
    Node *child = get_child(tree, node, 0);
    int i;
    for (i = 0; i < BUCKET_SIZE; ++i)
        clear(child + i);

    Data *data0 = get_data(node);
    Data *data;
    for (data = data0; data != NULL; data = data->next)
    {
        Item *it = data->head;
        for (i = 0; i < data->count; ++i)
        {
            /*
             * Keep the hash unsigned: INDEX() right-shifts it, and shifting a
             * negative signed value is implementation-defined.
             */
            uint32_t keyhash = key_hash(tree, it);
            add_item(tree, child + INDEX(it), it, keyhash, false);
            it = (Item*)((char*)it + ITEM_LENGTH(it));
        }
    }

    free_data(node);

    node->is_node = 1;
    node->valid = 0;  /* hash/count are recomputed by update_node() */
}
306
/*
 * Remove the stored item whose encoded key equals that of `it`, if any.
 * Internal nodes on the path are marked invalid. A block that becomes empty
 * is unlinked and freed only when it is neither the first nor the last block
 * of the chain.
 */
static void remove_item(HTree *tree, Node *node, Item *it, uint32_t keyhash)
{
    while (node->is_node)
    {
        node->valid = 0;
        node = get_child(tree, node, INDEX(it));
    }

    Data *const head = get_data(node);
    Data *prev = NULL;
    Data *blk = head;
    while (blk != NULL)
    {
        Item *cur = blk->head;
        int k;
        for (k = 0; k < blk->count; ++k)
        {
            const int cur_len = ITEM_LENGTH(cur);
            if (it->ksz != cur->ksz ||
                    memcmp(it->key, cur->key, it->ksz) != 0)
            {
                cur = (Item*)((char*)cur + cur_len);
                continue;
            }

            /* found: fix the bookkeeping first, while `cur` is still intact */
            blk->count--;
            blk->used -= cur_len;
            node->count -= cur->ver > 0;
            node->hash -= keyhash * HASH(cur);

            if (blk->count > 0)
            {
                /* close the gap by sliding the rest of the block down */
                memmove(cur, (char*)cur + cur_len, blk->size - ((char*)cur - (char*)blk) - cur_len);
            }
            else if (blk != head && blk->next != NULL) //neither first nor last
            {
                prev->next = blk->next;
                free(blk);
            }
            return;
        }
        prev = blk;
        blk = blk->next;
    }
}
347
/*
 * Collapse the 16 children of `node` back into `node` itself: the node is
 * reset to an empty leaf, every child item with ver > 0 is re-added to it,
 * and the children's Data chains are freed. Items with ver <= 0 are dropped.
 */
static void merge_node(HTree *tree, Node *node)
{
    clear(node);

    Node *const first_child = get_child(tree, node, 0);
    int c;
    for (c = 0; c < BUCKET_SIZE; ++c)
    {
        Node *kid = first_child + c;
        Data *blk;
        for (blk = get_data(kid); blk != NULL; blk = blk->next)
        {
            Item *cur = blk->head;
            int k = 0;
            while (k < blk->count)
            {
                if (cur->ver > 0)
                {
                    add_item(tree, node, cur, key_hash(tree, cur), false);
                }
                cur = (Item*)((char*)cur + ITEM_LENGTH(cur));
                ++k;
            }
        }
        free_data(kid);
    }
}
372
/*
 * Bring node->hash and node->count up to date if the node is marked invalid.
 * For an internal node the children are updated first, then their counts are
 * summed and their hashes combined. Afterwards a node holding at most
 * SPLIT_LIMIT live items is merged back into a leaf.
 */
static void update_node(HTree *tree, Node *node)
{
    if (node->valid)
        return;

    node->hash = 0;
    if (node->is_node)
    {
        Node *const kids = get_child(tree, node, 0);
        int i;

        node->count = 0;
        for (i = 0; i < BUCKET_SIZE; i++)
        {
            Node *kid = kids + i;
            update_node(tree, kid);
            node->count += kid->count;
        }

        /* large nodes fold child hashes positionally, small ones just sum them */
        const bool positional = node->count > SPLIT_LIMIT * 4;
        for (i = 0; i < BUCKET_SIZE; i++)
        {
            if (positional)
            {
                node->hash *= 97;
            }
            node->hash += kids[i].hash;
        }
    }
    node->valid = 1;

    // merge nodes
    if (node->count <= SPLIT_LIMIT)
    {
        merge_node(tree, node);
    }
}
405
/*
 * Find the stored item whose encoded key matches that of `it`.
 * Returns a pointer into the leaf's Data block, or NULL when not found.
 */
static Item *get_item_hash(HTree *tree, Node *node, Item *it, uint32_t keyhash)
{
    while (node->is_node)
        node = get_child(tree, node, INDEX(it));

    Data *blk;
    for (blk = get_data(node); blk != NULL; blk = blk->next)
    {
        Item *cur = blk->head;
        int k;
        for (k = 0; k < blk->count; k++)
        {
            const int cur_len = ITEM_LENGTH(cur);
            if (ITEM_LENGTH(it) == cur_len &&
                    memcmp(it->key, cur->key, it->ksz) == 0)
            {
                return cur;
            }
            cur = (Item*)((char*)cur + cur_len);
        }
    }
    return NULL;
}
432
/*
 * Value of a single lowercase hexadecimal digit.
 * Returns 0..15 for '0'-'9' and 'a'-'f', and -1 for anything else
 * (uppercase digits included).
 */
static inline int hex2int(char b)
{
    if (b >= '0' && b <= '9')
        return b - '0';
    if (b >= 'a' && b <= 'f')
        return b - 'a' + 10;
    return -1;
}
444
/*
 * Return the hash of the node addressed by the hex path `dir`, starting at
 * `node`. Each character of `dir` selects one child while the current node is
 * internal; the walk stops at the first leaf or when `dir` is exhausted.
 *
 * count - optional out-parameter receiving the node's item count
 *
 * An invalid hex digit yields hash 0 and count 0.
 */
static uint16_t get_node_hash(HTree *tree, Node *node, const char *dir,
                              unsigned int *count)
{
    if (node->is_node && strlen(dir) > 0)
    {
        /*
         * Must be int, not char: where plain char is unsigned, -1 would turn
         * into 255 and pass the check below, indexing a child out of range.
         */
        int b = hex2int(dir[0]);
        if (b >= 0)
        {
            return get_node_hash(tree, get_child(tree, node, b), dir + 1, count);
        }
        else
        {
            if (count) *count = 0;
            return 0;
        }
    }
    update_node(tree, node);
    if (count) *count = node->count;
    return node->hash;
}
465
/*
 * Produce a text listing for the node addressed by the hex path `dir`.
 *
 * - Internal node with more than 100000 items, or with no prefix and more
 *   than SPLIT_LIMIT * 4 items: one "<hexdigit>/ <hash> <count>" line per child.
 * - Any other internal node: the concatenated listings of its children.
 * - Leaf: one "<key> <hash> <ver>" line per item. Items are filtered by the
 *   unconsumed part of `dir` (compared against the hex form of the key hash)
 *   and by `prefix` when given.
 *
 * Returns a malloc'ed NUL-terminated string the caller must free, or NULL
 * when `dir` contains a character that is not a lowercase hex digit.
 */
static char *list_dir(HTree *tree, Node *node, const char *dir, const char *prefix)
{
    int dlen = strlen(dir);
    while (node->is_node && dlen > 0)
    {
        int b = hex2int(dir[0]);
        if (b >= 0 && b < BUCKET_SIZE)
        {
            node = get_child(tree, node, b);
            ++dir;
            --dlen;
        }
        else
        {
            return NULL;
        }
    }

    int bsize = 4096;
    char *buf = (char*)safe_malloc(bsize);
    memset(buf, 0, bsize);
    int n = 0, i;

    /*
     * update_node() may merge a small internal node back into a leaf and free
     * its children's data. Refresh first and decide afterwards how to list
     * the node, otherwise the merged items would be missing from the result.
     */
    if (node->is_node)
    {
        update_node(tree, node);
    }

    if (node->is_node)
    {
        Node *child = get_child(tree, node, 0);
        if (node->count > 100000 || (prefix == NULL && node->count > SPLIT_LIMIT * 4))
        {
            /* too many items: only summarise the children */
            for (i = 0; i < BUCKET_SIZE; i++)
            {
                Node *t = child + i;
                n += safe_snprintf(buf + n, bsize - n, "%x/ %u %u\n",
                                   i, t->hash, t->count);
            }
        }
        else
        {
            for (i = 0; i < BUCKET_SIZE; i++)
            {
                /* an empty dir never takes the NULL return path above */
                char *r = list_dir(tree, child + i, "", prefix);
                int rl = strlen(r) + 1;
                if (bsize - n < rl)
                {
                    bsize += rl;
                    buf = (char*)safe_realloc(buf, bsize);
                }
                n += safe_snprintf(buf + n, bsize - n, "%s", r);
                free(r);
            }
        }
    }
    else
    {
        /* zeroed so the memcmp below never reads indeterminate bytes */
        char pbuf[20] = {0};
        char key[KEY_BUF_LEN];
        int prefix_len = 0;
        if (prefix != NULL)
            prefix_len = strlen(prefix);

        Data *data0 = get_data(node);
        Data *data;
        for (data = data0; data != NULL; data = data->next)
        {
            Item *it = data->head;
            for (i = 0; i < data->count; i++, it = (Item*)((char*)it + ITEM_LENGTH(it)))
            {
                if (dlen > 0)
                {
                    /* match the rest of dir against the hash digits below this node */
                    safe_snprintf(pbuf, 20, "%08x", key_hash(tree, it));
                    if (memcmp(pbuf + tree->depth + node->depth, dir, dlen) != 0)
                    {
                        continue;
                    }
                }
                int l = dc_decode(tree->dc, key, KEY_BUF_LEN, it->key, it->ksz);
                if (prefix == NULL || (l >= prefix_len && strncmp(key, prefix, prefix_len) == 0))
                {
                    if (bsize - n < KEY_BUF_LEN + 32)
                    {
                        bsize *= 2;
                        buf = (char*)safe_realloc(buf, bsize);
                    }

                    n += safe_snprintf(buf + n, bsize - n, "%s %u %d\n", key, it->hash, it->ver);
                }
            }
        }
    }
    return buf;
}
556
/*
 * Call `visitor(item, param)` for every item stored below `node`.
 * The item handed to the visitor lives in the tree's scratch buffer: its
 * fixed fields are copied from the stored item, its key is the decoded key
 * and ksz is the decoded key's string length.
 */
static void visit_node(HTree *tree, Node *node, fun_visitor visitor, void *param)
{
    int i;

    if (!node->is_node)
    {
        Item *scratch = (Item*)tree->buf;
        int room = TREE_BUF_SIZE - (sizeof(Item) - ITEM_PADDING);
        Data *blk;
        for (blk = get_data(node); blk != NULL; blk = blk->next)
        {
            Item *cur = blk->head;
            for (i = 0; i < blk->count; i++)
            {
                safe_memcpy(scratch, room, cur, sizeof(Item));
                dc_decode(tree->dc, scratch->key, room, cur->key, cur->ksz);
                scratch->ksz = strlen(scratch->key);
                visitor(scratch, param);
                cur = (Item*)((char*)cur + ITEM_LENGTH(cur));
            }
        }
        return;
    }

    Node *const kids = get_child(tree, node, 0);
    for (i = 0; i < BUCKET_SIZE; i++)
    {
        visit_node(tree, kids + i, visitor, param);
    }
}
588
589 /*
590 * API
591 */
592
ht_new(int depth,int pos,bool tmp)593 HTree *ht_new(int depth, int pos, bool tmp)
594 {
595 HTree *tree = (HTree*)safe_malloc(sizeof(HTree));
596 memset(tree, 0, sizeof(HTree));
597 tree->depth = depth;
598 tree->pos = pos;
599 tree->height = 1;
600 if (tmp)
601 tree->block_size = DATA_BLOCK_SIZE_SMALL;
602 else
603 tree->block_size = DATA_BLOCK_SIZE;
604 tree->updating_bucket = -1;
605 tree->updating_tree = NULL;
606
607 int pool_size = g_index[tree->height];
608 Node *root = (Node*)safe_malloc(sizeof(Node) * pool_size);
609
610 memset(root, 0, sizeof(Node) * pool_size);
611
612 // init depth
613 int i,j;
614 for (i = 0; i < tree->height; i++)
615 {
616 for (j = g_index[i]; j < g_index[i + 1]; j++)
617 {
618 root[j].depth = i;
619 }
620 }
621
622 tree->root = root;
623 clear(tree->root);
624
625 tree->dc = dc_new();
626 pthread_mutex_init(&tree->lock, NULL);
627
628 return tree;
629 }
630
ht_open(int depth,int pos,const char * path)631 HTree *ht_open(int depth, int pos, const char *path)
632 {
633 char version[sizeof(HTREE_VERSION) + 1] = {0};
634 HTree *tree = NULL;
635 Node *root = NULL;
636 int pool_used = 0;
637 char *buf = NULL;
638
639 FILE *f = fopen(path, "rb");
640 if (f == NULL)
641 {
642 log_error("open %s failed", path);
643 return NULL;
644 }
645
646 if (fread(version, sizeof(HTREE_VERSION), 1, f) != 1
647 || memcmp(version, HTREE_VERSION, sizeof(HTREE_VERSION)) != 0)
648 {
649 log_error("the version %s is not expected", version);
650 fclose(f);
651 return NULL;
652 }
653
654 off_t fsize = 0;
655 if (fread(&fsize, sizeof(fsize), 1, f) != 1 ||
656 fseeko(f, 0, 2) != 0 || ftello(f) != fsize)
657 {
658 log_error("the size %lld is not expected", (long long int)fsize);
659 fclose(f);
660 return NULL;
661 }
662 fseeko(f, sizeof(HTREE_VERSION) + sizeof(off_t), 0);
663 #if _XOPEN_SOURCE >= 600 || _POSIX_C_SOURCE >= 200112L
664 if (posix_fadvise(fileno(f), 0, fsize, POSIX_FADV_SEQUENTIAL) != 0)
665 {
666 log_error("posix_favise() failed");
667 }
668 #endif
669
670 tree = (HTree*)safe_malloc(sizeof(HTree));
671
672 memset(tree, 0, sizeof(HTree));
673 tree->depth = depth;
674 tree->pos = pos;
675 tree->updating_bucket = -1;
676 tree->block_size = DATA_BLOCK_SIZE;
677
678 if (fread(&tree->height, sizeof(int), 1, f) != 1 ||
679 tree->height + depth < 0 || tree->height + depth > 9)
680 {
681 log_error("invalid height: %d", tree->height);
682 goto FAIL;
683 }
684
685 int pool_size = g_index[tree->height];
686 int psize = sizeof(Node) * pool_size;
687 root = (Node*)safe_malloc(psize);
688
689 if (fread(root, psize, 1, f) != 1)
690 {
691 goto FAIL;
692 }
693 tree->root = root;
694
695 // load Data
696 int i,size = 0;
697 Data *data;
698 for (i = 0; i < pool_size; i++)
699 {
700 if(fread(&size, sizeof(int), 1, f) != 1)
701 {
702 goto FAIL;
703 }
704 if (size > 0)
705 {
706 data = (Data*)safe_malloc(size + sizeof(Data*));
707 if (fread(DATA_file_start(data), size, 1, f) != 1)
708 {
709 log_error("load data: size %d fail", size);
710 goto FAIL;
711 }
712 if (data->used != size)
713 {
714 log_error("broken data: %d != %d", data->used, size);
715 goto FAIL;
716 }
717 data->used = data->size = size + sizeof(Data*);
718 data->next = NULL;
719 }
720 else if (size == 0)
721 {
722 data = NULL;
723 }
724 else
725 {
726 log_error("unexpected size: %d", size);
727 goto FAIL;
728 }
729 tree->root[i].data = data;
730 pool_used++;
731 }
732
733
734 // load Codec
735 if (fread(&size, sizeof(int), 1, f) != 1
736 || size < 0 || size > (10<<20))
737 {
738 log_error("bad codec size: %d", size);
739 goto FAIL;
740 }
741 buf = (char*)safe_malloc(size);
742 if (fread(buf, size, 1, f) != 1)
743 {
744 log_error("read codec failed");
745 goto FAIL;
746 }
747 tree->dc = dc_new();
748 if (dc_load(tree->dc, buf, size) != 0)
749 {
750 log_error("load codec failed");
751 goto FAIL;
752 }
753 free(buf);
754 fclose(f);
755
756 pthread_mutex_init(&tree->lock, NULL);
757
758 return tree;
759
760 FAIL:
761 if (tree->dc)
762 dc_destroy(tree->dc);
763 if (buf)
764 free(buf);
765 if (root)
766 {
767 for (i = 0; i < pool_used; i++)
768 {
769 if (root[i].data) free(root[i].data);
770 }
771 free(root);
772 }
773 free(tree);
774 fclose(f);
775 return NULL;
776 }
777
/*
 * Serialise `tree` to the already opened stream `f`.
 *
 * Writes the version magic, a placeholder for the file size, the height, the
 * raw node pool, one record per node (all Data blocks of the node folded into
 * a single block, or a zero int when the node has no data) and the codec
 * dump. The size placeholder is patched at the end.
 *
 * Returns 0 on success, -1 on any write or seek error.
 */
static int ht_save2(HTree *tree, FILE *f)
{
    size_t last_advise = 0;
    int fd = fileno(f);

    off_t pos = 0;
    if (fwrite(HTREE_VERSION, sizeof(HTREE_VERSION), 1, f) != 1 ||
            fwrite(&pos, sizeof(off_t), 1, f) != 1)
    {
        log_error("write version failed");
        return -1;
    }

    int pool_size = g_index[tree->height];
    if (fwrite(&tree->height, sizeof(int), 1, f) != 1 ||
            fwrite(tree->root, sizeof(Node) * pool_size, 1, f) != 1 )
    {
        log_error("write nodes failed");
        return -1;
    }

    int i, zero = 0;
    for (i = 0; i < pool_size; i++)
    {
        Data *data0= tree->root[i].data;
        if (data0)
        {
            /* header describing the whole chain as one merged block */
            Data new_data;
            init_data(&new_data, 0);
            Data *data;
            for (data = data0; data != NULL; data = data->next)
            {
                new_data.count += data->count;
                new_data.used += data->used - DATA_HEAD_SIZE;
            }
            /* the `next` pointer is not stored in the file */
            new_data.used -= sizeof(Data*);

            if (fwrite(&(new_data.used), sizeof(int), 1, f) != 1 || fwrite(DATA_file_start(&new_data), DATA_HEAD_SIZE - sizeof(Data*), 1, f) != 1)
            {
                log_error("write data head failed");
                return -1;
            }

            for (data = data0; data != NULL; data = data->next)
            {
                /* an empty block has no payload; a zero-size fwrite would report 0 */
                if (data->count == 0)
                    continue;
                if ( fwrite(((char*)data) + DATA_HEAD_SIZE, data->used - DATA_HEAD_SIZE, 1, f) != 1)
                {
                    log_error("write data failed");
                    return -1;
                }
            }
            file_dontneed(fd, ftello(f), &last_advise);
        }
        else
        {
            if (fwrite(&zero, sizeof(int), 1, f) != 1)
            {
                log_error("write zero failed");
                return -1;
            }
        }
    }

    /* codec: int size followed by the dump */
    int s = dc_size(tree->dc);
    char *buf = (char*)safe_malloc(s + sizeof(int));
    *(int*)buf = s;
    if (dc_dump(tree->dc, buf + sizeof(int), s) != s)
    {
        log_error("dump Codec failed");
        free(buf);
        return -1;
    }
    if (fwrite(buf, s + sizeof(int), 1, f) != 1)
    {
        log_error("write Codec failed");
        free(buf);
        return -1;
    }
    free(buf);

    /*
     * Patch the size field. If the position cannot be determined or the seek
     * fails, the size would land at the wrong offset, so report the failure.
     */
    pos = ftello(f);
    if (pos < 0 || fseeko(f, sizeof(HTREE_VERSION), SEEK_SET) != 0)
    {
        log_error("seek to size field failed");
        return -1;
    }
    if (fwrite(&pos, sizeof(off_t), 1, f) != 1)
    {
        log_error("write size failed");
        return -1;
    }

    return 0;
}
870
/*
 * Save `tree` to `path`. The existing file at `path` is removed first, the
 * tree is written to "<path>.tmp" under the tree lock, and the temp file is
 * renamed to `path` only when writing AND closing succeeded; otherwise the
 * temp file is removed.
 *
 * Returns 0 on success, -1 on failure or when `tree`/`path` is NULL.
 */
int ht_save(HTree *tree, const char *path)
{
    if (!tree || !path) return -1;

    mgr_unlink(path);

    char tmp[MAX_PATH_LEN];
    safe_snprintf(tmp, MAX_PATH_LEN, "%s.tmp", path);

    FILE *f = fopen(tmp, "wb");
    if (f == NULL)
    {
        log_error("open %s failed", tmp);
        return -1;
    }

    /* A large stdio buffer is only an optimisation: without it keep the default. */
    int buf_size = 1024*1024;
    char *buff = (char*)malloc(buf_size);
    if (buff != NULL)
    {
        memset( buff, '\0', buf_size);
        setvbuf(f, buff, _IOFBF, buf_size);
    }

    uint64_t file_size = 0;
    struct timeval save_start, save_end;
    gettimeofday(&save_start, NULL);

    pthread_mutex_lock(&tree->lock);
    int ret = ht_save2(tree, f);
    pthread_mutex_unlock(&tree->lock);
    if (ret == 0)
    {
        fseeko(f, 0, SEEK_END);
        file_size = ftello(f);
    }

    /* fclose flushes the buffer; if that fails the file on disk is incomplete */
    if (fclose(f) != 0)
    {
        log_error("close %s failed", tmp);
        ret = -1;
    }
    free(buff);  /* only after fclose: the stream used it until then */

    if (ret == 0)
    {
        gettimeofday(&save_end, NULL);
        float save_secs = (save_end.tv_sec - save_start.tv_sec) + (save_end.tv_usec - save_start.tv_usec) / 1e6;
        log_notice("save HTree to %s, size = %"PRIu64", in %f secs", path, file_size, save_secs);
        mgr_rename(tmp, path);
    }
    else
        mgr_unlink(tmp);
    return ret;
}
918
/*
 * Release a tree created by ht_new() or ht_open(): the codec, every node's
 * Data chain, the node pool and the tree itself. NULL is accepted and ignored.
 *
 * The lock is taken while the contents are torn down, then released and
 * destroyed before the memory holding it is freed, since freeing or
 * destroying a locked mutex is undefined.
 */
void ht_destroy(HTree *tree)
{
    if (!tree) return;

    pthread_mutex_lock(&tree->lock);

    dc_destroy(tree->dc);

    int i;
    int pool_size = g_index[tree->height];
    for (i = 0; i < pool_size; i++)
    {
        free_data(tree->root + i);  /* no-op for nodes without data */
    }
    free(tree->root);
    tree->root = NULL;

    pthread_mutex_unlock(&tree->lock);
    pthread_mutex_destroy(&tree->lock);
    free(tree);
}
937
/* fnv1a hash of the `len` bytes at `s`. */
static inline uint32_t keyhash(const char *s, int len)
{
    const uint32_t h = fnv1a(s, len);
    return h;
}
942
check_key(const char * key,int len)943 bool check_key(const char *key, int len)
944 {
945 if (!key) return false;
946 if (len == 0 || len > MAX_KEY_LEN)
947 {
948 log_error("bad key len=%d", len);
949 return false;
950 }
951 if (key[0] <= ' ')
952 {
953 log_error("bad key len=%d %x", len, key[0]);
954 return false;
955 }
956 int k;
957 for (k = 0; k < len; k++)
958 {
959 if (isspace(key[k]) || iscntrl(key[k]))
960 {
961 log_error("bad key len=%d %s", len, key);
962 return false;
963 }
964 }
965 return true;
966 }
967
/*
 * Check that `key` belongs to this tree: when tree->depth > 0 the leading
 * tree->depth hex digits of the key hash must equal tree->pos.
 * Logs an error and returns false otherwise.
 */
bool check_bucket(HTree *tree, const char* key, int len)
{
    const uint32_t h = keyhash(key, len);

    if (tree->depth <= 0)
        return true;

    if (h >> ((8-tree->depth) * 4) == (unsigned int)(tree->pos))
        return true;

    log_error("key %s (#%x) should not in this tree (%d:%0x)", key, h >> ((8-tree->depth) * 4), tree->depth, tree->pos);
    return false;
}
979
/*
 * Add or overwrite the item for `key` (length `len`) without taking the tree
 * lock. Keys that do not belong to this tree are ignored.
 */
void ht_add2(HTree *tree, const char *key, int len, uint32_t pos, uint16_t hash, int32_t ver)
{
    if (check_bucket(tree, key, len))
    {
        Item *fresh = create_item(tree, key, len, pos, hash, ver);
        add_item(tree, tree->root, fresh, keyhash(key, len), true);
    }
}
986
/* Locking wrapper around ht_add2() for a NUL-terminated key. */
void ht_add(HTree *tree, const char *key, uint32_t pos, uint16_t hash, int32_t ver)
{
    pthread_mutex_t *guard = &tree->lock;

    pthread_mutex_lock(guard);
    ht_add2(tree, key, strlen(key), pos, hash, ver);
    pthread_mutex_unlock(guard);
}
993
/*
 * Remove the item for `key` (length `len`) without taking the tree lock.
 * Keys that do not belong to this tree are ignored.
 */
void ht_remove2(HTree *tree, const char *key, int len)
{
    if (check_bucket(tree, key, len))
    {
        Item *probe = create_item(tree, key, len, 0, 0, 0);
        remove_item(tree, tree->root, probe, keyhash(key, len));
    }
}
1000
/* Locking wrapper around ht_remove2() for a NUL-terminated key. */
void ht_remove(HTree *tree, const char *key)
{
    pthread_mutex_t *guard = &tree->lock;

    pthread_mutex_lock(guard);
    ht_remove2(tree, key, strlen(key));
    pthread_mutex_unlock(guard);
}
1007
ht_get2(HTree * tree,const char * key,int len)1008 Item *ht_get2(HTree *tree, const char *key, int len)
1009 {
1010 if (!check_bucket(tree, key, len)) return NULL;
1011
1012 pthread_mutex_lock(&tree->lock);
1013 Item *it = create_item(tree, key, len, 0, 0, 0);
1014 Item *r = get_item_hash(tree, tree->root, it, keyhash(key, len));
1015 if (r != NULL)
1016 {
1017 Item *rr = (Item*)safe_malloc(sizeof(Item) + len);
1018 memcpy(rr, r, sizeof(Item)); // safe
1019 memcpy(rr->key, key, len); // safe
1020 rr->key[len] = 0; // c-str
1021 r = rr; // r is in node->Data block
1022 }
1023 pthread_mutex_unlock(&tree->lock);
1024 return r;
1025 }
1026
ht_get(HTree * tree,const char * key)1027 Item *ht_get(HTree *tree, const char *key)
1028 {
1029 return ht_get2(tree, key, strlen(key));
1030 }
1031
/*
 * Return the hash of the node addressed by `key`, which must have the form
 * "@<hex path>". `count`, when not NULL, receives the node's item count.
 * A NULL tree/key or a key not starting with '@' yields hash 0 and count 0.
 */
uint32_t ht_get_hash(HTree *tree, const char *key, unsigned int *count)
{
    if (tree && key && key[0] == '@')
    {
        uint32_t hash;

        pthread_mutex_lock(&tree->lock);
        update_node(tree, tree->root);
        hash = get_node_hash(tree, tree->root, key+1, count);
        pthread_mutex_unlock(&tree->lock);
        return hash;
    }

    if(count) *count = 0;
    return 0;
}
1047
/*
 * Locking wrapper around list_dir(). `dir` is a hex path of at most 8
 * characters; an empty `prefix` is treated like NULL. Returns the listing
 * produced by list_dir(), or NULL for a NULL tree/dir or an over-long dir.
 */
char *ht_list(HTree *tree, const char *dir, const char *prefix)
{
    if (!tree || !dir || strlen(dir) > 8) return NULL;

    if (prefix != NULL && prefix[0] == '\0')
    {
        prefix = NULL;
    }

    pthread_mutex_lock(&tree->lock);
    char *listing = list_dir(tree, tree->root, dir, prefix);
    pthread_mutex_unlock(&tree->lock);

    return listing;
}
1059
/* Call `visitor` for every item in the tree while holding the tree lock. */
void ht_visit(HTree *tree, fun_visitor visitor, void *param)
{
    pthread_mutex_t *guard = &tree->lock;

    pthread_mutex_lock(guard);
    visit_node(tree, tree->root, visitor, param);
    pthread_mutex_unlock(guard);
}
1066
/* Same as ht_visit() but does not take the tree lock. */
void ht_visit2(HTree *tree, fun_visitor visitor, void *param)
{
    Node *start = tree->root;
    visit_node(tree, start, visitor, param);
}
1071
/*
 * Record, under the tree lock, which bucket is being updated and which tree
 * should be consulted for its items (see ht_get_maybe_tmp).
 */
void ht_set_updating_bucket(HTree *tree, int bucket, HTree *updating_tree)
{
    log_notice("updating bucket %d for htree 0x%x", bucket, tree->pos);

    pthread_mutex_lock(&tree->lock);
    tree->updating_tree = updating_tree;
    tree->updating_bucket = bucket;
    pthread_mutex_unlock(&tree->lock);
}
1080
ht_get_withbuf(HTree * tree,const char * key,int len,char * buf,bool lock)1081 Item *ht_get_withbuf(HTree *tree, const char *key, int len, char *buf, bool lock)
1082 {
1083 if (!check_bucket(tree, key, len)) return NULL;
1084
1085 Item *it = (Item*)buf;
1086 it->ksz = dc_encode(tree->dc, it->key, TREE_BUF_SIZE - (sizeof(Item) - ITEM_PADDING), key, len);
1087
1088 if (lock)
1089 pthread_mutex_lock(&tree->lock);
1090 Item *r = get_item_hash(tree, tree->root, it, keyhash(key, len));
1091 if (r != NULL)
1092 {
1093 int l = ITEM_LENGTH(it);
1094 memcpy(it, r, l); // safe
1095 buf[l] = 0; // c-str
1096 r = it;
1097 }
1098 if (lock)
1099 pthread_mutex_unlock(&tree->lock);
1100 return r;
1101 }
1102
1103
ht_get_maybe_tmp(HTree * tree,const char * key,int * is_tmp,char * buf)1104 Item *ht_get_maybe_tmp(HTree *tree, const char *key, int *is_tmp, char *buf)
1105 {
1106 *is_tmp = 0;
1107 Item *item = ht_get_withbuf(tree, key, strlen(key), buf, true);
1108 if (NULL != item)
1109 {
1110 uint32_t bucket = item->pos & 0xff;
1111 if (tree->updating_bucket == bucket)
1112 {
1113 pthread_mutex_lock(&tree->lock);
1114 if (tree->updating_bucket == bucket)
1115 {
1116 log_debug("get tmp for %s", key);
1117 *is_tmp = 1;
1118 item = ht_get_withbuf(tree->updating_tree, key, strlen(key), buf, false);
1119 }
1120 else
1121 {
1122 log_notice("get again for %s", key);
1123 item = ht_get_withbuf(tree, key, strlen(key), buf, false);
1124 }
1125 pthread_mutex_unlock(&tree->lock);
1126 }
1127 }
1128 return item;
1129 }
1130
1131