1 /*
2 * Beansdb - A high available distributed key-value storage system:
3 *
4 * http://beansdb.googlecode.com
5 *
6 * Copyright 2010 Douban Inc. All rights reserved.
7 *
8 * Use and distribution licensed under the BSD license. See
9 * the LICENSE file for full text.
10 *
11 * Authors:
12 * Davies Liu <davies.liu@gmail.com>
13 * Hurricane Lee <hurricane1026@gmail.com>
14 *
15 */
16
17 #ifdef HAVE_CONFIG_H
18 #include "config.h"
19 #endif
20
21 #include <sys/stat.h>
22 #include <sys/types.h>
23 #include <sys/uio.h>
24 #include <sys/time.h>
25
26 #if HAVE_UNISTD_H
27 #include <unistd.h>
28 #endif
29
30 #include <stdlib.h>
31 #include <stdint.h>
32 #include <string.h>
33
34 #include "record.h"
35 #include "hint.h"
36 #include "crc32.c"
37 #include "diskmgr.h"
38 #include "quicklz.h"
39 #include "fnv1a.h"
40
41 #include "mfile.h"
42 #include "util.h"
43 #include "const.h"
44 #include "log.h"
45
46
/* Records occupy a whole number of PADDING-byte blocks on disk
 * (record_length / encode_record round up to it), so record offsets
 * inside a data file are multiples of PADDING. */
const int PADDING = 256;
/* Set in DataRecord.flag by compress_record when the stored value is
 * QuickLZ-compressed; decompress_record clears it again. */
const int32_t COMPRESS_FLAG = 0x00010000;
/* compress_record leaves records carrying this flag untouched
 * (presumably the client already compressed the value — confirm). */
const int32_t CLIENT_COMPRESS_FLAG = 0x00000010;
/* A compressed value is kept only if it is at most this fraction of the
 * original size; the same ratio decides whether a trial prefix compression
 * is good enough to compress the whole value. */
const float COMPRESS_RATIO_LIMIT = 0.7;
/* Values longer than this are first trial-compressed on their leading
 * TRY_COMPRESS_SIZE bytes before the whole value is compressed. */
const int TRY_COMPRESS_SIZE = 1024 * 10;
52
bad_kv_size(uint32_t ksz,uint32_t vsz)53 static inline bool bad_kv_size(uint32_t ksz, uint32_t vsz)
54 {
55 return ((ksz == 0 || ksz > MAX_KEY_LEN)|| vsz > MAX_VALUE_LEN);
56 }
57
/*
 * Cheap content hash of a value buffer of `len` bytes.
 *
 * The length is mixed in first.  Buffers of up to 1024 bytes are hashed
 * completely with fnv1a; longer ones only on their first and last 512 bytes,
 * so the cost stays bounded for large values.
 */
uint32_t gen_hash(char *buf, int len)
{
    /* Multiply in unsigned arithmetic: for values larger than INT_MAX / 97
     * (~22 MB) `len * 97` in int overflows, which is undefined behavior.
     * uint32_t wraps, and gives the same bits the old code produced on
     * two's-complement wraparound, so stored hashes stay compatible. */
    uint32_t hash = (uint32_t)len * 97u;
    if (len <= 1024)
    {
        hash += fnv1a(buf, len);
    }
    else
    {
        hash += fnv1a(buf, 512);
        hash *= 97;
        hash += fnv1a(buf + len - 512, 512);
    }
    return hash;
}
73
/* Number of bytes record r occupies in a data file: the on-disk header
 * (DataRecord without its in-memory value pointer) plus key and value,
 * rounded up to the next multiple of PADDING. */
int record_length(DataRecord *r)
{
    size_t raw = sizeof(DataRecord) - sizeof(char*) + r->ksz + r->vsz;
    size_t blocks = (raw + PADDING - 1) / PADDING;
    return (int)(blocks * PADDING);
}
85
/*
 * Return r's value buffer for the caller to keep.
 *
 * If the value is stored inline (right after the key's NUL inside the record
 * allocation), a fresh heap copy of vsz bytes is returned.  Otherwise the
 * record's own r->value pointer is returned unchanged.
 * NOTE(review): in that second case the pointer is still referenced by r, so
 * the caller must settle ownership (e.g. clear free_value) — confirm callers.
 */
char *record_value(DataRecord *r)
{
    char *inline_value = r->key + r->ksz + 1;
    if (r->value != inline_value)
        return r->value;

    char *copy = (char*)safe_malloc(r->vsz);
    memcpy(copy, r->value, r->vsz); // safe
    return copy;
}
97
/* Release a record and, if it owns a separately allocated value
 * (free_value set), that value as well; then clear the caller's pointer.
 * Passing r == NULL or *r == NULL is a no-op. */
