/** @file chert_compact.cc
 * @brief Compact a chert database, or merge and compact several.
 */
/* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2015 Olly Betts
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
 * USA
 */
21
22 #include <config.h>
23
24 #include "xapian/compactor.h"
25 #include "xapian/constants.h"
26 #include "xapian/error.h"
27 #include "xapian/types.h"
28
29 #include <algorithm>
30 #include <queue>
31
32 #include <cerrno>
33 #include <cstdio>
34
35 #include "safeunistd.h"
36
37 #include "chert_table.h"
38 #include "chert_cursor.h"
39 #include "chert_database.h"
40 #include "filetests.h"
41 #include "internaltypes.h"
42 #include "pack.h"
43 #include "backends/valuestats.h"
44
45 #include "../byte_length_strings.h"
46 #include "../prefix_compressed_strings.h"
47
48 using namespace std;
49
// Put all the helpers in a namespace to avoid symbols colliding with those of
// the same name in other flint-derived backends.
52 namespace ChertCompact {
53
/// Test whether @a key is the METAINFO key (a single zero byte).
static inline bool
is_metainfo_key(const std::string & key)
{
    return key.length() == 1 && key.front() == '\0';
}
59
/// Test whether @a key holds user metadata ('\0' followed by '\xc0').
static inline bool
is_user_metadata_key(const std::string & key)
{
    if (key.size() < 2 || key[0] != '\0')
	return false;
    return key[1] == '\xc0';
}
65
/// Test whether @a key holds value statistics ('\0' followed by '\xd0').
static inline bool
is_valuestats_key(const std::string & key)
{
    if (key.size() < 2 || key[0] != '\0')
	return false;
    return key[1] == '\xd0';
}
71
/// Test whether @a key holds a value stream chunk ('\0' followed by '\xd8').
static inline bool
is_valuechunk_key(const std::string & key)
{
    if (key.size() < 2 || key[0] != '\0')
	return false;
    return key[1] == '\xd8';
}
77
/// Test whether @a key holds a doclen chunk ('\0' followed by '\xe0').
static inline bool
is_doclenchunk_key(const std::string & key)
{
    if (key.size() < 2 || key[0] != '\0')
	return false;
    return key[1] == '\xe0';
}
83
/** Cursor over the postlist table of one source database.
 *
 *  As entries are read, keys and tags are normalised ready for merging:
 *  docids have this database's offset added, and every postlist chunk is
 *  put into the non-initial chunk form (the initial chunk of each term in
 *  the merged output is rebuilt later, in merge_postlists()).
 */
class PostlistCursor : private ChertCursor {
    /// Offset to add to docids from this source database.
    Xapian::docid offset;

  public:
    /// The current entry's (normalised) key and tag.
    string key, tag;
    /// First docid in the current chunk (offset already applied).
    Xapian::docid firstdid;
    /// Term frequency and collection frequency (non-zero only for entries
    /// which were initial chunks in the source).
    Xapian::termcount tf, cf;

    PostlistCursor(ChertTable *in, Xapian::docid offset_)
	: ChertCursor(in), offset(offset_), firstdid(0)
    {
	find_entry(string());
	next();
    }

    /// Advance to the next entry, normalising it.  Returns false at end.
    bool next() {
	if (!ChertCursor::next()) return false;
	// We put all chunks into the non-initial chunk form here, then fix up
	// the first chunk for each term in the merged database as we merge.
	read_tag();
	key = current_key;
	tag = current_tag;
	tf = cf = 0;
	// Metainfo, user metadata and value stats entries have no docids in
	// their keys, so they pass through unmodified.
	if (is_metainfo_key(key)) return true;
	if (is_user_metadata_key(key)) return true;
	if (is_valuestats_key(key)) return true;
	if (is_valuechunk_key(key)) {
	    // Value chunk keys embed (slot, docid) - rebuild the key with
	    // the docid adjusted by this database's offset.
	    const char * p = key.data();
	    const char * end = p + key.length();
	    p += 2;
	    Xapian::valueno slot;
	    if (!unpack_uint(&p, end, &slot))
		throw Xapian::DatabaseCorruptError("bad value key");
	    Xapian::docid did;
	    if (!C_unpack_uint_preserving_sort(&p, end, &did))
		throw Xapian::DatabaseCorruptError("bad value key");
	    did += offset;

	    key.assign("\0\xd8", 2);
	    pack_uint(key, slot);
	    C_pack_uint_preserving_sort(key, did);
	    return true;
	}

	// Adjust key if this is *NOT* an initial chunk.
	// key is: pack_string_preserving_sort(key, tname)
	// plus optionally: C_pack_uint_preserving_sort(key, did)
	const char * d = key.data();
	const char * e = d + key.size();
	if (is_doclenchunk_key(key)) {
	    // Doclen chunk keys have no termname - just the 2 byte prefix.
	    d += 2;
	} else {
	    string tname;
	    if (!unpack_string_preserving_sort(&d, e, tname))
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	}

	if (d == e) {
	    // No docid suffix, so this is an initial chunk for a term:
	    // pull (tf, cf, firstdid) out of the tag header.
	    d = tag.data();
	    e = d + tag.size();
	    if (!unpack_uint(&d, e, &tf) ||
		!unpack_uint(&d, e, &cf) ||
		!unpack_uint(&d, e, &firstdid)) {
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	    }
	    // The header stores (firstdid - 1).
	    ++firstdid;
	    // Strip the header, leaving the tag in non-initial chunk form.
	    tag.erase(0, d - tag.data());
	} else {
	    // Not an initial chunk, so adjust key.
	    size_t tmp = d - key.data();
	    if (!C_unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	    if (is_doclenchunk_key(key)) {
		key.erase(tmp);
	    } else {
		// Also drop the termname's string terminator so the key
		// matches the initial chunk form.
		key.erase(tmp - 1);
	    }
	}
	firstdid += offset;
	return true;
    }
};
167
168 class PostlistCursorGt {
169 public:
170 /** Return true if and only if a's key is strictly greater than b's key.
171 */
operator ()(const PostlistCursor * a,const PostlistCursor * b) const172 bool operator()(const PostlistCursor *a, const PostlistCursor *b) const {
173 if (a->key > b->key) return true;
174 if (a->key != b->key) return false;
175 return (a->firstdid > b->firstdid);
176 }
177 };
178
179 static string
encode_valuestats(Xapian::doccount freq,const string & lbound,const string & ubound)180 encode_valuestats(Xapian::doccount freq,
181 const string & lbound, const string & ubound)
182 {
183 string value;
184 pack_uint(value, freq);
185 pack_string(value, lbound);
186 // We don't store or count empty values, so neither of the bounds
187 // can be empty. So we can safely store an empty upper bound when
188 // the bounds are equal.
189 if (lbound != ubound) value += ubound;
190 return value;
191 }
192
193 static void
merge_postlists(Xapian::Compactor * compactor,ChertTable * out,vector<Xapian::docid>::const_iterator offset,vector<ChertTable * >::const_iterator b,vector<ChertTable * >::const_iterator e,Xapian::docid last_docid)194 merge_postlists(Xapian::Compactor * compactor,
195 ChertTable * out, vector<Xapian::docid>::const_iterator offset,
196 vector<ChertTable*>::const_iterator b,
197 vector<ChertTable*>::const_iterator e,
198 Xapian::docid last_docid)
199 {
200 Xapian::totallength tot_totlen = 0;
201 Xapian::termcount doclen_lbound = static_cast<Xapian::termcount>(-1);
202 Xapian::termcount wdf_ubound = 0;
203 Xapian::termcount doclen_ubound = 0;
204 priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
205 for ( ; b != e; ++b, ++offset) {
206 ChertTable *in = *b;
207 if (in->empty()) {
208 // Skip empty tables.
209 continue;
210 }
211
212 PostlistCursor * cur = new PostlistCursor(in, *offset);
213 // Merge the METAINFO tags from each database into one.
214 // They have a key consisting of a single zero byte.
215 // They may be absent, if the database contains no documents. If it
216 // has user metadata we'll still get here.
217 if (is_metainfo_key(cur->key)) {
218 const char * data = cur->tag.data();
219 const char * end = data + cur->tag.size();
220 Xapian::docid dummy_did = 0;
221 if (!unpack_uint(&data, end, &dummy_did)) {
222 throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
223 }
224
225 Xapian::termcount doclen_lbound_tmp;
226 if (!unpack_uint(&data, end, &doclen_lbound_tmp)) {
227 throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
228 }
229 doclen_lbound = min(doclen_lbound, doclen_lbound_tmp);
230
231 Xapian::termcount wdf_ubound_tmp;
232 if (!unpack_uint(&data, end, &wdf_ubound_tmp)) {
233 throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
234 }
235 wdf_ubound = max(wdf_ubound, wdf_ubound_tmp);
236
237 Xapian::termcount doclen_ubound_tmp;
238 if (!unpack_uint(&data, end, &doclen_ubound_tmp)) {
239 throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
240 }
241 doclen_ubound_tmp += wdf_ubound_tmp;
242 doclen_ubound = max(doclen_ubound, doclen_ubound_tmp);
243
244 Xapian::totallength totlen = 0;
245 if (!unpack_uint_last(&data, end, &totlen)) {
246 throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
247 }
248 tot_totlen += totlen;
249 if (tot_totlen < totlen) {
250 throw "totlen wrapped!";
251 }
252 if (cur->next()) {
253 pq.push(cur);
254 } else {
255 delete cur;
256 }
257 } else {
258 pq.push(cur);
259 }
260 }
261
262 // Don't write the metainfo key for a totally empty database.
263 if (last_docid) {
264 if (doclen_lbound > doclen_ubound)
265 doclen_lbound = doclen_ubound;
266 string tag;
267 pack_uint(tag, last_docid);
268 pack_uint(tag, doclen_lbound);
269 pack_uint(tag, wdf_ubound);
270 pack_uint(tag, doclen_ubound - wdf_ubound);
271 pack_uint_last(tag, tot_totlen);
272 out->add(string(1, '\0'), tag);
273 }
274
275 string last_key;
276 {
277 // Merge user metadata.
278 vector<string> tags;
279 while (!pq.empty()) {
280 PostlistCursor * cur = pq.top();
281 const string& key = cur->key;
282 if (!is_user_metadata_key(key)) break;
283
284 if (key != last_key) {
285 if (!tags.empty()) {
286 if (tags.size() > 1 && compactor) {
287 Assert(!last_key.empty());
288 // FIXME: It would be better to merge all duplicates
289 // for a key in one call, but currently we don't in
290 // multipass mode.
291 const string & resolved_tag =
292 compactor->resolve_duplicate_metadata(last_key,
293 tags.size(),
294 &tags[0]);
295 if (!resolved_tag.empty())
296 out->add(last_key, resolved_tag);
297 } else {
298 Assert(!last_key.empty());
299 out->add(last_key, tags[0]);
300 }
301 tags.resize(0);
302 }
303 last_key = key;
304 }
305 tags.push_back(cur->tag);
306
307 pq.pop();
308 if (cur->next()) {
309 pq.push(cur);
310 } else {
311 delete cur;
312 }
313 }
314 if (!tags.empty()) {
315 if (tags.size() > 1 && compactor) {
316 Assert(!last_key.empty());
317 const string & resolved_tag =
318 compactor->resolve_duplicate_metadata(last_key,
319 tags.size(),
320 &tags[0]);
321 if (!resolved_tag.empty())
322 out->add(last_key, resolved_tag);
323 } else {
324 Assert(!last_key.empty());
325 out->add(last_key, tags[0]);
326 }
327 }
328 }
329
330 {
331 // Merge valuestats.
332 Xapian::doccount freq = 0;
333 string lbound, ubound;
334
335 while (!pq.empty()) {
336 PostlistCursor * cur = pq.top();
337 const string& key = cur->key;
338 if (!is_valuestats_key(key)) break;
339 if (key != last_key) {
340 // For the first valuestats key, last_key will be the previous
341 // key we wrote, which we don't want to overwrite. This is the
342 // only time that freq will be 0, so check that.
343 if (freq) {
344 out->add(last_key, encode_valuestats(freq, lbound, ubound));
345 freq = 0;
346 }
347 last_key = key;
348 }
349
350 const string & tag = cur->tag;
351
352 const char * pos = tag.data();
353 const char * end = pos + tag.size();
354
355 Xapian::doccount f;
356 string l, u;
357 if (!unpack_uint(&pos, end, &f)) {
358 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
359 throw Xapian::RangeError("Frequency statistic in value table is too large");
360 }
361 if (!unpack_string(&pos, end, l)) {
362 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
363 throw Xapian::RangeError("Lower bound in value table is too large");
364 }
365 size_t len = end - pos;
366 if (len == 0) {
367 u = l;
368 } else {
369 u.assign(pos, len);
370 }
371 if (freq == 0) {
372 freq = f;
373 lbound = l;
374 ubound = u;
375 } else {
376 freq += f;
377 if (l < lbound) lbound = l;
378 if (u > ubound) ubound = u;
379 }
380
381 pq.pop();
382 if (cur->next()) {
383 pq.push(cur);
384 } else {
385 delete cur;
386 }
387 }
388
389 if (freq) {
390 out->add(last_key, encode_valuestats(freq, lbound, ubound));
391 }
392 }
393
394 // Merge valuestream chunks.
395 while (!pq.empty()) {
396 PostlistCursor * cur = pq.top();
397 const string & key = cur->key;
398 if (!is_valuechunk_key(key)) break;
399 Assert(!is_user_metadata_key(key));
400 out->add(key, cur->tag);
401 pq.pop();
402 if (cur->next()) {
403 pq.push(cur);
404 } else {
405 delete cur;
406 }
407 }
408
409 Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
410 vector<pair<Xapian::docid, string> > tags;
411 while (true) {
412 PostlistCursor * cur = NULL;
413 if (!pq.empty()) {
414 cur = pq.top();
415 pq.pop();
416 }
417 Assert(cur == NULL || !is_user_metadata_key(cur->key));
418 if (cur == NULL || cur->key != last_key) {
419 if (!tags.empty()) {
420 string first_tag;
421 pack_uint(first_tag, tf);
422 pack_uint(first_tag, cf);
423 pack_uint(first_tag, tags[0].first - 1);
424 string tag = tags[0].second;
425 tag[0] = (tags.size() == 1) ? '1' : '0';
426 first_tag += tag;
427 out->add(last_key, first_tag);
428
429 string term;
430 if (!is_doclenchunk_key(last_key)) {
431 const char * p = last_key.data();
432 const char * end = p + last_key.size();
433 if (!unpack_string_preserving_sort(&p, end, term) || p != end)
434 throw Xapian::DatabaseCorruptError("Bad postlist chunk key");
435 }
436
437 vector<pair<Xapian::docid, string> >::const_iterator i;
438 i = tags.begin();
439 while (++i != tags.end()) {
440 tag = i->second;
441 tag[0] = (i + 1 == tags.end()) ? '1' : '0';
442 out->add(pack_chert_postlist_key(term, i->first), tag);
443 }
444 }
445 tags.clear();
446 if (cur == NULL) break;
447 tf = cf = 0;
448 last_key = cur->key;
449 }
450 tf += cur->tf;
451 cf += cur->cf;
452 tags.push_back(make_pair(cur->firstdid, cur->tag));
453 if (cur->next()) {
454 pq.push(cur);
455 } else {
456 delete cur;
457 }
458 }
459 }
460
/// ChertCursor which positions itself on the table's first entry on creation.
struct MergeCursor : public ChertCursor {
    explicit MergeCursor(ChertTable *in) : ChertCursor(in) {
	find_entry(string());
	next();
    }
};
467
468 struct CursorGt {
469 /// Return true if and only if a's key is strictly greater than b's key.
operator ()ChertCompact::CursorGt470 bool operator()(const ChertCursor *a, const ChertCursor *b) const {
471 if (b->after_end()) return false;
472 if (a->after_end()) return true;
473 return (a->current_key > b->current_key);
474 }
475 };
476
/** Merge spelling tables from multiple source databases into @a out.
 *
 *  Tags for the same key are combined: word-frequency entries (keys
 *  starting 'W') are summed, while word-list entries are merged as the
 *  union of their prefix-compressed word lists.
 */
static void
merge_spellings(ChertTable * out,
		vector<ChertTable*>::const_iterator b,
		vector<ChertTable*>::const_iterator e)
{
    priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
    for ( ; b != e; ++b) {
	ChertTable *in = *b;
	if (!in->empty()) {
	    pq.push(new MergeCursor(in));
	}
    }

    while (!pq.empty()) {
	MergeCursor * cur = pq.top();
	pq.pop();

	string key = cur->current_key;
	if (pq.empty() || pq.top()->current_key > key) {
	    // No need to merge the tags, just copy the (possibly compressed)
	    // tag value.
	    bool compressed = cur->read_tag(true);
	    out->add(key, cur->current_tag, compressed);
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	    continue;
	}

	// Merge tag values with the same key:
	string tag;
	if (key[0] != 'W') {
	    // We just want the union of words, so copy over the first instance
	    // and skip any identical ones.
	    priority_queue<PrefixCompressedStringItor *,
			   vector<PrefixCompressedStringItor *>,
			   PrefixCompressedStringItorGt> pqtag;
	    // Stick all the MergeCursor pointers in a vector because their
	    // current_tag members must remain valid while we're merging their
	    // tags, but we need to call next() on them all afterwards.
	    vector<MergeCursor *> vec;
	    vec.reserve(pq.size());

	    // Pull every cursor positioned on this key off the main queue.
	    while (true) {
		cur->read_tag();
		pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
		vec.push_back(cur);
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }

	    // Emit the merged word list in order, skipping duplicates.
	    PrefixCompressedStringWriter wr(tag);
	    string lastword;
	    while (!pqtag.empty()) {
		PrefixCompressedStringItor * it = pqtag.top();
		pqtag.pop();
		string word = **it;
		if (word != lastword) {
		    lastword = word;
		    wr.append(lastword);
		}
		++*it;
		if (!it->at_end()) {
		    pqtag.push(it);
		} else {
		    delete it;
		}
	    }

	    // Now the tags have been merged, advance all the cursors that
	    // contributed to this key.
	    vector<MergeCursor *>::const_iterator i;
	    for (i = vec.begin(); i != vec.end(); ++i) {
		cur = *i;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
	    }
	} else {
	    // We want to sum the frequencies from tags for the same key.
	    Xapian::termcount tot_freq = 0;
	    while (true) {
		cur->read_tag();
		Xapian::termcount freq;
		const char * p = cur->current_tag.data();
		const char * end = p + cur->current_tag.size();
		if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
		    throw Xapian::DatabaseCorruptError("Bad spelling word freq");
		}
		tot_freq += freq;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }
	    tag.resize(0);
	    pack_uint_last(tag, tot_freq);
	}
	out->add(key, tag);
    }
}
585
/** Merge synonym tables from multiple source databases into @a out.
 *
 *  Each tag is a list of byte-length-prefixed strings; tags for the same
 *  key are merged by taking the union of their entries.
 */
static void
merge_synonyms(ChertTable * out,
	       vector<ChertTable*>::const_iterator b,
	       vector<ChertTable*>::const_iterator e)
{
    priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
    for ( ; b != e; ++b) {
	ChertTable *in = *b;
	if (!in->empty()) {
	    pq.push(new MergeCursor(in));
	}
    }

    while (!pq.empty()) {
	MergeCursor * cur = pq.top();
	pq.pop();

	string key = cur->current_key;
	if (pq.empty() || pq.top()->current_key > key) {
	    // No need to merge the tags, just copy the (possibly compressed)
	    // tag value.
	    bool compressed = cur->read_tag(true);
	    out->add(key, cur->current_tag, compressed);
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	    continue;
	}

	// Merge tag values with the same key:
	string tag;

	// We just want the union of words, so copy over the first instance
	// and skip any identical ones.
	priority_queue<ByteLengthPrefixedStringItor *,
		       vector<ByteLengthPrefixedStringItor *>,
		       ByteLengthPrefixedStringItorGt> pqtag;
	// Keep the cursors alive in vec while their tags are being merged
	// (the iterators reference current_tag), then advance them all
	// afterwards.
	vector<MergeCursor *> vec;

	while (true) {
	    cur->read_tag();
	    pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
	    vec.push_back(cur);
	    if (pq.empty() || pq.top()->current_key != key) break;
	    cur = pq.top();
	    pq.pop();
	}

	string lastword;
	while (!pqtag.empty()) {
	    ByteLengthPrefixedStringItor * it = pqtag.top();
	    pqtag.pop();
	    if (**it != lastword) {
		lastword = **it;
		// Re-encode the entry's length prefix (lengths are stored
		// XORed with MAGIC_XOR_VALUE).
		tag += uint8_t(lastword.size() ^ MAGIC_XOR_VALUE);
		tag += lastword;
	    }
	    ++*it;
	    if (!it->at_end()) {
		pqtag.push(it);
	    } else {
		delete it;
	    }
	}

	// Advance all the cursors which contributed to this key.
	vector<MergeCursor *>::const_iterator i;
	for (i = vec.begin(); i != vec.end(); ++i) {
	    cur = *i;
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}

	out->add(key, tag);
    }
}
666
667 static void
multimerge_postlists(Xapian::Compactor * compactor,ChertTable * out,const char * tmpdir,vector<ChertTable * > tmp,vector<Xapian::docid> off,Xapian::docid last_docid)668 multimerge_postlists(Xapian::Compactor * compactor,
669 ChertTable * out, const char * tmpdir,
670 vector<ChertTable *> tmp,
671 vector<Xapian::docid> off,
672 Xapian::docid last_docid)
673 {
674 unsigned int c = 0;
675 while (tmp.size() > 3) {
676 vector<ChertTable *> tmpout;
677 tmpout.reserve(tmp.size() / 2);
678 vector<Xapian::docid> newoff;
679 newoff.resize(tmp.size() / 2);
680 for (unsigned int i = 0, j; i < tmp.size(); i = j) {
681 j = i + 2;
682 if (j == tmp.size() - 1) ++j;
683
684 string dest = tmpdir;
685 char buf[64];
686 sprintf(buf, "/tmp%u_%u.", c, i / 2);
687 dest += buf;
688
689 // Don't compress temporary tables, even if the final table would
690 // be.
691 ChertTable * tmptab = new ChertTable("postlist", dest, false);
692 // Use maximum blocksize for temporary tables.
693 tmptab->create_and_open(65536);
694
695 merge_postlists(compactor, tmptab, off.begin() + i,
696 tmp.begin() + i, tmp.begin() + j,
697 last_docid);
698 if (c > 0) {
699 for (unsigned int k = i; k < j; ++k) {
700 unlink((tmp[k]->get_path() + "DB").c_str());
701 unlink((tmp[k]->get_path() + "baseA").c_str());
702 unlink((tmp[k]->get_path() + "baseB").c_str());
703 delete tmp[k];
704 tmp[k] = NULL;
705 }
706 }
707 tmpout.push_back(tmptab);
708 tmptab->flush_db();
709 tmptab->commit(1);
710 }
711 swap(tmp, tmpout);
712 swap(off, newoff);
713 ++c;
714 }
715 merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end(),
716 last_docid);
717 if (c > 0) {
718 for (size_t k = 0; k < tmp.size(); ++k) {
719 unlink((tmp[k]->get_path() + "DB").c_str());
720 unlink((tmp[k]->get_path() + "baseA").c_str());
721 unlink((tmp[k]->get_path() + "baseB").c_str());
722 delete tmp[k];
723 tmp[k] = NULL;
724 }
725 }
726 }
727
728 static void
merge_docid_keyed(ChertTable * out,const vector<ChertTable * > & inputs,const vector<Xapian::docid> & offset)729 merge_docid_keyed(ChertTable *out, const vector<ChertTable*> & inputs,
730 const vector<Xapian::docid> & offset)
731 {
732 for (size_t i = 0; i < inputs.size(); ++i) {
733 Xapian::docid off = offset[i];
734
735 ChertTable * in = inputs[i];
736 if (in->empty()) continue;
737
738 ChertCursor cur(in);
739 cur.find_entry(string());
740
741 string key;
742 while (cur.next()) {
743 // Adjust the key if this isn't the first database.
744 if (off) {
745 Xapian::docid did;
746 const char * d = cur.current_key.data();
747 const char * e = d + cur.current_key.size();
748 if (!C_unpack_uint_preserving_sort(&d, e, &did)) {
749 string msg = "Bad key in ";
750 msg += inputs[i]->get_path();
751 msg += "DB";
752 throw Xapian::DatabaseCorruptError(msg);
753 }
754 did += off;
755 key.resize(0);
756 C_pack_uint_preserving_sort(key, did);
757 if (d != e) {
758 // Copy over the termname for the position table.
759 key.append(d, e - d);
760 }
761 } else {
762 key = cur.current_key;
763 }
764 bool compressed = cur.read_tag(true);
765 out->add(key, cur.current_tag, compressed);
766 }
767 }
768 }
769
770 }
771
772 using namespace ChertCompact;
773
774 void
compact(Xapian::Compactor * compactor,const char * destdir,const vector<Xapian::Database::Internal * > & sources,const vector<Xapian::docid> & offset,size_t block_size,Xapian::Compactor::compaction_level compaction,unsigned flags,Xapian::docid last_docid)775 ChertDatabase::compact(Xapian::Compactor * compactor,
776 const char * destdir,
777 const vector<Xapian::Database::Internal*> & sources,
778 const vector<Xapian::docid> & offset,
779 size_t block_size,
780 Xapian::Compactor::compaction_level compaction,
781 unsigned flags,
782 Xapian::docid last_docid)
783 {
784 enum table_type {
785 POSTLIST, RECORD, TERMLIST, POSITION, VALUE, SPELLING, SYNONYM
786 };
787 struct table_list {
788 // The "base name" of the table.
789 const char * name;
790 // The type.
791 table_type type;
792 // zlib compression strategy to use on tags.
793 int compress_strategy;
794 // Create tables after position lazily.
795 bool lazy;
796 };
797
798 static const table_list tables[] = {
799 // name type compress_strategy lazy
800 { "postlist", POSTLIST, DONT_COMPRESS, false },
801 { "record", RECORD, Z_DEFAULT_STRATEGY, false },
802 { "termlist", TERMLIST, Z_DEFAULT_STRATEGY, false },
803 { "position", POSITION, DONT_COMPRESS, true },
804 { "spelling", SPELLING, Z_DEFAULT_STRATEGY, true },
805 { "synonym", SYNONYM, Z_DEFAULT_STRATEGY, true }
806 };
807 const table_list * tables_end = tables +
808 (sizeof(tables) / sizeof(tables[0]));
809
810 bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
811 bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
812 if (single_file) {
813 throw Xapian::InvalidOperationError("chert doesn't support single file databases");
814 }
815
816 for (size_t i = 0; i != sources.size(); ++i) {
817 ChertDatabase * db = static_cast<ChertDatabase*>(sources[i]);
818 if (db->has_uncommitted_changes()) {
819 const char * m =
820 "Can't compact from a WritableDatabase with uncommitted "
821 "changes - either call commit() first, or create a new "
822 "Database object from the filename on disk";
823 throw Xapian::InvalidOperationError(m);
824 }
825 }
826
827 if (block_size < 2048 || block_size > 65536 ||
828 (block_size & (block_size - 1)) != 0) {
829 block_size = CHERT_DEFAULT_BLOCK_SIZE;
830 }
831
832 FlintLock lock(destdir);
833 {
834 string explanation;
835 FlintLock::reason why = lock.lock(true, false, explanation);
836 if (why != FlintLock::SUCCESS) {
837 lock.throw_databaselockerror(why, destdir, explanation);
838 }
839 }
840
841 vector<ChertTable *> tabs;
842 tabs.reserve(tables_end - tables);
843 for (const table_list * t = tables; t < tables_end; ++t) {
844 // The postlist table requires an N-way merge, adjusting the
845 // headers of various blocks. The spelling and synonym tables also
846 // need special handling. The other tables have keys sorted in
847 // docid order, so we can merge them by simply copying all the keys
848 // from each source table in turn.
849 if (compactor)
850 compactor->set_status(t->name, string());
851
852 string dest = destdir;
853 dest += '/';
854 dest += t->name;
855 dest += '.';
856
857 bool output_will_exist = !t->lazy;
858
859 // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
860 // on certain systems).
861 bool bad_stat = false;
862
863 off_t in_size = 0;
864
865 vector<ChertTable*> inputs;
866 inputs.reserve(sources.size());
867 size_t inputs_present = 0;
868 for (auto src : sources) {
869 ChertDatabase * db = static_cast<ChertDatabase*>(src);
870 ChertTable * table;
871 switch (t->type) {
872 case POSTLIST:
873 table = &(db->postlist_table);
874 break;
875 case RECORD:
876 table = &(db->record_table);
877 break;
878 case TERMLIST:
879 table = &(db->termlist_table);
880 break;
881 case POSITION:
882 table = &(db->position_table);
883 break;
884 case SPELLING:
885 table = &(db->spelling_table);
886 break;
887 case SYNONYM:
888 table = &(db->synonym_table);
889 break;
890 default:
891 Assert(false);
892 return;
893 }
894
895 off_t db_size = file_size(table->get_path() + "DB");
896 if (errno == 0) {
897 in_size += db_size / 1024;
898 output_will_exist = true;
899 ++inputs_present;
900 } else if (errno != ENOENT) {
901 // We get ENOENT for an optional table.
902 bad_stat = true;
903 output_will_exist = true;
904 ++inputs_present;
905 }
906 inputs.push_back(table);
907 }
908
909 // If any inputs lack a termlist table, suppress it in the output.
910 if (t->type == TERMLIST && inputs_present != sources.size()) {
911 if (inputs_present != 0) {
912 if (compactor) {
913 string m = str(inputs_present);
914 m += " of ";
915 m += str(sources.size());
916 m += " inputs present, so suppressing output";
917 compactor->set_status(t->name, m);
918 }
919 continue;
920 }
921 output_will_exist = false;
922 }
923
924 if (!output_will_exist) {
925 if (compactor)
926 compactor->set_status(t->name, "doesn't exist");
927 continue;
928 }
929
930 ChertTable out(t->name, dest, false, t->compress_strategy, t->lazy);
931 if (!t->lazy) {
932 out.create_and_open(block_size);
933 } else {
934 out.erase();
935 out.set_block_size(block_size);
936 }
937
938 out.set_full_compaction(compaction != compactor->STANDARD);
939 if (compaction == compactor->FULLER) out.set_max_item_size(1);
940
941 switch (t->type) {
942 case POSTLIST:
943 if (multipass && inputs.size() > 3) {
944 multimerge_postlists(compactor, &out, destdir, inputs,
945 offset, last_docid);
946 } else {
947 merge_postlists(compactor, &out, offset.begin(),
948 inputs.begin(), inputs.end(),
949 last_docid);
950 }
951 break;
952 case SPELLING:
953 merge_spellings(&out, inputs.begin(), inputs.end());
954 break;
955 case SYNONYM:
956 merge_synonyms(&out, inputs.begin(), inputs.end());
957 break;
958 default:
959 // Position, Record, Termlist
960 merge_docid_keyed(&out, inputs, offset);
961 break;
962 }
963
964 // Commit as revision 1.
965 out.flush_db();
966 out.commit(1);
967
968 off_t out_size = 0;
969 if (!bad_stat) {
970 off_t db_size = file_size(dest + "DB");
971 if (errno == 0) {
972 out_size = db_size / 1024;
973 } else {
974 bad_stat = (errno != ENOENT);
975 }
976 }
977 if (bad_stat) {
978 if (compactor)
979 compactor->set_status(t->name, "Done (couldn't stat all the DB files)");
980 } else {
981 string status;
982 if (out_size == in_size) {
983 status = "Size unchanged (";
984 } else {
985 off_t delta;
986 if (out_size < in_size) {
987 delta = in_size - out_size;
988 status = "Reduced by ";
989 } else {
990 delta = out_size - in_size;
991 status = "INCREASED by ";
992 }
993 if (in_size) {
994 status += str(100 * delta / in_size);
995 status += "% ";
996 }
997 status += str(delta);
998 status += "K (";
999 status += str(in_size);
1000 status += "K -> ";
1001 }
1002 status += str(out_size);
1003 status += "K)";
1004 if (compactor)
1005 compactor->set_status(t->name, status);
1006 }
1007 }
1008 }
1009