1 /*
2  *  Beansdb - A high available distributed key-value storage system:
3  *
4  *      http://beansdb.googlecode.com
5  *
6  *  Copyright 2010 Douban Inc.  All rights reserved.
7  *
8  *  Use and distribution licensed under the BSD license.  See
9  *  the LICENSE file for full text.
10  *
11  *  Authors:
12  *      Davies Liu <davies.liu@gmail.com>
13  *      Hurricane Lee <hurricane1026@gmail.com>
14  *
15  */
16 
17 #ifdef HAVE_CONFIG_H
18 #include "config.h"
19 #endif
20 
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <sys/uio.h>
24 #include <sys/time.h>
25 
26 #if HAVE_UNISTD_H
27 #include <unistd.h>
28 #endif
29 
30 #include <stdlib.h>
31 #include <stdint.h>
32 #include <string.h>
33 
34 #include "record.h"
35 #include "hint.h"
36 #include "crc32.c"
37 #include "diskmgr.h"
38 #include "quicklz.h"
39 #include "fnv1a.h"
40 
41 #include "mfile.h"
42 #include "util.h"
43 #include "const.h"
44 #include "log.h"
45 
46 
/* On-disk alignment unit: record_length() and encode_record() round every
 * record up to a multiple of this many bytes, and the scanners step through
 * broken regions in PADDING-sized strides. */
const int PADDING = 256;
/* Bit in DataRecord.flag that compress_record() sets after replacing the
 * value with its QuickLZ-compressed form; decompress_record() clears it. */
const int32_t COMPRESS_FLAG = 0x00010000;
/* Bit in DataRecord.flag that makes compress_record() leave the value alone.
 * NOTE(review): presumably set when the client already compressed the value
 * -- confirm against the protocol layer. */
const int32_t CLIENT_COMPRESS_FLAG = 0x00000010;
/* compress_record() keeps a compressed value only when its size is at most
 * this fraction of the uncompressed size. */
const float COMPRESS_RATIO_LIMIT = 0.7;
/* For values larger than this, compress_record() first compresses only this
 * many leading bytes as a trial before compressing the whole value. */
const int TRY_COMPRESS_SIZE = 1024 * 10;
52 
bad_kv_size(uint32_t ksz,uint32_t vsz)53 static inline bool bad_kv_size(uint32_t ksz, uint32_t vsz)
54 {
55     return ((ksz == 0 || ksz > MAX_KEY_LEN)|| vsz > MAX_VALUE_LEN);
56 }
57 
/*
 * Compute the value hash stored alongside a key in the hash tree.
 *
 * The hash mixes the length (times 97) with fnv1a of the data.  Buffers
 * longer than 1024 bytes are only sampled: fnv1a of the first 512 bytes and
 * of the last 512 bytes are combined.
 *
 * buf: data to hash, at least len bytes.
 * len: number of bytes in buf.
 * Returns the 32-bit hash (callers in this file truncate it to 16 bits).
 */
uint32_t gen_hash(char *buf, int len)
{
    /* Multiply in unsigned arithmetic: the original `len * 97` was a signed
     * int multiplication, which overflows (undefined behaviour) once len
     * exceeds about 22 MB.  The wrapped result is the same value. */
    uint32_t hash = (uint32_t)len * 97u;
    if (len <= 1024)
    {
        hash += fnv1a(buf, len);
    }
    else
    {
        hash += fnv1a(buf, 512);
        hash *= 97;
        hash += fnv1a(buf + len - 512, 512);
    }
    return hash;
}
73 
/*
 * Return the number of bytes a record occupies on disk: the header (the
 * DataRecord struct without its leading value pointer) plus key and value,
 * rounded up to the next multiple of PADDING.
 */
int record_length(DataRecord *r)
{
    size_t n = sizeof(DataRecord) - sizeof(char*) + r->ksz + r->vsz;
    size_t rem = n % PADDING;
    if (rem != 0)
    {
        n += PADDING - rem;
    }
    return n;
}
85 
/*
 * Return the record's value as a buffer that does not live inside the
 * record allocation.
 *
 * When the value is stored inline (directly after the NUL-terminated key) a
 * fresh copy of vsz bytes is allocated and returned; otherwise r->value
 * itself is returned.
 */
char *record_value(DataRecord *r)
{
    char *inline_pos = r->key + r->ksz + 1;
    if (r->value != inline_pos)
    {
        return r->value;
    }

    // value lives inside the record allocation, hand out a copy
    char *copy = (char*)safe_malloc(r->vsz);
    memcpy(copy, r->value, r->vsz); // safe
    return copy;
}
97 
/*
 * Release a record and reset the caller's pointer to NULL.
 *
 * The value buffer is freed only when the record is marked as owning it
 * (free_value).  A NULL r or a NULL *r is accepted and ignored.
 */