void free_record(DataRecord **r)
{
    if (r != NULL && *r != NULL)
    {
        DataRecord *rec = *r;
        if (rec->free_value && rec->value != NULL)
            free(rec->value);
        free(rec);
        *r = NULL;
    }
}
105
/*
 * Compress r's value in place with QuickLZ when that is worthwhile.
 *
 * Nothing happens when the record is already compressed (COMPRESS_FLAG), was
 * compressed by the client (CLIENT_COMPRESS_FLAG), or its encoded size does
 * not exceed one PADDING block.  Values longer than TRY_COMPRESS_SIZE are
 * trial-compressed on a prefix first, and compressed completely only if that
 * prefix shrinks below COMPRESS_RATIO_LIMIT.  The compressed buffer replaces
 * r->value (freeing the old one if the record owned it) only when the whole
 * value was compressed to at most COMPRESS_RATIO_LIMIT of its size; then
 * r->vsz is updated and COMPRESS_FLAG set.  If the work buffers cannot be
 * allocated the record is left unchanged.
 */
void compress_record(DataRecord *r)
{
    if (r->flag & (COMPRESS_FLAG | CLIENT_COMPRESS_FLAG)) return;

    int ksz = r->ksz, vsz = r->vsz;
    int n = sizeof(DataRecord) - sizeof(char*) + ksz + vsz;
    /* a record that fits into one PADDING block takes one block either way */
    if (n <= PADDING) return;

    char *wbuf = (char*)try_malloc(QLZ_SCRATCH_COMPRESS);
    /* QuickLZ requires the destination to be at least size + 400 bytes */
    char *v = (char*)try_malloc(vsz + 400);
    if (wbuf == NULL || v == NULL)
    {
        /* best effort: keep the value uncompressed, but release whichever
         * buffer was obtained instead of leaking it */
        free(wbuf);
        free(v);
        return;
    }

    int try_size = vsz > TRY_COMPRESS_SIZE ? TRY_COMPRESS_SIZE : vsz;
    int vsize = qlz_compress(r->value, v, try_size, wbuf);
    if (try_size < vsz && vsize < try_size * COMPRESS_RATIO_LIMIT)
    {
        /* the prefix compressed well: compress the whole value */
        try_size = vsz;
        vsize = qlz_compress(r->value, v, try_size, wbuf);
    }
    free(wbuf);

    if (vsize > try_size * COMPRESS_RATIO_LIMIT || try_size < vsz)
    {
        free(v);
        return;
    }

    if (r->free_value)
    {
        free(r->value);
    }
    r->value = v;
    r->free_value = true;
    r->vsz = vsize;
    r->flag |= COMPRESS_FLAG;
}
141
/*
 * Decompress r's value in place if COMPRESS_FLAG is set; otherwise return r
 * unchanged.
 *
 * On success the decompressed buffer replaces r->value (the old value is
 * freed if the record owned it), r->vsz becomes the decompressed size and
 * COMPRESS_FLAG is cleared; r is returned.  If the compressed header does not
 * match r->vsz or decompression yields the wrong size, an error is logged,
 * the whole record is freed and NULL is returned, so callers must not use r
 * afterwards.
 */
