1 /*
2  *  Beansdb - A high available distributed key-value storage system:
3  *
4  *      http://beansdb.googlecode.com
5  *
6  *  Copyright 2009 Douban Inc.  All rights reserved.
7  *
8  *  Use and distribution licensed under the BSD license.  See
9  *  the LICENSE file for full text.
10  *
11  *  Authors:
12  *      Davies Liu <davies.liu@gmail.com>
13  *      Hurricane Lee <hurricane1026@gmail.com>
14  */
15 #include<sys/time.h>
16 #include <stdio.h>
17 #include <errno.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <assert.h>
21 #include <pthread.h>
22 #include <fcntl.h>
23 #include <ctype.h>
24 #include <inttypes.h>
25 
26 #include "fnv1a.h"
27 #include "mfile.h"
28 #include "htree.h"
29 #include "codec.h"
30 #include "const.h"
31 #include "log.h"
32 #include "diskmgr.h"
33 
/* Number of children of every interior node (one per hash nibble value). */
const int BUCKET_SIZE = 16;
/* A leaf whose live-item count exceeds this is a candidate for splitting;
 * an invalidated node whose count is at or below it is merged (update_node). */
const int SPLIT_LIMIT = 64;
/* add_item only enlarges the node pool while tree->height + tree->depth
 * is below this bound. */
const int MAX_DEPTH = 8;
/* g_index[d] is the offset, in the flat node pool, of the first node of
 * level d (1 + 16 + 16^2 + ... nodes precede it); g_index[h] is therefore
 * also the pool size of a tree of height h. */
static const long long g_index[] = {0, 1, 17, 273, 4369, 69905, 1118481, 17895697, 286331153, 4581298449L};

/* Magic string written (including its NUL) at the start of a saved tree. */
const char HTREE_VERSION[] = "HTREE001";
/* Size of the per-tree scratch buffer HTree.buf. */
#define TREE_BUF_SIZE 512
#define max(a,b) ((a)>(b)?(a):(b))
/* Child slot (0..15) for the current key hash at the current node.
 * NOTE: the argument is ignored; the macro reads the identifiers `keyhash`,
 * `node` and `tree` from the scope in which it is expanded. */
#define INDEX(it) (0x0f & (keyhash >> ((7 - node->depth - tree->depth) * 4)))
/* Bytes occupied by a stored item: fixed header plus ksz key bytes. */
#define ITEM_LENGTH(it) ((it)->ksz + sizeof(Item) - ITEM_PADDING)
/* Item hash as it contributes to a node hash: 0 unless ver > 0. */
#define HASH(it) ((it)->hash * ((it)->ver>0))
/* Address of the first Data field that is persisted to file; the leading
 * `next` pointer is skipped by ht_open and ht_save2. */
#define DATA_file_start(data) (&((data)->size))
/* Byte offset of Data.head, i.e. the size of the Data block header. */
#define DATA_HEAD_SIZE (int)(((char*)&(((Data*)0)->head)) - (char*)(0))
/* Block size limit used by ht_new for non-tmp trees and by ht_open. */
#define DATA_BLOCK_SIZE 256
/* Block size limit used by ht_new when `tmp` is true. */
#define DATA_BLOCK_SIZE_SMALL 1024
49 
/* One block in a leaf's singly linked chain of item storage. */
typedef struct t_data Data;
struct t_data
{
    Data *next;     /* next block in the chain, or NULL; not persisted to file */
    int size;       /* capacity of this block in bytes, header included */
    int used;       /* bytes in use, header included (starts at DATA_HEAD_SIZE) */
    int count;      /* number of items stored in head[] */
    Item head[0];   /* packed variable-length items, each ITEM_LENGTH() bytes */
};
59 
60 
/* One slot of the flat node pool; written to file verbatim by ht_save2. */
typedef struct t_node Node;
struct t_node
{
    uint16_t is_node:1; /* 1: interior node (items live in children); 0: leaf */
    uint16_t valid:1;   /* 1: hash/count are up to date; cleared on the path of every add/remove */
    uint16_t depth:4;   /* level of this node inside the pool, 0 for the root */
    uint16_t flag:9;    /* not referenced anywhere in this file */
    uint16_t hash;      /* aggregated hash of the items below this node */
    uint32_t count;     /* number of items with ver > 0 below this node */
    Data *data;         /* leaf: chain of item blocks; freed and NULL once the node is split */
};
72 
/* A hash tree covering the keys whose hash starts with `pos` in its first `depth` nibbles. */
struct t_hash_tree
{
    int depth;              /* number of leading key-hash nibbles fixed for this tree (see check_bucket) */
    int pos;                /* expected value of those leading nibbles */
    int height;             /* number of levels currently allocated in the node pool */
    int block_size;         /* upper bound used when growing a Data block in add_item */
    Node *root;             /* flat node pool of g_index[height] nodes; root[0] is the tree root */
    Codec *dc;              /* codec used to encode/decode keys stored in items */
    pthread_mutex_t lock;   /* taken by the locking ht_* entry points */
    char buf[TREE_BUF_SIZE];/* scratch Item used by create_item and visit_node */

    uint32_t updating_bucket;   /* set by ht_set_updating_bucket; (uint32_t)-1 initially */
    HTree *updating_tree;       /* tree consulted by ht_get_maybe_tmp for updating_bucket */
};
87 
88 
// Forward declarations: these helpers call each other
// (add_item -> split_node -> add_item, update_node -> merge_node -> add_item).
static void add_item(HTree *tree, Node *node, Item *it, uint32_t keyhash, bool enlarge);
static void remove_item(HTree *tree, Node *node, Item *it, uint32_t keyhash);
static void split_node(HTree *tree, Node *node);
static void merge_node(HTree *tree, Node *node);
static void update_node(HTree *tree, Node *node);
95 
/*
 * Check that replacing `oldit` by `newit` does not move the version backwards.
 *
 * Versions are compared by absolute value. Returns true when the new
 * version is at least the old one; otherwise logs a warning containing the
 * decoded key and returns false.
 */