void free_record(DataRecord **r)
{
    if (r == NULL)
        return;

    DataRecord *rec = *r;
    if (rec == NULL)
        return;

    if (rec->value != NULL && rec->free_value)
        free(rec->value);
    free(rec);
    *r = NULL;
}
105 
/*
 * Try to replace the record's value with its QuickLZ-compressed form.
 *
 * Nothing happens when the record already carries COMPRESS_FLAG or
 * CLIENT_COMPRESS_FLAG, when header + key + value fits in one PADDING block,
 * when a work buffer cannot be allocated, or when compression does not get
 * the value down to COMPRESS_RATIO_LIMIT of its size.  Values larger than
 * TRY_COMPRESS_SIZE are first compressed partially as a trial; the full
 * value is only compressed when the trial meets the ratio.
 *
 * On success r->value points to a new heap buffer owned by the record
 * (free_value = true), r->vsz is the compressed size and COMPRESS_FLAG is
 * set.  A previously owned value buffer is freed.
 */
void compress_record(DataRecord *r)
{
    if (r->flag & (COMPRESS_FLAG|CLIENT_COMPRESS_FLAG)) return;

    int ksz = r->ksz, vsz = r->vsz;
    int n = sizeof(DataRecord) - sizeof(char*) + ksz + vsz;
    if (n <= PADDING) return;

    char *wbuf = (char*)try_malloc(QLZ_SCRATCH_COMPRESS);
    char *v = (char*)try_malloc(vsz + 400);
    if (wbuf == NULL || v == NULL)
    {
        // one of the two may have succeeded; do not leak it
        free(wbuf);
        free(v);
        return;
    }

    int try_size = vsz > TRY_COMPRESS_SIZE ? TRY_COMPRESS_SIZE : vsz;
    int vsize = qlz_compress(r->value, v, try_size, wbuf);
    if (try_size < vsz && vsize < try_size * COMPRESS_RATIO_LIMIT)
    {
        // the trial prefix compressed well enough, do the whole value
        try_size = vsz;
        vsize = qlz_compress(r->value, v, try_size, wbuf);
    }
    free(wbuf);

    // give up if the ratio is poor or only the trial prefix was compressed
    if (vsize > try_size * COMPRESS_RATIO_LIMIT || try_size < vsz)
    {
        free(v);
        return;
    }

    if (r->free_value)
    {
        free(r->value);
    }
    r->value = v;
    r->free_value = true;
    r->vsz = vsize;
    r->flag |= COMPRESS_FLAG;
}
141 
/*
 * Expand a QuickLZ-compressed record value in place.
 *
 * Records without COMPRESS_FLAG are returned untouched.  For compressed
 * records the value is replaced by a newly allocated decompressed buffer
 * owned by the record, vsz is updated and COMPRESS_FLAG is cleared.
 *
 * Returns r on success.  When the compressed size recorded in the data does
 * not match r->vsz, or decompression yields an unexpected length, the record
 * is freed and NULL is returned -- the caller must not use r afterwards.
 */