DataRecord *decompress_record(DataRecord *r)
{
    char *v = NULL;
    if (r->flag & COMPRESS_FLAG)
    {
        char scratch[QLZ_SCRATCH_DECOMPRESS];
        unsigned int csize = qlz_size_compressed(r->value);
        if (csize != r->vsz)
        {
            log_error("broken compressed data: %u != %u, flag=%x", csize, r->vsz, r->flag);
            goto DECOMP_END;
        }
        unsigned int size = qlz_size_decompressed(r->value);
        v = (char*)safe_malloc(size);
        unsigned int ret = qlz_decompress(r->value, v, scratch);
        if (ret != size)
        {
            log_error("decompress %s failed: %u != %u", r->key, ret, size);
            goto DECOMP_END;
        }
        if (r->free_value)
        {
            free(r->value);
        }
        r->value = v;
        r->free_value = true;
        r->vsz = size;
        r->flag &= ~COMPRESS_FLAG;
    }
    return r;

DECOMP_END:
    /* v is either NULL or the not-yet-adopted output buffer; don't leak it */
    free(v);
    free_record(&r);
    return NULL;
}
176
decode_record(char * buf,uint32_t size,bool decomp,const char * path,uint32_t pos,const char * key,bool do_logging,int * fail_reason)177 DataRecord *decode_record(char *buf, uint32_t size, bool decomp, const char *path, uint32_t pos, const char *key, bool do_logging, int *fail_reason)
178 {
179 DataRecord *r = (DataRecord *) (buf - sizeof(char*));
180 uint32_t ksz = r->ksz, vsz = r->vsz;
181 if (bad_kv_size(ksz, vsz))
182 {
183 if (do_logging)
184 log_error("invalid ksz=%u, vsz=%u, %s @%u, key = (%s)", ksz, vsz, path, pos, key);
185 if (fail_reason)
186 *fail_reason = BAD_REC_SIZE;
187 return NULL;
188 }
189
190 unsigned int need = sizeof(DataRecord) - sizeof(char*) + ksz + vsz;
191 if (size < need)
192 {
193 if (do_logging)
194 log_error("not enough data in buffer %d < %d, %s @%u, key = (%s) ", size, need, path, pos, key);
195 if (fail_reason)
196 *fail_reason = BAD_REC_END;
197 return NULL;
198 }
199 uint32_t crc = crc32(0, (unsigned char*)buf + sizeof(uint32_t), need - sizeof(uint32_t));
200 if (r->crc != crc)
201 {
202 if (do_logging)
203 log_error("CHECKSUM %u != %u, %s @%u, get (%s) got (%s)", crc, r->crc, path, pos, key, r->key);
204 if (fail_reason)
205 *fail_reason = BAD_REC_CRC;
206 return NULL;
207 }
208
209 DataRecord *r2 = (DataRecord *)safe_malloc(need + 1 + sizeof(char*));
210 memcpy(&r2->crc, &r->crc, sizeof(DataRecord) - sizeof(char*) + ksz); // safe
211 r2->key[ksz] = 0; // c str
212 r2->free_value = false;
213 r2->value = r2->key + ksz + 1;
214 memcpy(r2->value, r->key + ksz, vsz); // safe
215
216 if (decomp)
217 {
218 r2 = decompress_record(r2);
219 if (r2 == NULL && fail_reason)
220 *fail_reason = BAD_REC_DECOMPRESS;
221 }
222 return r2;
223 }
224
scan_record(char * begin,char * end,char ** curr,const char * path,int * num_broken_total,HTree * tree,int bucket)225 static inline DataRecord *scan_record(char *begin, char *end, char **curr,
226 const char *path, int *num_broken_total, HTree *tree, int bucket)
227 {
228 int num_broken_curr = 0;
229 while (*curr < end)
230 {
231 char *p = *curr;
232 int bad_reason = 0;
233 bool do_logging = true;
234 if (num_broken_curr > 10000)
235 do_logging = false;
236
237 DataRecord *r = decode_record(p, end-p, false, path, p - begin, "nokey", do_logging, &bad_reason);
238 if (r != NULL)
239 {
240 if (num_broken_curr > 0)
241 {
242 log_error("END_BROKEN in %s after %d PADDING, total %d", path, num_broken_curr, *num_broken_total);
243 num_broken_curr = 0;
244 }
245 return r;
246 }
247 else
248 {
249 if (num_broken_curr == 0)
250 {
251 DataRecord *ro = (DataRecord *) (p - sizeof(char*));
252 log_error("START_BROKEN in %s at %ld", path, p - begin);
253 uint32_t ksz = ro->ksz;
254 if (ksz > 0 && ksz <= MAX_KEY_LEN && sizeof(DataRecord) - sizeof(char*) + ksz < end - p)
255 {
256 Item *it = ht_get2(tree, ro->key, ksz);
257 if (it && (it->pos & 0xffffff00) == (p - begin) && (it->pos & 0xff) == bucket)
258 {
259 char key[KEY_BUF_LEN];
260 memcpy(key, ro->key, ksz);
261 key[ksz] = 0;
262 log_error("REMOVE_BROKEN key %s in %s at %ld", key, path, p - begin);
263 ht_remove2(tree, ro->key, ksz);
264 free(it);
265 }
266
267 }
268 if (bad_reason == BAD_REC_CRC)
269 {
270 char *oldp = p;
271 int jump = record_length(ro);
272 p += jump;
273 DataRecord *rn = decode_record(p, end-p, false, path, p - begin, "nokey", true, NULL);
274 if (rn != NULL)
275 {
276 *curr = p;
277 jump /= PADDING;
278 num_broken_curr += jump;
279 (*num_broken_total) += jump;
280 log_error("JUMP_BROKEN in %s, jump %d PADDING, total %d", path, jump, *num_broken_total);
281 return rn;
282 }
283 else
284 {
285 p = oldp;
286 }
287 }
288 }
289
290 num_broken_curr++;
291 (*num_broken_total)++;
292 if (num_broken_curr > MAX_VALUE_LEN/PADDING) // 100M
293 {
294 // TODO: delete broken keys from htree
295 log_error("GIVEUP_BROKEN in %s after %d PADDING, total %d", path, num_broken_curr, *num_broken_total);
296 break;
297 }
298 *curr += PADDING;
299 }
300 }
301 if (*curr >= end && num_broken_curr > 0)
302 {
303 log_error("FILE_END_BROKEN in %s after %d PADDING, total %d", path, num_broken_curr, *num_broken_total);
304 }
305 return NULL;
306 }
307
308
309
read_record(FILE * f,bool decomp,const char * path,const char * key)310 DataRecord *read_record(FILE *f, bool decomp, const char *path, const char *key)
311 {
312 DataRecord *r = (DataRecord*) safe_malloc(PADDING + sizeof(char*));
313 r->value = NULL;
314
315 if (fread(&r->crc, 1, PADDING, f) != PADDING)
316 {
317 log_error("read file fail, %s @%lld, key = (%s)", path, (long long int)ftello(f), key);
318 goto READ_END;
319 }
320
321 uint32_t ksz = r->ksz, vsz = r->vsz;
322
323 if (bad_kv_size(ksz, vsz))
324 {
325 goto READ_END;
326 }
327
328 uint32_t crc_old = r->crc;
329 int read_size = PADDING - (sizeof(DataRecord) - sizeof(char*)) - ksz;
330 if (vsz < read_size)
331 {
332 r->value = r->key + ksz + 1;
333 r->free_value = false;
334 memmove(r->value, r->key + ksz, vsz);
335 }
336 else
337 {
338 r->value = (char*)safe_malloc(vsz);
339 r->free_value = true;
340 safe_memcpy(r->value, vsz, r->key + ksz, read_size);
341 int need = vsz - read_size;
342 int ret = 0;
343 if (need > 0 && need != (ret = fread(r->value + read_size, 1, need, f)))
344 {
345 r->key[ksz] = 0; // c str
346 log_error("PREAD %d < %d, %s @%lld, key = (%s)", ret, need, path, (long long int)ftello(f), key);
347 goto READ_END;
348 }
349 }
350 r->key[ksz] = 0; // c str
351
352 uint32_t crc = crc32(0, (unsigned char*)(&r->tstamp),
353 sizeof(DataRecord) - sizeof(char*) - sizeof(uint32_t) + ksz);
354 crc = crc32(crc, (unsigned char*)r->value, vsz);
355 if (crc != crc_old)
356 {
357 log_error("CHECKSUM %u != %u, %s @%lld, get key (%s) got(%s)", crc, r->crc, path, (long long int)ftello(f), key, r->key);
358 goto READ_END;
359 }
360
361 if (decomp)
362 {
363 r = decompress_record(r);
364 }
365 return r;
366
367 READ_END:
368 free_record(&r);
369 return NULL;
370 }
371
fast_read_record(int fd,off_t offset,bool decomp,const char * path,const char * key)372 DataRecord *fast_read_record(int fd, off_t offset, bool decomp, const char *path, const char *key)
373 {
374 DataRecord *r = (DataRecord*) safe_malloc(max(sizeof(DataRecord) + MAX_KEY_LEN, PADDING + sizeof(char*)) + 1);
375 r->value = NULL;
376
377 if (pread(fd, &r->crc, PADDING, offset) != PADDING)
378 {
379 log_error("read file fail, %s @%lld, file size = %lld, key = %s",
380 path, (long long)offset, (long long)lseek(fd, 0L, SEEK_END), key);
381 goto READ_END;
382 }
383
384 if (bad_kv_size(r->ksz, r->vsz))
385 {
386 log_error("invalid ksz=%u, vsz=%u, %s @%lld, key = (%s)",
387 r->ksz, r->vsz, path, (long long)offset, key);
388 goto READ_END;
389 }
390 int ksz = r->ksz, vsz = r->vsz;
391 uint32_t crc_old = r->crc;
392 int read_more = (sizeof(DataRecord) - sizeof(char*)) + ksz + vsz - PADDING;
393 if (read_more <= 0)
394 {
395 r->value = r->key + ksz + 1;
396 r->free_value = false;
397 memmove(r->value, r->key + ksz, vsz);
398 }
399 else if (read_more > vsz)
400 {
401 int key_more = read_more - vsz;
402 r->value = (char*)safe_malloc(vsz + key_more);
403 r->free_value = true;
404 int ret = 0;
405 log_warn("long key ksz %d key_more, vsz %d, read_more %d",
406 ksz, key_more, vsz, read_more);
407 if (read_more != (ret=pread(fd, r->value, read_more, offset + PADDING)))
408 {
409 r->key[ksz] = 0; // c str
410 log_error("PREAD %d < %d, %s @%lld, get key (%s) got(%s)", ret, read_more, path, (long long int) offset, key, r->key);
411 goto READ_END;
412 }
413 memcpy(r->key + ksz - key_more, r->value, key_more);
414 memmove(r->value, r->value + key_more, vsz);
415 }
416 else
417 {
418 int vreadn = vsz - read_more;
419 r->value = (char*)safe_malloc(vsz);
420 r->free_value = true;
421 safe_memcpy(r->value, vsz, r->key + ksz, vreadn);
422 int ret = 0;
423 if (read_more != (ret=pread(fd, r->value + vreadn, read_more, offset + PADDING)))
424 {
425 r->key[ksz] = 0; // c str
426 log_error("PREAD %d < %d, %s @%lld, get key (%s) got(%s)", ret, read_more, path, (long long int) offset, key, r->key);
427 goto READ_END;
428 }
429 }
430 r->key[ksz] = 0; // c str
431
432 uint32_t crc = crc32(0, (unsigned char*)(&r->tstamp),
433 sizeof(DataRecord) - sizeof(char*) - sizeof(uint32_t) + ksz);
434 crc = crc32(crc, (unsigned char*)r->value, vsz);
435 if (crc != crc_old)
436 {
437 log_error("CHECKSUM %u != %u, %s @%lld, get key (%s) got(%s)", crc, r->crc, path, (long long int)offset, key, r->key);
438 goto READ_END;
439 }
440
441 if (decomp)
442 {
443 r = decompress_record(r);
444 }
445 return r;
446
447 READ_END:
448 free_record(&r);
449 return NULL;
450 }
451
/*
 * Serialize r into a newly allocated buffer ready to be appended to a data
 * file, and store the buffer length in *size.
 *
 * compress_record runs first and may replace r->value / r->vsz / r->flag.
 * Layout: header (DataRecord without its value pointer), key bytes, value
 * bytes, then zero fill up to the next multiple of PADDING.  The crc field
 * is computed over everything after it up to the end of the value.
 * The caller frees the returned buffer.
 */