static inline bool check_version(Item *oldit, Item *newit, HTree *tree, uint32_t keyhash)
{
    if (abs(newit->ver) < abs(oldit->ver))
    {
        char key[KEY_BUF_LEN];
        dc_decode(tree->dc, key, KEY_BUF_LEN, oldit->key, oldit->ksz);
        log_warn("BUG: bad version, oldv=%d, newv=%d, key=%s, keyhash = 0x%x, oldpos = %u",  oldit->ver, newit->ver, key, keyhash, oldit->pos);
        return false;
    }
    return true;
}
108 
/*
 * Rank of `node` among the nodes of its own level: its index in the flat
 * pool minus the offset at which that level starts.
 */
static inline uint32_t get_pos(HTree *tree, Node *node)
{
    long long level_start = g_index[(int)node->depth];
    return (node - tree->root) - level_start;
}
113 
/*
 * Return child number `b` of `node`. The children of the k-th node of a
 * level occupy slots 16*k .. 16*k+15 of the next level.
 */
static inline Node *get_child(HTree *tree, Node *node, int b)
{
    uint32_t rank = get_pos(tree, node);
    int child_index = g_index[node->depth + 1] + (rank << 4) + b;
    return &tree->root[child_index];
}
119 
/*
 * Initialise a freshly allocated block of `size` bytes as an empty,
 * unlinked Data block (only the header counts as used).
 */
static inline void init_data(Data *data, int size)
{
    data->size = size;
    data->used = DATA_HEAD_SIZE;
    data->count = 0;
    data->next = NULL;
}
127 
get_data(Node * node)128 static inline Data *get_data(Node *node)
129 {
130     return node->data;
131 }
132 
/* Accessor: make `data` the first block of the node's Data chain. */
static inline void set_data(Node *node, Data *data)
{
    Node *target = node;
    target->data = data;
}
137 
/* Free every block of the node's Data chain and leave the node with no data. */
static inline void free_data(Node *node)
{
    Data *cur = node->data;
    while (cur != NULL)
    {
        Data *doomed = cur;
        cur = cur->next;    /* read the link before the block goes away */
        free(doomed);
    }
    node->data = NULL;
}
149 
150 
151 
/*
 * Hash of an item's key: the stored (encoded) key is decoded with the
 * tree's codec and the decoded bytes are hashed with fnv1a.
 */
static inline uint32_t key_hash(HTree *tree, Item *it)
{
    char decoded[KEY_BUF_LEN];
    int decoded_len = dc_decode(tree->dc, decoded, KEY_BUF_LEN, it->key, it->ksz);
    return fnv1a(decoded, decoded_len);
}
158 
create_item(HTree * tree,const char * key,int len,uint32_t pos,uint16_t hash,int32_t ver)159 static Item *create_item(HTree *tree, const char *key, int len, uint32_t pos, uint16_t hash, int32_t ver)
160 {
161     Item *it = (Item*)tree->buf;
162     it->pos = pos;
163     it->ver = ver;
164     it->hash = hash;
165     it->ksz = dc_encode(tree->dc, it->key, TREE_BUF_SIZE - (sizeof(Item) - ITEM_PADDING), key, len);
166     return it;
167 }
168 
/*
 * Grow the node pool by one level. The pool is reallocated (so every Node
 * pointer held by callers becomes stale), the new nodes are zeroed and
 * tagged with their depth, and tree->height is incremented.
 */
static void enlarge_pool(HTree *tree)
{
    const int cur_height = tree->height;
    int old_size = g_index[cur_height];
    int new_size = g_index[cur_height + 1];

    log_notice("enlarge pool %d -> %d, new_height = %d", old_size, new_size, cur_height + 1);

    tree->root = (Node*)safe_realloc(tree->root, sizeof(Node) * new_size);

    Node *fresh = tree->root + old_size;
    Node *pool_end = tree->root + new_size;
    memset(fresh, 0, sizeof(Node) * (new_size - old_size));
    for (; fresh != pool_end; ++fresh)
        fresh->depth = cur_height;  /* new level index == old height */

    tree->height = cur_height + 1;
}
184 
/*
 * Reset `node` to an empty, valid leaf backed by a new 64-byte Data block.
 * Any chain previously attached to the node is not freed here.
 */
static void clear(Node *node)
{
    node->is_node = 0;
    node->valid = 1;
    node->hash = 0;
    node->count = 0;

    Data *block = (Data*)safe_malloc(64);
    init_data(block, 64);
    set_data(node, block);
}
196 
/*
 * Insert item `it` below `node`, or overwrite the stored item that has the
 * same encoded key.
 *
 * tree     tree owning the node pool
 * node     node to start the descent from
 * it       item to store (fixed header followed by ksz key bytes)
 * keyhash  hash of the raw key; selects the child at every level (INDEX)
 * enlarge  when true, the node pool may be grown to split a leaf that sits
 *          on the deepest level
 *
 * Every interior node passed during the descent is marked invalid so that
 * update_node recomputes it later.
 */