DataRecord *decompress_record(DataRecord *r)
{
    if (r->flag & COMPRESS_FLAG)
    {
        char scratch[QLZ_SCRATCH_DECOMPRESS];
        unsigned int csize = qlz_size_compressed(r->value);
        if (csize != r->vsz)
        {
            log_error("broken compressed data: %d != %d, flag=%x", csize, r->vsz, r->flag);
            goto DECOMP_END;
        }
        unsigned int size = qlz_size_decompressed(r->value);
        char *v = (char*)safe_malloc(size);
        unsigned int ret = qlz_decompress(r->value, v, scratch);
        if (ret != size)
        {
            log_error("decompress %s failed: %d != %d", r->key, ret, size);
            // v is not attached to the record yet, release it here
            free(v);
            goto DECOMP_END;
        }
        if (r->free_value)
        {
            free(r->value);
        }
        r->value = v;
        r->free_value = true;
        r->vsz = size;
        r->flag &= ~COMPRESS_FLAG;
    }
    return r;

DECOMP_END:
    free_record(&r);
    return NULL;
}
176 
decode_record(char * buf,uint32_t size,bool decomp,const char * path,uint32_t pos,const char * key,bool do_logging,int * fail_reason)177 DataRecord *decode_record(char *buf, uint32_t size, bool decomp, const char *path, uint32_t pos, const char *key, bool do_logging, int *fail_reason)
178 {
179     DataRecord *r = (DataRecord *) (buf - sizeof(char*));
180     uint32_t ksz = r->ksz, vsz = r->vsz;
181     if (bad_kv_size(ksz, vsz))
182     {
183         if (do_logging)
184             log_error("invalid ksz=%u, vsz=%u, %s @%u, key = (%s)", ksz, vsz, path, pos, key);
185         if (fail_reason)
186             *fail_reason = BAD_REC_SIZE;
187         return NULL;
188     }
189 
190     unsigned int need = sizeof(DataRecord) - sizeof(char*) + ksz + vsz;
191     if (size < need)
192     {
193         if (do_logging)
194             log_error("not enough data in buffer %d < %d, %s @%u,  key = (%s) ", size, need, path, pos, key);
195         if (fail_reason)
196             *fail_reason = BAD_REC_END;
197         return NULL;
198     }
199     uint32_t crc = crc32(0, (unsigned char*)buf + sizeof(uint32_t),  need - sizeof(uint32_t));
200     if (r->crc != crc)
201     {
202         if (do_logging)
203             log_error("CHECKSUM %u != %u, %s @%u, get (%s) got (%s)", crc, r->crc,  path, pos, key, r->key);
204         if (fail_reason)
205             *fail_reason = BAD_REC_CRC;
206         return NULL;
207     }
208 
209     DataRecord *r2 = (DataRecord *)safe_malloc(need + 1 + sizeof(char*));
210     memcpy(&r2->crc, &r->crc, sizeof(DataRecord) - sizeof(char*) + ksz); // safe
211     r2->key[ksz] = 0; // c str
212     r2->free_value = false;
213     r2->value = r2->key + ksz + 1;
214     memcpy(r2->value, r->key + ksz, vsz); // safe
215 
216     if (decomp)
217     {
218         r2 = decompress_record(r2);
219         if (r2 == NULL && fail_reason)
220            *fail_reason = BAD_REC_DECOMPRESS;
221     }
222     return r2;
223 }
224 
scan_record(char * begin,char * end,char ** curr,const char * path,int * num_broken_total,HTree * tree,int bucket)225 static inline DataRecord *scan_record(char *begin, char *end,  char **curr,
226         const char *path, int *num_broken_total, HTree *tree, int bucket)
227 {
228     int num_broken_curr = 0;
229     while (*curr <  end)
230     {
231         char *p = *curr;
232         int bad_reason = 0;
233         bool do_logging = true;
234         if (num_broken_curr > 10000)
235             do_logging = false;
236 
237         DataRecord *r = decode_record(p, end-p, false,  path, p - begin, "nokey", do_logging,  &bad_reason);
238         if (r != NULL)
239         {
240             if (num_broken_curr > 0)
241             {
242                 log_error("END_BROKEN in %s after %d PADDING, total %d", path, num_broken_curr, *num_broken_total);
243                 num_broken_curr = 0;
244             }
245             return r;
246         }
247         else
248         {
249             if (num_broken_curr == 0)
250             {
251                 DataRecord *ro = (DataRecord *) (p - sizeof(char*));
252                 log_error("START_BROKEN in %s at %ld", path, p - begin);
253                 uint32_t ksz = ro->ksz;
254                 if (ksz > 0 && ksz <= MAX_KEY_LEN && sizeof(DataRecord) - sizeof(char*) + ksz < end - p)
255                 {
256                     Item *it = ht_get2(tree, ro->key, ksz);
257                     if (it && (it->pos & 0xffffff00) == (p - begin) && (it->pos & 0xff) == bucket)
258                     {
259                         char key[KEY_BUF_LEN];
260                         memcpy(key, ro->key, ksz);
261                         key[ksz] = 0;
262                         log_error("REMOVE_BROKEN key %s in %s at %ld", key, path, p - begin);
263                         ht_remove2(tree, ro->key, ksz);
264                         free(it);
265                     }
266 
267                 }
268                 if (bad_reason == BAD_REC_CRC)
269                 {
270                     char *oldp = p;
271                     int jump = record_length(ro);
272                     p += jump;
273                     DataRecord *rn = decode_record(p, end-p, false,  path, p - begin, "nokey", true, NULL);
274                     if (rn != NULL)
275                     {
276                         *curr = p;
277                         jump /= PADDING;
278                         num_broken_curr += jump;
279                         (*num_broken_total) += jump;
280                         log_error("JUMP_BROKEN in %s, jump %d PADDING, total %d", path, jump, *num_broken_total);
281                         return rn;
282                     }
283                     else
284                     {
285                         p = oldp;
286                     }
287                 }
288             }
289 
290             num_broken_curr++;
291             (*num_broken_total)++;
292             if (num_broken_curr > MAX_VALUE_LEN/PADDING)   // 100M
293             {
294                 // TODO: delete broken keys from htree
295                 log_error("GIVEUP_BROKEN in %s after %d PADDING, total %d", path, num_broken_curr, *num_broken_total);
296                 break;
297             }
298             *curr += PADDING;
299         }
300     }
301     if (*curr >= end && num_broken_curr > 0)
302     {
303         log_error("FILE_END_BROKEN in %s after %d PADDING, total %d", path, num_broken_curr, *num_broken_total);
304     }
305     return NULL;
306 }
307 
308 
309 
read_record(FILE * f,bool decomp,const char * path,const char * key)310 DataRecord *read_record(FILE *f, bool decomp, const char *path, const char *key)
311 {
312     DataRecord *r = (DataRecord*) safe_malloc(PADDING + sizeof(char*));
313     r->value = NULL;
314 
315     if (fread(&r->crc, 1, PADDING, f) != PADDING)
316     {
317         log_error("read file fail, %s @%lld, key = (%s)",  path, (long long int)ftello(f), key);
318         goto READ_END;
319     }
320 
321     uint32_t ksz = r->ksz, vsz = r->vsz;
322 
323     if (bad_kv_size(ksz, vsz))
324     {
325         goto READ_END;
326     }
327 
328     uint32_t crc_old = r->crc;
329     int read_size = PADDING - (sizeof(DataRecord) - sizeof(char*)) - ksz;
330     if (vsz < read_size)
331     {
332         r->value = r->key + ksz + 1;
333         r->free_value = false;
334         memmove(r->value, r->key + ksz, vsz);
335     }
336     else
337     {
338         r->value = (char*)safe_malloc(vsz);
339         r->free_value = true;
340         safe_memcpy(r->value, vsz, r->key + ksz, read_size);
341         int need = vsz - read_size;
342         int ret = 0;
343         if (need > 0 && need != (ret = fread(r->value + read_size, 1, need, f)))
344         {
345             r->key[ksz] = 0; // c str
346             log_error("PREAD %d < %d, %s @%lld, key = (%s)", ret, need, path, (long long int)ftello(f), key);
347             goto READ_END;
348         }
349     }
350     r->key[ksz] = 0; // c str
351 
352     uint32_t crc = crc32(0, (unsigned char*)(&r->tstamp),
353                          sizeof(DataRecord) - sizeof(char*) - sizeof(uint32_t) + ksz);
354     crc = crc32(crc, (unsigned char*)r->value, vsz);
355     if (crc != crc_old)
356     {
357         log_error("CHECKSUM %u != %u, %s @%lld, get key (%s) got(%s)", crc, r->crc, path, (long long int)ftello(f), key, r->key);
358         goto READ_END;
359     }
360 
361     if (decomp)
362     {
363         r = decompress_record(r);
364     }
365     return r;
366 
367 READ_END:
368     free_record(&r);
369     return NULL;
370 }
371 
fast_read_record(int fd,off_t offset,bool decomp,const char * path,const char * key)372 DataRecord *fast_read_record(int fd, off_t offset, bool decomp, const char *path, const char *key)
373 {
374     DataRecord *r = (DataRecord*) safe_malloc(max(sizeof(DataRecord) + MAX_KEY_LEN, PADDING + sizeof(char*)) + 1);
375     r->value = NULL;
376 
377     if (pread(fd, &r->crc, PADDING, offset) != PADDING)
378     {
379         log_error("read file fail, %s @%lld, file size = %lld, key = %s",
380                 path, (long long)offset, (long long)lseek(fd, 0L, SEEK_END), key);
381         goto READ_END;
382     }
383 
384     if (bad_kv_size(r->ksz, r->vsz))
385     {
386         log_error("invalid ksz=%u, vsz=%u, %s @%lld, key = (%s)",
387                 r->ksz, r->vsz, path, (long long)offset, key);
388         goto READ_END;
389     }
390     int ksz = r->ksz, vsz = r->vsz;
391     uint32_t crc_old = r->crc;
392     int read_more = (sizeof(DataRecord) - sizeof(char*)) + ksz + vsz - PADDING;
393     if (read_more <= 0)
394     {
395         r->value = r->key + ksz + 1;
396         r->free_value = false;
397         memmove(r->value, r->key + ksz, vsz);
398     }
399     else if (read_more > vsz)
400     {
401         int key_more = read_more - vsz;
402         r->value = (char*)safe_malloc(vsz + key_more);
403         r->free_value = true;
404         int ret = 0;
405         log_warn("long key ksz %d key_more, vsz %d, read_more %d",
406                 ksz, key_more, vsz, read_more);
407         if (read_more != (ret=pread(fd, r->value, read_more, offset + PADDING)))
408         {
409             r->key[ksz] = 0; // c str
410             log_error("PREAD %d < %d, %s @%lld, get key (%s) got(%s)", ret, read_more, path, (long long int) offset, key, r->key);
411             goto READ_END;
412         }
413         memcpy(r->key + ksz - key_more, r->value, key_more);
414         memmove(r->value, r->value + key_more, vsz);
415     }
416     else
417     {
418         int vreadn = vsz - read_more;
419         r->value = (char*)safe_malloc(vsz);
420         r->free_value = true;
421         safe_memcpy(r->value, vsz, r->key + ksz, vreadn);
422         int ret = 0;
423         if (read_more != (ret=pread(fd, r->value + vreadn, read_more, offset + PADDING)))
424         {
425             r->key[ksz] = 0; // c str
426             log_error("PREAD %d < %d, %s @%lld, get key (%s) got(%s)", ret, read_more, path, (long long int) offset, key, r->key);
427             goto READ_END;
428         }
429     }
430     r->key[ksz] = 0; // c str
431 
432     uint32_t crc = crc32(0, (unsigned char*)(&r->tstamp),
433                          sizeof(DataRecord) - sizeof(char*) - sizeof(uint32_t) + ksz);
434     crc = crc32(crc, (unsigned char*)r->value, vsz);
435     if (crc != crc_old)
436     {
437         log_error("CHECKSUM %u != %u, %s @%lld, get key (%s) got(%s)", crc, r->crc, path, (long long int)offset, key, r->key);
438         goto READ_END;
439     }
440 
441     if (decomp)
442     {
443         r = decompress_record(r);
444     }
445     return r;
446 
447 READ_END:
448     free_record(&r);
449     return NULL;
450 }
451 
/*
 * Serialise a record into a freshly allocated buffer in on-disk layout.
 *
 * The record is passed through compress_record() first, so r may be modified.
 * The buffer holds the header (DataRecord without its leading value
 * pointer), the key, the value, and zero bytes up to the next multiple of
 * PADDING.  The crc field is computed over everything after itself, padding
 * excluded.
 *
 * size: receives the padded buffer length.
 * Returns the buffer; the caller frees it with free().
 */