char *encode_record(DataRecord *r, unsigned int *size)
{
    compress_record(r);

    int ksz = r->ksz, vsz = r->vsz;
    int ptr_sz = sizeof(char*); // in-memory value pointer, not stored
    unsigned int used = sizeof(DataRecord) - ptr_sz + ksz + vsz;
    unsigned int tail = used % PADDING;
    unsigned int total = (tail == 0) ? used : used + (PADDING - tail);

    char *buf = (char*)safe_malloc(total);
    DataRecord *data = (DataRecord*)(buf - ptr_sz);

    memcpy(&data->crc, &r->crc, sizeof(DataRecord) - ptr_sz); // safe
    memcpy(data->key, r->key, ksz); // safe
    memcpy(data->key + ksz, r->value, vsz); // safe
    memset(buf + used, 0, total - used);
    data->crc = crc32(0, (unsigned char*)&data->tstamp, used - sizeof(uint32_t));

    *size = total;
    return buf;
}
477
/* Encode r (see encode_record) and write it to f.
 * Returns 0 on success, -1 if fewer bytes than encoded were written. */
int write_record(FILE *f, DataRecord *r)
{
    unsigned int size;
    char *data = encode_record(r, &size);
    size_t written = fwrite(data, 1, size, f);
    free(data);

    if (written >= size)
        return 0;

    log_error("write %d byte failed", size);
    return -1;
}
491
492
/*
 * Rebuild the index entries for one data file and write its hint file.
 *
 * For every record scan_record can decode (and decompress) whose key passes
 * check_key:
 * - tree gets the key at pos | bucket when version > 0, or the key is
 *   removed from tree otherwise;
 * - a per-file tree collects every record and is written to hintpath
 *   with build_hint.
 * Scanning stops early if scan_record gives up on a broken region.
 * NOTE(review): cur_tree is not destroyed here; presumably build_hint takes
 * ownership of it — confirm.
 */