static void add_item(HTree *tree, Node *node, Item *it, uint32_t keyhash, bool enlarge)
{
    int it_len = ITEM_LENGTH(it);
    while (node->is_node)
    {
        node->valid = 0;
        node = get_child(tree, node, INDEX(it));
    }

    // pass 1: look for an existing item with the same key and update it in place
    Data *data0 = get_data(node);
    Data *data;
    for (data = data0; data != NULL;  data = data->next)
    {
        Item *p = data->head;
        int i;
        for (i = 0; i<data->count; ++i, p = (Item*)((char*)p + ITEM_LENGTH(p)))
        {
            if (it->ksz == p->ksz &&
                    memcmp(it->key, p->key, it->ksz) == 0)
            {
                // result is ignored: a version regression is only logged
                check_version(p, it, tree, keyhash);
                // replace the old item's contribution to hash/count by the new one
                node->hash += (HASH(it) - HASH(p)) * keyhash;
                node->count += (it->ver > 0);
                node->count -= (p->ver > 0);
                memcpy(p, it, sizeof(Item)); // safe
                return;
            }
        }
    }

    // pass 2: find the first block with room for it_len more bytes;
    // `last`/`llast` trail behind as the previous and second-previous block
    Data *last = NULL;
    Data *llast = NULL;
    for (data = data0; data != NULL && data->size < data->used + it_len; llast = last, last = data, data = data->next)
        ;

    if (data == NULL)
    {
        // no block has room; `last` is the tail of the chain
        if (last->used + it_len > tree->block_size)
        {
            // growing the tail would exceed block_size: chain a new block
            // sized exactly for this item
            int size = DATA_HEAD_SIZE + it_len;
            data = (Data*)safe_malloc(size);
            init_data(data, size);
            last->next = data;
        }
        else
        {
            // grow the tail block, capped at block_size
            int size = max(last->used + it_len, last->size);
            size = min(size, tree->block_size);
            data = (Data*)safe_realloc(last, size);
            data->size = size;

            // the block may have moved: relink it from its predecessor or the node
            if (llast)
                llast->next = data;
            else
                set_data(node, data);
        }
    }

    // append the item at the end of the used area
    Item *p = (Item*)(((char*)data) + data->used);
    safe_memcpy(p, data->size - data->used, it, it_len);
    data->count++;
    data->used += it_len;
    node->count += (it->ver > 0);
    node->hash += keyhash * HASH(it);

    if (node->count > SPLIT_LIMIT)
    {
        if (node->depth == tree->height - 1)
        {
            // leaf on the deepest level: splitting needs a new level, which is
            // only added when allowed and the leaf is four times over the limit
            if (enlarge && (tree->height + tree->depth < MAX_DEPTH) && (node->count > SPLIT_LIMIT * 4))
            {
                int pos = node - tree->root;
                enlarge_pool(tree);
                node = tree->root + pos; // reload
                split_node(tree, node);
            }
        }
        else
        {
            split_node(tree, node);
        }
    }
}
280 
/*
 * Turn leaf `node` into an interior node: its 16 children are reset to
 * empty leaves, every stored item is re-inserted into the child selected by
 * its key hash, and the leaf's own Data chain is freed.
 *
 * The node is left invalid so that update_node recomputes its hash/count.
 */
static void split_node(HTree *tree, Node *node)
{
    Node *child = get_child(tree, node, 0);
    int i;
    for (i = 0; i < BUCKET_SIZE; ++i)
        clear(child + i);

    Data *data;
    for (data = get_data(node); data != NULL; data = data->next)
    {
        Item *it = data->head;
        for (i = 0; i < data->count; ++i)
        {
            /* Must be named `keyhash` (the INDEX macro reads it) and must be
             * unsigned: INDEX right-shifts it, which is implementation-defined
             * for a negative signed value. */
            uint32_t keyhash = key_hash(tree, it);
            add_item(tree, child + INDEX(it), it, keyhash, false);
            it = (Item*)((char*)it + ITEM_LENGTH(it));
        }
    }

    free_data(node);

    node->is_node = 1;
    node->valid = 0;
}
306 
/*
 * Remove the stored item whose encoded key equals that of `it`, if any.
 *
 * tree     tree owning the node pool
 * node     node to start the descent from
 * it       probe item; only ksz and key are compared
 * keyhash  hash of the raw key; selects the child at every level (INDEX)
 *
 * Interior nodes on the path are marked invalid. Nothing happens when the
 * key is not found.
 */
static void remove_item(HTree *tree, Node *node, Item *it, uint32_t keyhash)
{
    while (node->is_node)
    {
        node->valid = 0;
        node = get_child(tree, node, INDEX(it));
    }

    Data *data0 = get_data(node);
    Data *data;
    Data *last = NULL; // block preceding `data` in the chain
    for (data = data0; data != NULL; last = data, data = data->next)
    {
        Item *p = data->head;
        int i;
        int p_len;
        for (i = 0; i < data->count; ++i, p = (Item*)((char*)p + p_len))
        {
            p_len = ITEM_LENGTH(p);
            if (it->ksz == p->ksz &&
                    memcmp(it->key, p->key, it->ksz) == 0)
            {
                data->count--;
                data->used -= p_len;
                node->count -= p->ver > 0;
                node->hash -= keyhash * HASH(p);
                if (data->count > 0)
                {
                    // close the gap: shift everything after the item, up to the
                    // end of the block's capacity, down by p_len bytes
                    memmove(p, (char*)p + p_len, data->size - ((char*)p - (char*)data) - p_len);
                }
                else if (data != data0 && data->next != NULL) //neither first nor last
                {
                    // an emptied middle block is unlinked and freed; an emptied
                    // first or last block is kept in the chain
                    last->next = data->next;
                    free(data);
                }
                return;
            }
        }
    }
}
347 
/*
 * Collapse the 16 children of `node` back into `node` itself.
 *
 * `node` is reset to an empty leaf, then every item with ver > 0 found in
 * the children's Data chains is added to it; items with ver <= 0 are
 * dropped. Each child's chain is freed once it has been scanned.
 */
static void merge_node(HTree *tree, Node *node)
{
    clear(node);

    Node *children = get_child(tree, node, 0);
    int slot;
    for (slot = 0; slot < BUCKET_SIZE; ++slot)
    {
        Node *kid = children + slot;
        Data *block = get_data(kid);
        while (block != NULL)
        {
            Item *cur = block->head;
            int seen = 0;
            while (seen < block->count)
            {
                if (cur->ver > 0)
                    add_item(tree, node, cur, key_hash(tree, cur), false);
                /* else: deleted item, not carried over */
                cur = (Item*)((char*)cur + ITEM_LENGTH(cur));
                ++seen;
            }
            block = block->next;
        }
        free_data(kid);
    }
}
372 
/*
 * Bring the cached hash/count of `node` up to date; no-op if already valid.
 *
 * For an interior node the children are updated first, their counts are
 * summed, and their hashes are combined: a plain sum when the node holds at
 * most SPLIT_LIMIT * 4 items, otherwise a running hash*97 + child_hash.
 * Afterwards a node whose count is at most SPLIT_LIMIT is merged back into
 * a single leaf.
 *
 * NOTE(review): the merge step is not conditioned on is_node; it appears to
 * rely on leaves always being valid and thus returning early — confirm.
 */