char *encode_record(DataRecord *r, unsigned int *size)
{
    compress_record(r);

    // sizes are taken after compression, which may have shrunk the value
    const int hs = sizeof(char*); // over header
    int ksz = r->ksz, vsz = r->vsz;
    unsigned int n = sizeof(DataRecord) - hs + ksz + vsz;
    unsigned int tail = n % PADDING;
    unsigned int m = n;
    if (tail != 0)
    {
        m += PADDING - tail;
    }

    char *buf = (char*)safe_malloc(m);

    // address the buffer through a DataRecord whose value pointer would sit
    // just before buf; only fields from crc onwards are touched
    DataRecord *out = (DataRecord*)(buf - hs);
    memcpy(&out->crc, &r->crc, sizeof(DataRecord) - hs); // safe
    memcpy(out->key, r->key, ksz); // safe
    memcpy(out->key + ksz, r->value, vsz); // safe
    memset(buf + n, 0, m - n);
    out->crc = crc32(0, (unsigned char*)&out->tstamp, n - sizeof(uint32_t));

    *size = m;
    return buf;
}
477 
/*
 * Encode a record and append it to a stdio stream.
 *
 * Returns 0 when the whole encoded record was written, -1 (after logging)
 * when fwrite() wrote fewer bytes.  r may be compressed as a side effect of
 * encoding.
 */