void scanDataFile(HTree *tree, int bucket, const char *path, const char *hintpath)
{
    MFile *f = open_mfile(path);
    if (f == NULL) return;

    log_warn("scan datafile %s", path);
    HTree *cur_tree = ht_new(0, 0, true);
    char *p = f->addr, *end = f->addr + f->size;
    int num_broken_total = 0;
    size_t last_advise = 0;

    while (p < end)
    {
        DataRecord *r = scan_record(f->addr, end, &p, path, &num_broken_total, tree, bucket);
        if (r == NULL)
            break;
        uint32_t pos = p - f->addr;
        p += record_length(r);
        r = decompress_record(r);
        if (r == NULL)
        {
            log_error("decompress_record fail, %s @%u size = %ld", path, pos, p - (pos + f->addr));
            continue;
        }
        /* hash of the uncompressed value, truncated to 16 bits for the tree */
        uint16_t hash = gen_hash(r->value, r->vsz);
        if (check_key(r->key, r->ksz))
        {
            if (r->version > 0)
            {
                ht_add2(tree, r->key, r->ksz, pos | bucket, hash, r->version);
            }
            else
            {
                ht_remove2(tree, r->key, r->ksz);
            }
            ht_add2(cur_tree, r->key, r->ksz, pos | bucket, hash, r->version);
        }
        free_record(&r);
        mfile_dontneed(f, p - f->addr, &last_advise);
    }
    close_mfile(f);
    build_hint(cur_tree, hintpath);
}
537
/*
 * Apply the records of data file `path` to tree, stopping at the first record
 * whose tstamp is >= `before`.
 *
 * For each earlier record whose key passes check_key, the key is added to
 * tree at pos | bucket (version > 0) or removed from it (otherwise).  No hint
 * file is written.  Scanning also stops if scan_record gives up on a broken
 * region.
 */