static void update_node(HTree *tree, Node *node)
{
    if (node->valid) return;

    node->hash = 0;
    if (node->is_node)
    {
        Node *kids = get_child(tree, node, 0);
        int k;

        node->count = 0;
        for (k = 0; k < BUCKET_SIZE; k++)
        {
            update_node(tree, &kids[k]);
            node->count += kids[k].count;
        }

        /* count is final here, so the choice of combiner is loop-invariant */
        const bool weighted = node->count > SPLIT_LIMIT * 4;
        for (k = 0; k < BUCKET_SIZE; k++)
        {
            if (weighted)
                node->hash *= 97;
            node->hash += kids[k].hash;
        }
    }
    node->valid = 1;

    if (node->count <= SPLIT_LIMIT)
        merge_node(tree, node);
}
405 
/*
 * Find the stored item whose encoded key matches that of `it`.
 *
 * Descends from `node` to a leaf using `keyhash`, then scans the leaf's
 * Data chain. Returns a pointer into the leaf's storage, or NULL when the
 * key is absent.
 */
static Item *get_item_hash(HTree *tree, Node *node, Item *it, uint32_t keyhash)
{
    while (node->is_node)
        node = get_child(tree, node, INDEX(it));

    const int wanted_len = ITEM_LENGTH(it);
    Data *block;
    for (block = get_data(node); block != NULL; block = block->next)
    {
        Item *cand = block->head;
        int seen;
        for (seen = 0; seen < block->count; seen++)
        {
            int cand_len = ITEM_LENGTH(cand);
            /* equal item length implies equal key size */
            if (cand_len == wanted_len &&
                    memcmp(it->key, cand->key, it->ksz) == 0)
                return cand;
            cand = (Item*)((char*)cand + cand_len);
        }
    }
    return NULL;
}
432 
/*
 * Value of a single lower-case hexadecimal digit ('0'-'9', 'a'-'f'),
 * or -1 for any other character (upper-case digits included).
 */
static inline int hex2int(char b)
{
    if (b >= '0' && b <= '9')
        return b - '0';
    if (b >= 'a' && b <= 'f')
        return b - 'a' + 10;
    return -1;
}
444 
/*
 * Hash of the subtree addressed by `dir`, a string of lower-case hex digits
 * naming one child per level starting at `node`.
 *
 * The descent stops at the first leaf or when `dir` is exhausted; that node
 * is brought up to date and its hash is returned, with its item count
 * stored in *count when `count` is non-NULL. A non-hex character in `dir`
 * yields hash 0 and count 0.
 */
static uint16_t get_node_hash(HTree *tree, Node *node, const char *dir,
                              unsigned int *count)
{
    if (node->is_node && dir[0] != '\0')
    {
        /* int, not char: where plain char is unsigned the -1 error value
         * would become 255 and be used as a child index */
        int b = hex2int(dir[0]);
        if (b < 0)
        {
            if (count) *count = 0;
            return 0;
        }
        return get_node_hash(tree, get_child(tree, node, b), dir + 1, count);
    }
    update_node(tree, node);
    if (count) *count = node->count;
    return node->hash;
}
465 
/*
 * Produce a textual listing of the subtree addressed by `dir`.
 *
 * tree    tree owning the node pool
 * node    node to start from
 * dir     string of lower-case hex digits, one child index per level
 * prefix  when non-NULL, only keys starting with this string are listed
 *
 * Returns a heap buffer holding a NUL-terminated string that the caller
 * must free, or NULL when `dir` contains a character that is not a
 * lower-case hex digit while there are still interior nodes to descend.
 *
 * Output is one line per entry: "<x>/ <hash> <count>" for a child summary,
 * "<key> <hash> <ver>" for an item.
 */
static char *list_dir(HTree *tree, Node *node, const char *dir, const char *prefix)
{
    // walk down one level per digit of dir while there are interior nodes
    int dlen = strlen(dir);
    while (node->is_node && dlen > 0)
    {
        int b = hex2int(dir[0]);
        if (b >= 0 && b < BUCKET_SIZE)
        {
            node = get_child(tree, node, b);
            ++dir;
            --dlen;
        }
        else
        {
            return NULL;
        }
    }

    int bsize = 4096;
    char *buf = (char*)safe_malloc(bsize);
    memset(buf, 0, bsize);
    int n = 0, i;
    if (node->is_node)
    {
        update_node(tree, node);

        Node *child = get_child(tree, node, 0);
        // large subtree: list the 16 children as summaries instead of every key
        if (node->count > 100000 || (prefix == NULL && node->count > SPLIT_LIMIT * 4))
        {
            for (i = 0; i < BUCKET_SIZE; i++)
            {
                Node *t = child + i;
                n += safe_snprintf(buf + n, bsize - n, "%x/ %u %u\n",
                              i, t->hash, t->count);
            }
        }
        else
        {
            // small subtree: concatenate the full listings of all children
            for (i = 0; i < BUCKET_SIZE; i++)
            {
                char *r = list_dir(tree, child + i, "", prefix);
                int rl = strlen(r) + 1;
                if (bsize - n < rl)
                {
                    bsize += rl;
                    buf = (char*)safe_realloc(buf, bsize);
                }
                n += safe_snprintf(buf + n, bsize - n, "%s", r);
                free(r);
            }
        }
    }
    else
    {
        // leaf: list its items, filtered by the remaining dir digits and by prefix
        char pbuf[20], key[KEY_BUF_LEN];
        int prefix_len = 0;
        if (prefix != NULL)
            prefix_len = strlen(prefix);

        Data *data0 = get_data(node);
        Data *data;
        for (data = data0; data != NULL;  data = data->next)
        {
            Item *it = data->head;
            for (i = 0; i < data->count; i++, it = (Item*)((char*)it + ITEM_LENGTH(it)))
            {
                if (dlen > 0)
                {
                    // dir goes deeper than the tree: compare the leftover digits
                    // with the matching digits of the key hash printed as hex
                    safe_snprintf(pbuf, 20, "%08x", key_hash(tree, it));
                    if (memcmp(pbuf + tree->depth + node->depth, dir, dlen) != 0)
                    {
                        continue;
                    }
                }
                int l = dc_decode(tree->dc, key, KEY_BUF_LEN, it->key, it->ksz);
                if (prefix == NULL || (l >= prefix_len && strncmp(key, prefix, prefix_len) == 0))
                {
                    // keep room for one more line before formatting it
                    if (bsize - n < KEY_BUF_LEN + 32)
                    {
                        bsize *= 2;
                        buf = (char*)safe_realloc(buf, bsize);
                    }

                    n += safe_snprintf(buf + n, bsize - n, "%s %u %d\n", key, it->hash, it->ver);
                }
            }
        }
    }
    return buf;
}
556 
/*
 * Call `visitor(item, param)` for every item stored below `node`.
 *
 * Each item is handed over as a copy in the tree's scratch buffer
 * (tree->buf) with its key decoded and ksz set to the decoded key's string
 * length; the copy is overwritten for the next item.
 */