int write_record(FILE *f, DataRecord *r)
{
    unsigned int size;
    char *data = encode_record(r, &size);
    int rc = 0;

    if (fwrite(data, 1, size, f) < size)
    {
        log_error("write %d byte failed", size);
        rc = -1;
    }

    free(data);
    return rc;
}
491 
492 
/*
 * Rebuild index entries from a data file and write its hint file.
 *
 * Every decodable record whose key passes check_key() is applied to tree
 * (added when version > 0, removed otherwise) and also added to a per-file
 * tree that is handed to build_hint() together with hintpath.  Positions are
 * stored as (file offset | bucket).  Records that fail to decompress are
 * logged and skipped.  Nothing is done when the file cannot be opened.
 */
void scanDataFile(HTree *tree, int bucket, const char *path, const char *hintpath)
{
    MFile *f = open_mfile(path);
    if (f == NULL) return;

    log_warn("scan datafile %s", path);
    HTree *cur_tree = ht_new(0, 0, true);
    char *p = f->addr;
    char *end = f->addr + f->size;
    int num_broken_total = 0;
    size_t last_advise = 0;

    while (p < end)
    {
        DataRecord *r = scan_record(f->addr, end, &p, path, &num_broken_total, tree, bucket);
        if (r == NULL)
            break;

        // scan_record leaves p at the record it returned
        uint32_t pos = p - f->addr;
        p += record_length(r);

        r = decompress_record(r);
        if (r == NULL)
        {
            log_error("decompress_record fail, %s @%u size = %ld", path, pos, p - (pos + f->addr));
            continue;
        }

        uint16_t hash = gen_hash(r->value, r->vsz);
        if (check_key(r->key, r->ksz))
        {
            if (r->version <= 0)
            {
                ht_remove2(tree, r->key, r->ksz);
            }
            else
            {
                ht_add2(tree, r->key, r->ksz, pos | bucket, hash, r->version);
            }
            ht_add2(cur_tree, r->key, r->ksz, pos | bucket, hash, r->version);
        }
        free_record(&r);
        mfile_dontneed(f, p - f->addr, &last_advise);
    }

    close_mfile(f);
    build_hint(cur_tree, hintpath);
}
537 
/*
 * Apply the records of a data file to tree, stopping at the first record
 * whose timestamp is not earlier than `before`.
 *
 * Records whose key passes check_key() are added to tree when version > 0
 * and removed otherwise; positions are stored as (file offset | bucket).
 * Records that fail to decompress are logged and skipped.  Nothing is done
 * when the file cannot be opened.
 */