void scanDataFileBefore(HTree *tree, int bucket, const char *path, time_t before)
{
    MFile *f = open_mfile(path);
    if (f == NULL) return;

    log_error("scan datafile %s before %ld", path, (long)before);
    char *p = f->addr, *end = f->addr + f->size;
    int num_broken_total = 0;
    size_t last_advise = 0;
    while (p < end)
    {
        DataRecord *r = scan_record(f->addr, end, &p, path, &num_broken_total, tree, bucket);
        if (r == NULL)
            break;
        if (r->tstamp >= before)
        {
            /* the record was allocated by scan_record: release it before stopping */
            free_record(&r);
            break;
        }
        uint32_t pos = p - f->addr;
        p += record_length(r);
        r = decompress_record(r);
        if (r == NULL)
        {
            log_error("decompress_record fail, %s @%u size = %ld", path, pos, p - (pos + f->addr));
            continue;
        }

        if (check_key(r->key, r->ksz))
        {
            if (r->version > 0)
            {
                uint16_t hash = gen_hash(r->value, r->vsz);
                ht_add2(tree, r->key, r->ksz, pos | bucket, hash, r->version);
            }
            else
            {
                ht_remove2(tree, r->key, r->ksz);
            }
        }
        free_record(&r);
        mfile_dontneed(f, p - f->addr, &last_advise);
    }
    close_mfile(f);
}
580
581 // update pos in HTree
update_items(Item * it,void * args)582 void update_items(Item *it, void *args)
583 {
584 HTree *tree = (HTree*) args;
585 Item *p = ht_get(tree, it->key);
586 if (p)
587 {
588 if (it->pos != p->pos && it->ver == p->ver)
589 {
590 if (it->ver > 0)
591 {
592 ht_add(tree, p->key, it->pos, p->hash, p->ver);
593 }
594 else
595 {
596 ht_remove(tree, p->key);
597 }
598 }
599 free(p);
600 }
601 else
602 {
603 ht_add(tree, it->key, it->pos, it->hash, it->ver);
604 }
605 }
/*
 * Compact data file `path` (bucket `bucket`) by copying the records that are
 * still current in `tree` into the data file of `last_bucket` (`lastdata`).
 *
 * - use_tmp: records go to "<lastdata>.tmp", which is renamed over lastdata
 *   at the end.  Otherwise they are appended to lastdata in place, after
 *   zero-padding it to a 256-byte boundary, and its existing hint
 *   (lasthint_real) is extended.
 * - A record is copied when tree still points at it (same pos | bucket) and
 *   either its version is positive or `skipped` is set.  A record whose tree
 *   entry at this position has ver < 0 is counted as deleted and queued with
 *   pos 0, so update_items removes it from tree.  Everything else is released.
 * - On success, tree is updated to the new positions and the hint is written
 *   to lasthint_real.  When bucket != last_bucket, path and hintpath are
 *   unlinked.  *deleted_bytes is set and 0 is returned.
 * - Returns 1 when the last hint cannot be opened or the output would exceed
 *   max_data_size (in append mode lastdata is truncated back to its original
 *   size).  Returns -1 on other errors; the source file is then left in place.
 */