static void visit_node(HTree *tree, Node *node, fun_visitor visitor, void *param)
{
    int i;
    if (node->is_node)
    {
        Node *child = get_child(tree, node, 0);
        for (i = 0; i < BUCKET_SIZE; i++)
        {
            visit_node(tree, child + i, visitor, param);
        }
    }
    else
    {
        Item *it = (Item*)tree->buf;
        /* room left for the key once the fixed item header is accounted for */
        int buf_size = TREE_BUF_SIZE - (sizeof(Item) - ITEM_PADDING);
        Data *data;
        for (data = get_data(node); data != NULL; data = data->next)
        {
            Item *p = data->head;
            for (i = 0; i < data->count; i++, p = (Item*)((char*)p + ITEM_LENGTH(p)))
            {
                safe_memcpy(it, buf_size, p, sizeof(Item));
                dc_decode(tree->dc, it->key, buf_size, p->key, p->ksz);
                it->ksz = strlen(it->key);
                visitor(it, param);
            }
        }
    }
}
588 
589 /*
590  * API
591  */
592 
ht_new(int depth,int pos,bool tmp)593 HTree *ht_new(int depth, int pos, bool tmp)
594 {
595     HTree *tree = (HTree*)safe_malloc(sizeof(HTree));
596     memset(tree, 0, sizeof(HTree));
597     tree->depth = depth;
598     tree->pos = pos;
599     tree->height = 1;
600     if (tmp)
601         tree->block_size = DATA_BLOCK_SIZE_SMALL;
602     else
603         tree->block_size = DATA_BLOCK_SIZE;
604     tree->updating_bucket = -1;
605     tree->updating_tree = NULL;
606 
607     int pool_size = g_index[tree->height];
608     Node *root = (Node*)safe_malloc(sizeof(Node) * pool_size);
609 
610     memset(root, 0, sizeof(Node) * pool_size);
611 
612     // init depth
613     int i,j;
614     for (i = 0; i < tree->height; i++)
615     {
616         for (j = g_index[i]; j < g_index[i + 1]; j++)
617         {
618             root[j].depth = i;
619         }
620     }
621 
622     tree->root = root;
623     clear(tree->root);
624 
625     tree->dc = dc_new();
626     pthread_mutex_init(&tree->lock, NULL);
627 
628     return tree;
629 }
630 
ht_open(int depth,int pos,const char * path)631 HTree *ht_open(int depth, int pos, const char *path)
632 {
633     char version[sizeof(HTREE_VERSION) + 1] = {0};
634     HTree *tree = NULL;
635     Node *root = NULL;
636     int pool_used = 0;
637     char *buf = NULL;
638 
639     FILE *f = fopen(path, "rb");
640     if (f == NULL)
641     {
642         log_error("open %s failed", path);
643         return NULL;
644     }
645 
646     if (fread(version, sizeof(HTREE_VERSION), 1, f) != 1
647             || memcmp(version, HTREE_VERSION, sizeof(HTREE_VERSION)) != 0)
648     {
649         log_error("the version %s is not expected", version);
650         fclose(f);
651         return NULL;
652     }
653 
654     off_t fsize = 0;
655     if (fread(&fsize, sizeof(fsize), 1, f) != 1 ||
656             fseeko(f, 0, 2) != 0 || ftello(f) != fsize)
657     {
658         log_error("the size %lld is not expected", (long long int)fsize);
659         fclose(f);
660         return NULL;
661     }
662     fseeko(f, sizeof(HTREE_VERSION) + sizeof(off_t), 0);
663 #if _XOPEN_SOURCE >= 600 || _POSIX_C_SOURCE >= 200112L
664     if (posix_fadvise(fileno(f), 0, fsize, POSIX_FADV_SEQUENTIAL) != 0)
665     {
666         log_error("posix_favise() failed");
667     }
668 #endif
669 
670     tree = (HTree*)safe_malloc(sizeof(HTree));
671 
672     memset(tree, 0, sizeof(HTree));
673     tree->depth = depth;
674     tree->pos = pos;
675     tree->updating_bucket = -1;
676     tree->block_size = DATA_BLOCK_SIZE;
677 
678     if (fread(&tree->height, sizeof(int), 1, f) != 1 ||
679             tree->height + depth < 0 || tree->height + depth > 9)
680     {
681         log_error("invalid height: %d", tree->height);
682         goto FAIL;
683     }
684 
685     int pool_size = g_index[tree->height];
686     int psize = sizeof(Node) * pool_size;
687     root = (Node*)safe_malloc(psize);
688 
689     if (fread(root, psize, 1, f) != 1)
690     {
691         goto FAIL;
692     }
693     tree->root = root;
694 
695     // load Data
696     int i,size = 0;
697     Data *data;
698     for (i = 0; i < pool_size; i++)
699     {
700         if(fread(&size, sizeof(int), 1, f) != 1)
701         {
702             goto FAIL;
703         }
704         if (size > 0)
705         {
706             data = (Data*)safe_malloc(size + sizeof(Data*));
707             if (fread(DATA_file_start(data), size, 1, f) != 1)
708             {
709                 log_error("load data: size %d fail", size);
710                 goto FAIL;
711             }
712             if (data->used != size)
713             {
714                 log_error("broken data: %d != %d", data->used, size);
715                 goto FAIL;
716             }
717             data->used = data->size = size + sizeof(Data*);
718             data->next = NULL;
719         }
720         else if (size == 0)
721         {
722             data = NULL;
723         }
724         else
725         {
726             log_error("unexpected size: %d", size);
727             goto FAIL;
728         }
729         tree->root[i].data = data;
730         pool_used++;
731     }
732 
733 
734     // load Codec
735     if (fread(&size, sizeof(int), 1, f) != 1
736             || size < 0 || size > (10<<20))
737     {
738         log_error("bad codec size: %d", size);
739         goto FAIL;
740     }
741     buf = (char*)safe_malloc(size);
742     if (fread(buf, size, 1, f) != 1)
743     {
744         log_error("read codec failed");
745         goto FAIL;
746     }
747     tree->dc = dc_new();
748     if (dc_load(tree->dc, buf, size) != 0)
749     {
750         log_error("load codec failed");
751         goto FAIL;
752     }
753     free(buf);
754     fclose(f);
755 
756     pthread_mutex_init(&tree->lock, NULL);
757 
758     return tree;
759 
760 FAIL:
761     if (tree->dc)
762         dc_destroy(tree->dc);
763     if (buf)
764         free(buf);
765     if (root)
766     {
767         for (i = 0; i < pool_used; i++)
768         {
769             if (root[i].data) free(root[i].data);
770         }
771         free(root);
772     }
773     free(tree);
774     fclose(f);
775     return NULL;
776 }
777 
/*
 * Serialise `tree` to the already opened stream `f`.
 *
 * Layout written, in order:
 *   - HTREE_VERSION including its terminating NUL
 *   - off_t total file size (written as 0 first, patched at the end)
 *   - int tree height
 *   - the node pool, g_index[height] Node structs verbatim
 *   - for every node: an int 0 when it has no data; otherwise an int record
 *     size, one merged Data header (size, used, count; the `next` pointer
 *     is not written) and the item bytes of every block in the chain
 *   - int codec size followed by the codec dump
 *
 * Returns 0 on success, -1 after logging an error on any write failure.
 * The caller is responsible for locking the tree and closing `f`.
 */