void scanDataFileBefore(HTree *tree, int bucket, const char *path, time_t before)
{
    MFile *f = open_mfile(path);
    if (f == NULL) return;

    log_error("scan datafile %s before %ld", path, before);
    char *p = f->addr;
    char *end = f->addr + f->size;
    int num_broken_total = 0;
    size_t last_advise = 0;

    while (p < end)
    {
        DataRecord *r = scan_record(f->addr, end, &p, path, &num_broken_total, tree, bucket);
        // stop at end of data or at the first record that is too recent
        if (r == NULL || r->tstamp >= before)
            break;

        // scan_record leaves p at the record it returned
        uint32_t pos = p - f->addr;
        p += record_length(r);

        r = decompress_record(r);
        if (r == NULL)
        {
            log_error("decompress_record fail, %s @%u size = %ld", path, pos, p - (pos + f->addr));
            continue;
        }

        if (check_key(r->key, r->ksz))
        {
            if (r->version <= 0)
            {
                ht_remove2(tree, r->key, r->ksz);
            }
            else
            {
                uint16_t hash = gen_hash(r->value, r->vsz);
                ht_add2(tree, r->key, r->ksz, pos | bucket, hash, r->version);
            }
        }
        free_record(&r);
        mfile_dontneed(f, p - f->addr, &last_advise);
    }
    close_mfile(f);
}
580 
581 // update pos in HTree
update_items(Item * it,void * args)582 void update_items(Item *it, void *args)
583 {
584     HTree *tree = (HTree*) args;
585     Item *p = ht_get(tree, it->key);
586     if (p)
587     {
588         if (it->pos != p->pos && it->ver == p->ver)
589         {
590             if (it->ver > 0)
591             {
592                 ht_add(tree, p->key, it->pos, p->hash, p->ver);
593             }
594             else
595             {
596                 ht_remove(tree, p->key);
597             }
598         }
599         free(p);
600     }
601     else
602     {
603         ht_add(tree, it->key, it->pos, it->hash, it->ver);
604     }
605 }
/*
 * Compact a data file: copy every record that the index still points at
 * into `lastdata` and drop the rest.
 *
 * tree:          global index; looked up to decide which records are live
 *                and updated with the new positions on success.
 * mgr:           disk manager used to allocate the temporary file.
 * bucket, path, hintpath:
 *                number, data file and hint file of the source.
 * last_bucket, lastdata, lasthint_real:
 *                number, data file and hint file of the destination.
 * max_data_size: the destination must not grow beyond this many bytes.
 * skipped:       when true, live records with version <= 0 are copied too.
 * use_tmp:       write to "<lastdata>.tmp" and rename it into place instead
 *                of appending to lastdata.
 * deleted_bytes: on success receives source size minus bytes written.  It
 *                is not written on failure.
 *
 * Returns 0 on success, 1 when the destination would overflow or the last
 * hint file cannot be opened, -1 on other failures.
 *
 * NOTE(review): the bucket == last_bucket branch renames tmp over lastdata
 * unconditionally, so it looks like callers are expected to pass
 * use_tmp = true in that case -- confirm.
 */