int optimizeDataFile(HTree *tree, Mgr *mgr, int bucket, const char *path, const char *hintpath,
        int last_bucket, const char *lastdata, const char *lasthint_real, uint32_t max_data_size,
        bool skipped, bool use_tmp, uint32_t *deleted_bytes)
{
    struct timeval opt_start, opt_end, update_start, update_end;
    gettimeofday(&opt_start, NULL);

    int err = -1;
    log_notice("begin optimize %s -> %s, use_tmp = %s", path, lastdata, use_tmp ? "true" : "false");

    /* Everything OPT_FAIL reads or releases is declared and initialized here,
     * before the first goto, so no failure path sees indeterminate values
     * (previously an early failure logged garbage and could unlink an
     * uninitialized `tmp`). */
    FILE *new_df = NULL;
    HTree *cur_tree = NULL;
    char *hintdata = NULL;
    MFile *f = NULL;
    char tmp[MAX_PATH_LEN] = "";
    uint32_t old_srcdata_size = 0;
    uint32_t new_df_orig_size = 0;
    uint32_t hint_used = 0, hint_size = 0;
    int nrecord = 0, deleted = 0, broken = 0, released = 0;
    float update_secs = 0, opt_secs = 0;

    f = open_mfile(path);
    if (f == NULL)
    {
        err = -1;
        goto OPT_FAIL;
    }
    old_srcdata_size = f->size;

    if (!use_tmp)
    {
        new_df = fopen(lastdata, "ab");
        if (new_df == NULL)
        {
            log_error("open datafile failed, %s", lastdata);
            goto OPT_FAIL;
        }
        /* ISO C leaves the initial position of an append stream
         * implementation-defined; seek so ftello reports the file size. */
        fseeko(new_df, 0L, SEEK_END);
        new_df_orig_size = ftello(new_df);

        /* appended records must start on a 256-byte boundary */
        int rem = new_df_orig_size % 256;
        if (rem != 0)
        {
            char bytes[256] = {0}; /* zero fill: never write stack garbage to disk */
            int size = 256 - rem;
            log_warn("size of %s is 0x%llx, add padding", lastdata, (long long)new_df_orig_size);
            if (fwrite(bytes, 1, size, new_df) < (size_t)size)
            {
                log_error("write error when padding %s", lastdata);
                goto OPT_FAIL;
            }
        }

        if (new_df_orig_size > 0)
        {
            /* start from the existing hint of lastdata and append to it */
            HintFile *hint = open_hint(lasthint_real, NULL);
            if (hint == NULL)
            {
                log_error("open last hint file %s failed", lasthint_real);
                err = 1;
                goto OPT_FAIL;
            }
            hint_size = hint->size * 2;
            if (hint_size < 4096) hint_size = 4096;
            hintdata = (char*)safe_malloc(hint_size);
            memcpy(hintdata, hint->buf, hint->size); // safe
            hint_used = hint->size;
            close_hint(hint);
        }
    }
    else
    {
        int n = snprintf(tmp, sizeof(tmp), "%s.tmp", lastdata);
        if (n < 0 || (size_t)n >= sizeof(tmp))
        {
            log_error("tmp datafile path too long, %s.tmp", lastdata);
            tmp[0] = '\0'; /* never unlink a truncated path in OPT_FAIL */
            goto OPT_FAIL;
        }
        mgr_alloc(mgr, simple_basename(tmp));

        new_df = fopen(tmp, "wb");
        if (new_df == NULL)
        {
            log_error("open tmp datafile failed, %s", tmp);
            goto OPT_FAIL;
        }
    }
    if (hintdata == NULL)
    {
        hint_size = 1<<20;
        hintdata = (char*)safe_malloc(hint_size);
    }

    cur_tree = ht_new(0, 0, true);
    char *p = f->addr, *end = f->addr + f->size;
    size_t last_advise = 0;
    while (p < end)
    {
        DataRecord *r = scan_record(f->addr, end, &p, path, &broken, tree, bucket);
        if (r == NULL)
        {
            /* scan_record gave up before the end of the file */
            if (p < end)
                goto OPT_FAIL;
            break;
        }

        char *newp = p + record_length(r);
        nrecord++;
        Item *it = ht_get2(tree, r->key, r->ksz);
        uint32_t pos = p - f->addr;
        if (it && it->pos == (pos | bucket) && (it->ver > 0 || skipped))
        {
            /* tree still references this copy: keep it */
            uint32_t new_pos = ftello(new_df);
            if (new_pos + record_length(r) > max_data_size)
            {
                if (use_tmp)
                {
                    log_warn("Bug: optimize %s into tmp %s overflow", path, tmp);
                }
                else
                {
                    log_warn("optimize %s into %s overflow, ftruncate to %u", path, lastdata, new_df_orig_size);
                    fflush(new_df);
                    if (0 != ftruncate(fileno(new_df), new_df_orig_size))
                    {
                        log_error("ftruncate failed for %s old size = %u", path, new_df_orig_size);
                    }
                    rewind(new_df);
                }
                free(it);
                free_record(&r);
                err = 1;
                goto OPT_FAIL;
            }

            uint16_t hash = it->hash;
            ht_add2(cur_tree, r->key, r->ksz, new_pos | last_bucket, hash, it->ver);
            // append record to hint file
            int hsize = sizeof(HintRecord) - NAME_IN_RECORD + r->ksz + 1;
            if (hint_used + hsize > hint_size)
            {
                hint_size *= 2;
                hintdata = (char*)safe_realloc(hintdata, hint_size);
            }
            HintRecord *hr = (HintRecord*)(hintdata + hint_used);
            hr->ksize = r->ksz;
            hr->pos = new_pos >> 8;
            hr->version = it->ver;
            hr->hash = hash;
            /* bound the copy by the space actually left after hr->key
             * (the old bound ignored hint_used and could not catch overflow) */
            safe_memcpy(hr->key, (size_t)(hintdata + hint_size - hr->key), r->key, r->ksz + 1);
            hint_used += hsize;

            r->version = it->ver;
            if (write_record(new_df, r) != 0)
            {
                log_error("write error: %s -> %d", path, last_bucket);
                free(it);
                free_record(&r);
                goto OPT_FAIL;
            }
        }
        else
        {
            if (it && it->pos == (pos | bucket) && it->ver < 0)
            {
                deleted++;
                ht_add2(cur_tree, r->key, r->ksz, 0, it->hash, it->ver);
            }
            released++;
        }
        if (it) free(it);
        p = newp;
        free_record(&r);

        mfile_dontneed(f, pos, &last_advise);
    }
    fseeko(new_df, 0L, SEEK_END);
    *deleted_bytes = f->size - (ftello(new_df) - new_df_orig_size);

    close_mfile(f);
    f = NULL;
    /* fclose flushes buffered records; if that fails the copy is incomplete
     * and the source must not be unlinked below. */
    int close_ret = fclose(new_df);
    new_df = NULL;
    if (close_ret != 0)
    {
        log_error("close %s failed", use_tmp ? tmp : lastdata);
        goto OPT_FAIL;
    }

    gettimeofday(&update_start, NULL);
    if (bucket == last_bucket)
    {
        ht_set_updating_bucket(tree, bucket, cur_tree);
        ht_visit2(cur_tree, update_items, tree);
        mgr_unlink(lastdata);
        mgr_rename(tmp, lastdata);
        ht_set_updating_bucket(tree, -1, NULL);
    }
    else
    {
        if (use_tmp)
            mgr_rename(tmp, lastdata);
        ht_visit(cur_tree, update_items, tree);
        mgr_unlink(path);
    }
    gettimeofday(&update_end, NULL);

    ht_destroy(cur_tree);

    if (last_bucket != bucket)
        mgr_unlink(hintpath);
    write_hint_file(hintdata, hint_used, lasthint_real);
    free(hintdata);

    gettimeofday(&opt_end, NULL);
    update_secs = (update_end.tv_sec - update_start.tv_sec) + (update_end.tv_usec - update_start.tv_usec) / 1e6;
    opt_secs = (opt_end.tv_sec - opt_start.tv_sec) + (opt_end.tv_usec - opt_start.tv_usec) / 1e6;
    log_notice("optimize %s -> %d (%u B) complete, %d/%d records released, %d deleted, %u/%u bytes released, %d bytes broken, use %fs/%fs",
            path, last_bucket, (last_bucket == bucket) ? old_srcdata_size : new_df_orig_size, released, nrecord, deleted, *deleted_bytes, old_srcdata_size, broken, update_secs, opt_secs);
    return 0;

OPT_FAIL:
    /* the update phase never ran on failure, so update_secs stays 0 */
    gettimeofday(&opt_end, NULL);
    opt_secs = (opt_end.tv_sec - opt_start.tv_sec) + (opt_end.tv_usec - opt_start.tv_usec) / 1e6;
    log_notice("optimize %s -> %d (%u B) failed, %d/%d records released, %d deleted, %u/%u bytes released, %d bytes broken, use %fs/%fs, err = %d",
            path, last_bucket, (last_bucket == bucket) ? old_srcdata_size : new_df_orig_size, released, nrecord, deleted, *deleted_bytes, old_srcdata_size, broken, update_secs, opt_secs, err);
    free(hintdata);
    if (cur_tree) ht_destroy(cur_tree);
    if (f) close_mfile(f);
    if (new_df) fclose(new_df);
    if (use_tmp && tmp[0] != '\0') mgr_unlink(tmp);
    return err;
}
820