static int ht_save2(HTree *tree, FILE *f)
{
    size_t last_advise = 0;
    int fd = fileno(f);

    // placeholder for the file size, overwritten once everything is written
    off_t pos = 0;
    if (fwrite(HTREE_VERSION, sizeof(HTREE_VERSION), 1, f) != 1 ||
            fwrite(&pos, sizeof(off_t), 1, f) != 1)
    {
        log_error("write version failed");
        return -1;
    }

    int pool_size = g_index[tree->height];
    if (fwrite(&tree->height, sizeof(int), 1, f) != 1 ||
            fwrite(tree->root, sizeof(Node) * pool_size, 1, f) != 1 )
    {
        log_error("write nodes failed");
        return -1;
    }

    int i, zero = 0;
    for (i = 0; i < pool_size; i++)
    {
        Data *data0= tree->root[i].data;
        if (data0)
        {
            // build one header describing the whole chain as a single block
            Data new_data;
            init_data(&new_data, 0);
	        Data *data;
            for (data = data0; data != NULL; data = data->next)
            {
                new_data.count += data->count;
                new_data.used += data->used - DATA_HEAD_SIZE;
            }
            // the `next` pointer is not stored, so it does not count on disk
            new_data.used -= sizeof(Data*);

            // record size first, then the header from `size` up to `head`
            if (fwrite(&(new_data.used), sizeof(int), 1, f) != 1 || fwrite(DATA_file_start(&new_data), DATA_HEAD_SIZE - sizeof(Data*), 1, f) != 1)
            {
                log_error("write data head failed");
                return -1;
            }

            // item payload of every non-empty block, back to back
            for (data = data0; data != NULL; data = data->next)
            {
                if (data->count == 0)
                    continue;
                if ( fwrite(((char*)data) + DATA_HEAD_SIZE, data->used - DATA_HEAD_SIZE, 1, f) != 1)
                {
                    log_error("write data failed");
                    return -1;
                }
            }
            file_dontneed(fd, ftello(f), &last_advise);
        }
        else
        {
            if (fwrite(&zero, sizeof(int), 1, f) != 1)
            {
                log_error("write zero failed");
                return -1;
            }
        }
    }

    // codec: int size immediately followed by the dump, written in one call
    int s = dc_size(tree->dc);
    char *buf = (char*)safe_malloc(s + sizeof(int));
    *(int*)buf = s;
    if (dc_dump(tree->dc, buf + sizeof(int), s) != s)
    {
        log_error("dump Codec failed");
        free(buf);
        return -1;
    }
    if (fwrite(buf, s + sizeof(int), 1, f) != 1)
    {
        log_error("write Codec failed");
        free(buf);
        return -1;
    }
    free(buf);

    // patch the real file size into the placeholder after the version string
    pos = ftello(f);
    fseeko(f, sizeof(HTREE_VERSION), 0);
    if (fwrite(&pos, sizeof(off_t), 1, f) != 1)
    {
        log_error("write size failed");
        return -1;
    }

    return 0;
}
870 
/*
 * Save `tree` to `path`.
 *
 * Any existing file at `path` is unlinked first. The tree is written to
 * "<path>.tmp" while holding the tree lock and the temporary file is
 * renamed onto `path` only when both the write and the final close
 * succeeded; otherwise the temporary file is removed.
 *
 * Returns 0 on success, -1 on bad arguments or any I/O failure.
 */