int optimizeDataFile(HTree *tree, Mgr *mgr, int bucket, const char *path, const char *hintpath,
        int last_bucket, const char *lastdata, const char *lasthint_real, uint32_t max_data_size,
        bool skipped, bool use_tmp, uint32_t *deleted_bytes)
{

    struct timeval opt_start, opt_end, update_start, update_end;
    gettimeofday(&opt_start, NULL);

    int err = -1;
    log_notice("begin optimize %s -> %s, use_tmp = %s", path, lastdata, use_tmp ? "true" : "false");

    // Everything the OPT_FAIL path reads is declared and initialised here,
    // before the first goto, so the failure log never prints indeterminate
    // values and the cleanup never touches an uninitialised tmp path.
//to destroy:
    FILE *new_df = NULL;
    HTree *cur_tree = NULL;
    char *hintdata = NULL;
    MFile *f = NULL;

    char tmp[MAX_PATH_LEN] = "";
    uint32_t old_srcdata_size = 0;
    uint32_t new_df_orig_size =  0;
    uint32_t hint_used = 0, hint_size = 0;
    uint32_t released_bytes = 0;
    int nrecord = 0, deleted = 0, broken = 0, released = 0;
    float update_secs = 0, opt_secs = 0;

    f = open_mfile(path);
    if (f == NULL)
    {
          err = -1;
          goto  OPT_FAIL;
    }
    old_srcdata_size = f->size;

    if (!use_tmp)
    {
        new_df = fopen(lastdata, "ab");
        if (new_df == NULL)
        {
            log_error("open last datafile failed, %s", lastdata);
            goto  OPT_FAIL;
        }
        new_df_orig_size = ftello(new_df);

        // records must start on a 256-byte boundary
        size_t tail = new_df_orig_size % 256;
        if (tail != 0)
        {
            char bytes[256] = {0};   // pad with zeros, not stack garbage
            size_t size = 256 - tail;
            log_warn("size of %s is 0x%llx, add padding", lastdata, (long long)new_df_orig_size);
            if (fwrite(bytes, 1, size, new_df) < size)
            {
                log_error("write error when padding %s", lastdata);
                goto  OPT_FAIL;
            }
        }

        if (new_df_orig_size > 0)
        {
            // appending: start from the destination's existing hint data
            HintFile *hint = open_hint(lasthint_real, NULL);
            if (hint == NULL)
            {
                log_error("open last hint file %s failed", lasthint_real);
                err = 1;
                goto  OPT_FAIL;
            }
            hint_size = hint->size * 2;
            if (hint_size < 4096) hint_size = 4096;
            hintdata = (char*)safe_malloc(hint_size);
            memcpy(hintdata, hint->buf, hint->size); // safe
            hint_used = hint->size;
            close_hint(hint);
        }
    }
    else
    {
        // sizeof(".tmp") counts the terminating NUL
        if (strlen(lastdata) + sizeof(".tmp") > sizeof(tmp))
        {
            log_error("tmp datafile path too long, %s", lastdata);
            goto  OPT_FAIL;
        }
        strcpy(tmp, lastdata);
        strcat(tmp, ".tmp");
        mgr_alloc(mgr, simple_basename(tmp));

        new_df = fopen(tmp, "wb");
        if (new_df == NULL)
        {
            log_error("open tmp datafile failed, %s", tmp);
            goto  OPT_FAIL;
        }
    }
    if (hintdata == NULL)
    {
        hint_size = 1<<20;
        hintdata = (char*)safe_malloc(hint_size);
    }

    cur_tree = ht_new(0, 0, true);
    char *p = f->addr, *end = f->addr + f->size;
    char *newp = p;
    size_t last_advise = 0;
    while (p < end)
    {
        DataRecord *r = scan_record(f->addr, end, &p, path, &broken, tree, bucket);
        if (r == NULL)
        {
            // NULL before end of file means the scan gave up on broken data
            if (p < end)
                goto  OPT_FAIL;
            break;
        }

        newp = p + record_length(r);
        nrecord++;
        Item *it = ht_get2(tree, r->key, r->ksz);
        uint32_t pos = p - f->addr;
        // live record: the index still points at exactly this copy
        if (it && it->pos  == (pos | bucket) && (it->ver > 0 || skipped))
        {
            uint32_t new_pos = ftello(new_df);
            if (new_pos + record_length(r) > max_data_size)
            {
                if (use_tmp)
                {
                    log_warn("Bug: optimize %s into tmp %s overflow", path, tmp);
                }
                else
                {
                    log_warn("optimize %s into %s overflow, ftruncate to %u", path, lastdata, new_df_orig_size);
                    fflush(new_df);
                    if (0 != ftruncate(fileno(new_df), new_df_orig_size))
                    {
                        log_error("ftruncate failed for  %s old size = %u", path, new_df_orig_size);
                    }
                    rewind(new_df);
                }
                err = 1;
                // release the current item and record like the write-error
                // path below does
                free(it);
                free_record(&r);
                goto  OPT_FAIL;
            }

            uint16_t hash = it->hash;
            ht_add2(cur_tree, r->key, r->ksz, new_pos | last_bucket, hash, it->ver);
            // append record to hint file
            int hsize = sizeof(HintRecord) - NAME_IN_RECORD + r->ksz + 1;
            if (hint_used + hsize > hint_size)
            {
                hint_size *= 2;
                hintdata = (char*)safe_realloc(hintdata, hint_size);
            }
            HintRecord *hr = (HintRecord*)(hintdata + hint_used);
            hr->ksize = r->ksz;
            hr->pos = new_pos >> 8;
            hr->version = it->ver;
            hr->hash = hash;
            safe_memcpy(hr->key, hint_size - sizeof(uint32_t) -
                    sizeof(int32_t) - sizeof(uint16_t), r->key, r->ksz + 1);
            hint_used += hsize;

            r->version = it->ver;
            if (write_record(new_df, r) != 0)
            {
                log_error("write error: %s -> %d", path, last_bucket);
                free(it);
                free_record(&r);
                goto  OPT_FAIL;
            }
        }
        else
        {
            if (it && it->pos == (pos | bucket) && it->ver < 0)
            {
                deleted++;
                ht_add2(cur_tree, r->key, r->ksz, 0, it->hash, it->ver);
            }
            released++;
        }
        free(it);
        p = newp;
        free_record(&r);

        mfile_dontneed(f, pos, &last_advise);
    }
    fseeko(new_df, 0L, SEEK_END);
    released_bytes = f->size - (ftello(new_df) - new_df_orig_size);
    *deleted_bytes = released_bytes;

    close_mfile(f);
    fclose(new_df);

    gettimeofday(&update_start, NULL);
    if (bucket == last_bucket)
    {
        ht_set_updating_bucket(tree, bucket, cur_tree);
        ht_visit2(cur_tree, update_items, tree);
        mgr_unlink(lastdata);
        mgr_rename(tmp, lastdata);
        ht_set_updating_bucket(tree, -1, NULL);
    }
    else
    {
        if (use_tmp)
            mgr_rename(tmp, lastdata);
        ht_visit(cur_tree, update_items, tree);
        mgr_unlink(path);
    }
    gettimeofday(&update_end, NULL);

    ht_destroy(cur_tree);

    if (last_bucket != bucket)
        mgr_unlink(hintpath);
    write_hint_file(hintdata, hint_used, lasthint_real);
    free(hintdata);


    gettimeofday(&opt_end, NULL);
    update_secs = (update_end.tv_sec - update_start.tv_sec) + (update_end.tv_usec - update_start.tv_usec) / 1e6;
    opt_secs = (opt_end.tv_sec - opt_start.tv_sec) + (opt_end.tv_usec - opt_start.tv_usec) / 1e6;
    log_notice("optimize %s -> %d (%u B) complete, %d/%d records released, %d deleted, %u/%u bytes released, %d bytes broken, use %fs/%fs",
            path, last_bucket, (last_bucket == bucket) ? old_srcdata_size : new_df_orig_size, released, nrecord, deleted, released_bytes, old_srcdata_size, broken, update_secs, opt_secs);
    return 0;

OPT_FAIL:
    // no index update happened, so update_secs stays 0
    gettimeofday(&opt_end, NULL);
    opt_secs = (opt_end.tv_sec - opt_start.tv_sec) + (opt_end.tv_usec - opt_start.tv_usec) / 1e6;
    log_notice("optimize %s -> %d (%u B) failed,   %d/%d records released, %d deleted, %u/%u bytes released, %d bytes broken, use %fs/%fs, err = %d",
            path, last_bucket, (last_bucket == bucket) ? old_srcdata_size : new_df_orig_size, released, nrecord, deleted, released_bytes, old_srcdata_size, broken, update_secs, opt_secs, err);
    free(hintdata);
    if (cur_tree)  ht_destroy(cur_tree);
    if (f) close_mfile(f);
    if (new_df) fclose(new_df);
    // only remove the tmp file once its name has actually been built
    if (use_tmp && tmp[0] != '\0') mgr_unlink(tmp);
    return err;
}
820