int ht_save(HTree *tree, const char *path)
{
    if (!tree || !path) return -1;

    mgr_unlink(path);

    char tmp[MAX_PATH_LEN];
    safe_snprintf(tmp, MAX_PATH_LEN, "%s.tmp", path);

    FILE *f = fopen(tmp, "wb");
    if (f == NULL)
    {
        log_error("open %s failed", tmp);
        return -1;
    }

    /* A large stdio buffer is an optimisation only: when it cannot be
     * allocated the stream keeps its default buffering. */
    int buf_size = 1024*1024;
    char *buff = (char*)malloc(buf_size);
    if (buff != NULL)
    {
        memset(buff, '\0', buf_size);
        setvbuf(f, buff, _IOFBF, buf_size);
    }
    else
    {
        log_warn("alloc %d bytes for write buffer failed, use default buffering", buf_size);
    }

    uint64_t file_size = 0;
    struct timeval save_start, save_end;
    gettimeofday(&save_start, NULL);

    pthread_mutex_lock(&tree->lock);
    int ret = ht_save2(tree, f);
    pthread_mutex_unlock(&tree->lock);
    if (ret == 0)
    {
        fseeko(f, 0, SEEK_END);
        file_size = ftello(f);
    }
    /* fclose flushes the remaining buffered data; a failure here means the
     * file on disk is incomplete and must not replace `path` */
    if (fclose(f) != 0 && ret == 0)
    {
        log_error("close %s failed", tmp);
        ret = -1;
    }
    free(buff);     /* only after fclose: the stream uses it until then */

    if (ret == 0)
    {
        gettimeofday(&save_end, NULL);
        float save_secs = (save_end.tv_sec - save_start.tv_sec) + (save_end.tv_usec - save_start.tv_usec) / 1e6;
        log_notice("save HTree to %s, size = %"PRIu64", in %f secs", path, file_size, save_secs);
        mgr_rename(tmp, path);
    }
    else
        mgr_unlink(tmp);
    return ret;
}
918 
/*
 * Release a tree created by ht_new or ht_open: its codec, the Data chain of
 * every node in the pool, the pool itself, the mutex and the tree struct.
 * A NULL tree is ignored.
 *
 * The lock is taken while the contents are torn down, then released and
 * destroyed before the memory holding it is freed. The caller must make
 * sure no other thread still uses the tree.
 */
void ht_destroy(HTree *tree)
{
    if (!tree) return;

    pthread_mutex_lock(&tree->lock);

    dc_destroy(tree->dc);

    int i;
    int pool_size = g_index[tree->height];
    for (i = 0; i < pool_size; i++)
    {
        /* free_data copes with nodes that have no data */
        free_data(tree->root + i);
    }
    free(tree->root);

    /* a mutex must be unlocked before it is destroyed */
    pthread_mutex_unlock(&tree->lock);
    pthread_mutex_destroy(&tree->lock);
    free(tree);
}
937 
/* Hash of a raw (not encoded) key of `len` bytes, computed with fnv1a. */
static inline uint32_t keyhash(const char *s, int len)
{
    uint32_t digest = fnv1a(s, len);
    return digest;
}
942 
check_key(const char * key,int len)943 bool check_key(const char *key, int len)
944 {
945     if (!key) return false;
946     if (len == 0 || len > MAX_KEY_LEN)
947     {
948         log_error("bad key len=%d", len);
949         return false;
950     }
951     if (key[0] <= ' ')
952     {
953         log_error("bad key len=%d %x", len, key[0]);
954         return false;
955     }
956     int k;
957     for (k = 0; k < len; k++)
958     {
959         if (isspace(key[k]) || iscntrl(key[k]))
960         {
961             log_error("bad key len=%d %s", len, key);
962             return false;
963         }
964     }
965     return true;
966 }
967 
/*
 * Check that `key` belongs to this tree: for a tree with depth > 0 the top
 * `depth` nibbles of the key hash must equal tree->pos. A tree with
 * depth <= 0 accepts every key. Logs an error and returns false on mismatch.
 */
bool check_bucket(HTree *tree, const char* key, int len)
{
    uint32_t h = keyhash(key, len);

    if (tree->depth <= 0)
        return true;

    /* depth > 0 here, so the shift count stays below 32 */
    if (h >> ((8-tree->depth) * 4) == (unsigned int)(tree->pos))
        return true;

    log_error("key %s (#%x) should not in this tree (%d:%0x)", key, h >> ((8-tree->depth) * 4), tree->depth, tree->pos);
    return false;
}
979 
/*
 * Add or update `key` (given with explicit length) without taking the tree
 * lock. Keys that do not belong to this tree are ignored (check_bucket logs
 * the error). The pool may be enlarged by this call.
 */
void ht_add2(HTree *tree, const char *key, int len, uint32_t pos, uint16_t hash, int32_t ver)
{
    if (check_bucket(tree, key, len))
    {
        Item *entry = create_item(tree, key, len, pos, hash, ver);
        uint32_t kh = keyhash(key, len);
        add_item(tree, tree->root, entry, kh, true);
    }
}
986 
/* Locked variant of ht_add2 for a NUL-terminated key. */
void ht_add(HTree *tree, const char *key, uint32_t pos, uint16_t hash, int32_t ver)
{
    pthread_mutex_t *guard = &tree->lock;

    pthread_mutex_lock(guard);
    ht_add2(tree, key, strlen(key), pos, hash, ver);
    pthread_mutex_unlock(guard);
}
993 
/*
 * Remove `key` (given with explicit length) without taking the tree lock.
 * Keys that do not belong to this tree are ignored.
 */
void ht_remove2(HTree *tree, const char *key, int len)
{
    if (check_bucket(tree, key, len))
    {
        /* only the encoded key of the probe matters for removal */
        Item *probe = create_item(tree, key, len, 0, 0, 0);
        uint32_t kh = keyhash(key, len);
        remove_item(tree, tree->root, probe, kh);
    }
}
1000 
/* Locked variant of ht_remove2 for a NUL-terminated key. */
void ht_remove(HTree *tree, const char *key)
{
    pthread_mutex_t *guard = &tree->lock;

    pthread_mutex_lock(guard);
    ht_remove2(tree, key, strlen(key));
    pthread_mutex_unlock(guard);
}
1007 
ht_get2(HTree * tree,const char * key,int len)1008 Item *ht_get2(HTree *tree, const char *key, int len)
1009 {
1010     if (!check_bucket(tree, key, len)) return NULL;
1011 
1012     pthread_mutex_lock(&tree->lock);
1013     Item *it = create_item(tree, key, len, 0, 0, 0);
1014     Item *r = get_item_hash(tree, tree->root, it, keyhash(key, len));
1015     if (r != NULL)
1016     {
1017         Item *rr = (Item*)safe_malloc(sizeof(Item) + len);
1018         memcpy(rr, r, sizeof(Item)); // safe
1019         memcpy(rr->key, key, len);  // safe
1020         rr->key[len] = 0; // c-str
1021         r = rr; // r is in node->Data block
1022     }
1023     pthread_mutex_unlock(&tree->lock);
1024     return r;
1025 }
1026 
ht_get(HTree * tree,const char * key)1027 Item *ht_get(HTree *tree, const char *key)
1028 {
1029     return ht_get2(tree, key, strlen(key));
1030 }
1031 
/*
 * Hash and item count of the subtree named by `key`, which must have the
 * form "@<hex digits>". The whole tree is brought up to date first.
 *
 * Returns the subtree hash and stores its count in *count when `count` is
 * non-NULL. A NULL tree or key, or a key not starting with '@', yields
 * hash 0 and count 0.
 */
uint32_t ht_get_hash(HTree *tree, const char *key, unsigned int *count)
{
    if (tree == NULL || key == NULL || key[0] != '@')
    {
        if (count != NULL)
            *count = 0;
        return 0;
    }

    pthread_mutex_lock(&tree->lock);
    update_node(tree, tree->root);
    uint32_t result = get_node_hash(tree, tree->root, key + 1, count);
    pthread_mutex_unlock(&tree->lock);

    return result;
}
1047 
/*
 * Locked wrapper around list_dir.
 *
 * Returns NULL for a NULL tree or dir, or a dir longer than 8 characters.
 * An empty `prefix` is treated like no prefix. Otherwise returns whatever
 * list_dir returns (a heap buffer the caller frees, or NULL).
 */
char *ht_list(HTree *tree, const char *dir, const char *prefix)
{
    if (tree == NULL || dir == NULL)
        return NULL;
    if (strlen(dir) > 8)
        return NULL;

    if (prefix != NULL && prefix[0] == '\0')
        prefix = NULL;

    pthread_mutex_lock(&tree->lock);
    char *listing = list_dir(tree, tree->root, dir, prefix);
    pthread_mutex_unlock(&tree->lock);

    return listing;
}
1059 
/* Call `visitor` for every item of the tree while holding the tree lock. */
void ht_visit(HTree *tree, fun_visitor visitor, void *param)
{
    pthread_mutex_t *guard = &tree->lock;

    pthread_mutex_lock(guard);
    visit_node(tree, tree->root, visitor, param);
    pthread_mutex_unlock(guard);
}
1066 
/* Same as ht_visit but without taking the tree lock. */
void ht_visit2(HTree *tree, fun_visitor visitor, void *param)
{
    Node *start = tree->root;
    visit_node(tree, start, visitor, param);
}
1071 
/*
 * Record, under the tree lock, which bucket is being updated and the tree
 * that ht_get_maybe_tmp should consult for items of that bucket.
 */
void ht_set_updating_bucket(HTree *tree, int bucket, HTree *updating_tree)
{
    log_notice("updating bucket %d for htree 0x%x", bucket, tree->pos);

    pthread_mutex_lock(&tree->lock);
    tree->updating_tree = updating_tree;
    tree->updating_bucket = bucket;
    pthread_mutex_unlock(&tree->lock);
}
1080 
ht_get_withbuf(HTree * tree,const char * key,int len,char * buf,bool lock)1081 Item *ht_get_withbuf(HTree *tree, const char *key, int len, char *buf, bool lock)
1082 {
1083     if (!check_bucket(tree, key, len)) return NULL;
1084 
1085     Item *it = (Item*)buf;
1086     it->ksz = dc_encode(tree->dc, it->key, TREE_BUF_SIZE - (sizeof(Item) - ITEM_PADDING), key, len);
1087 
1088     if (lock)
1089         pthread_mutex_lock(&tree->lock);
1090     Item *r = get_item_hash(tree, tree->root, it, keyhash(key, len));
1091     if (r != NULL)
1092     {
1093         int l = ITEM_LENGTH(it);
1094         memcpy(it, r, l); // safe
1095         buf[l] = 0; // c-str
1096         r = it;
1097     }
1098     if (lock)
1099         pthread_mutex_unlock(&tree->lock);
1100     return r;
1101 }
1102 
1103 
ht_get_maybe_tmp(HTree * tree,const char * key,int * is_tmp,char * buf)1104 Item *ht_get_maybe_tmp(HTree *tree, const char *key, int *is_tmp, char *buf)
1105 {
1106     *is_tmp = 0;
1107     Item *item = ht_get_withbuf(tree, key, strlen(key), buf, true);
1108     if (NULL != item)
1109     {
1110         uint32_t bucket = item->pos & 0xff;
1111         if (tree->updating_bucket == bucket)
1112         {
1113             pthread_mutex_lock(&tree->lock);
1114             if (tree->updating_bucket == bucket)
1115             {
1116                 log_debug("get tmp for %s", key);
1117                 *is_tmp = 1;
1118                 item = ht_get_withbuf(tree->updating_tree, key, strlen(key), buf, false);
1119             }
1120             else
1121             {
1122                 log_notice("get again for %s", key);
1123                 item = ht_get_withbuf(tree, key, strlen(key), buf, false);
1124             }
1125             pthread_mutex_unlock(&tree->lock);
1126         }
1127     }
1128     return item;
1129 }
1130 
1131