1 /** @file chert_compact.cc
2 * @brief Compact a chert database, or merge and compact several.
3 */
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2013,2015 Olly Betts
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation; either version 2 of the
9 * License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
19 * USA
20 */
21
22 #include <config.h>
23
24 #include <xapian/compactor.h>
25
26 #include <algorithm>
27 #include <queue>
28
29 #include <cstdio>
30
31 #include "safeerrno.h"
32 #include <sys/types.h>
33 #include "safesysstat.h"
34 #include "safeunistd.h"
35
36 #include "chert_table.h"
37 #include "chert_compact.h"
38 #include "chert_cursor.h"
39 #include "chert_database.h"
40 #include "internaltypes.h"
41 #include "noreturn.h"
42 #include "pack.h"
43 #include "utils.h"
44 #include "valuestats.h"
45
46 #include "../byte_length_strings.h"
47 #include "../prefix_compressed_strings.h"
48 #include <xapian.h>
49
50 using namespace std;
51
52 XAPIAN_NORETURN(
53 static void failed_to_open_at_rev(string, chert_revision_number_t));
54 static void
failed_to_open_at_rev(string m,chert_revision_number_t rev)55 failed_to_open_at_rev(string m, chert_revision_number_t rev)
56 {
57 m += ": Couldn't open at revision ";
58 m += str(rev);
59 throw Xapian::DatabaseError(m);
60 }
61
62 // Put all the helpers in a namespace to avoid symbols colliding with those of
63 // the same name in xapian-compact-flint.cc.
64 namespace ChertCompact {
65
/// Check if @a key is the METAINFO entry's key (a single zero byte).
static inline bool
is_metainfo_key(const std::string & key)
{
    if (key.length() != 1) return false;
    return key[0] == '\0';
}
71
/// Check if @a key starts with the user metadata prefix "\0\xc0".
static inline bool
is_user_metadata_key(const std::string & key)
{
    if (key.length() < 2) return false;
    return key[0] == '\0' && key[1] == '\xc0';
}
77
/// Check if @a key starts with the value statistics prefix "\0\xd0".
static inline bool
is_valuestats_key(const std::string & key)
{
    if (key.length() < 2) return false;
    return key[0] == '\0' && key[1] == '\xd0';
}
83
/// Check if @a key starts with the value chunk prefix "\0\xd8".
static inline bool
is_valuechunk_key(const std::string & key)
{
    if (key.length() < 2) return false;
    return key[0] == '\0' && key[1] == '\xd8';
}
89
/// Check if @a key starts with the doclen chunk prefix "\0\xe0".
static inline bool
is_doclenchunk_key(const std::string & key)
{
    if (key.length() < 2) return false;
    return key[0] == '\0' && key[1] == '\xe0';
}
95
/** Cursor over a source postlist table which rewrites entries for merging.
 *
 *  Takes ownership of the table it is given and deletes it on destruction.
 *  As entries are read, docids in keys and tags have this source's docid
 *  offset added, and initial postlist chunks are converted to non-initial
 *  form (the merge loop rebuilds the initial chunk for each term).
 */
class PostlistCursor : private ChertCursor {
    // Docid offset to add to docids from this source database.
    Xapian::docid offset;

  public:
    // Key and tag of the current entry, possibly rewritten by next().
    string key, tag;
    // First docid of the current postlist chunk (offset applied, and +1 -
    // see next() and the merge loop which subtracts 1 when repacking).
    Xapian::docid firstdid;
    // Term frequency and collection frequency unpacked from an initial
    // chunk's tag header (0 for other entry types).
    Xapian::termcount tf, cf;

    // Takes ownership of @a in; positions the cursor on the first entry.
    PostlistCursor(ChertTable *in, Xapian::docid offset_)
	: ChertCursor(in), offset(offset_), firstdid(0)
    {
	find_entry(string());
	next();
    }

    ~PostlistCursor()
    {
	// We own the table we were constructed with.
	delete ChertCursor::get_table();
    }

    // Advance to the next entry, rewriting key/tag as described above.
    // Returns false when the table is exhausted.
    bool next() {
	if (!ChertCursor::next()) return false;
	// We put all chunks into the non-initial chunk form here, then fix up
	// the first chunk for each term in the merged database as we merge.
	read_tag();
	key = current_key;
	tag = current_tag;
	tf = cf = 0;
	// METAINFO, user metadata and value stats entries pass through
	// unmodified (their keys contain no docids).
	if (is_metainfo_key(key)) return true;
	if (is_user_metadata_key(key)) return true;
	if (is_valuestats_key(key)) return true;
	if (is_valuechunk_key(key)) {
	    // Value chunk keys embed a docid, so repack the key with the
	    // offset-adjusted docid.
	    const char * p = key.data();
	    const char * end = p + key.length();
	    p += 2;
	    Xapian::valueno slot;
	    if (!unpack_uint(&p, end, &slot))
		throw Xapian::DatabaseCorruptError("bad value key");
	    Xapian::docid did;
	    if (!unpack_uint_preserving_sort(&p, end, &did))
		throw Xapian::DatabaseCorruptError("bad value key");
	    did += offset;

	    key.assign("\0\xd8", 2);
	    pack_uint(key, slot);
	    pack_uint_preserving_sort(key, did);
	    return true;
	}

	// Adjust key if this is *NOT* an initial chunk.
	// key is: pack_string_preserving_sort(key, tname)
	// plus optionally: pack_uint_preserving_sort(key, did)
	const char * d = key.data();
	const char * e = d + key.size();
	if (is_doclenchunk_key(key)) {
	    // Doclen chunk keys have no term name - skip the 2 byte prefix.
	    d += 2;
	} else {
	    string tname;
	    if (!unpack_string_preserving_sort(&d, e, tname))
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	}

	if (d == e) {
	    // This is an initial chunk for a term, so adjust tag header.
	    d = tag.data();
	    e = d + tag.size();
	    if (!unpack_uint(&d, e, &tf) ||
		!unpack_uint(&d, e, &cf) ||
		!unpack_uint(&d, e, &firstdid)) {
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	    }
	    // The header stores (firstdid - 1); the merge loop subtracts 1
	    // again when it repacks the initial chunk.
	    ++firstdid;
	    // Strip the header, leaving the tag in non-initial chunk form.
	    tag.erase(0, d - tag.data());
	} else {
	    // Not an initial chunk, so adjust key.
	    size_t tmp = d - key.data();
	    if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	    if (is_doclenchunk_key(key)) {
		key.erase(tmp);
	    } else {
		// Also drop the string terminator byte before the docid.
		key.erase(tmp - 1);
	    }
	}
	firstdid += offset;
	return true;
    }
};
184
185 class PostlistCursorGt {
186 public:
187 /** Return true if and only if a's key is strictly greater than b's key.
188 */
operator ()(const PostlistCursor * a,const PostlistCursor * b)189 bool operator()(const PostlistCursor *a, const PostlistCursor *b) {
190 if (a->key > b->key) return true;
191 if (a->key != b->key) return false;
192 return (a->firstdid > b->firstdid);
193 }
194 };
195
196 static string
encode_valuestats(Xapian::doccount freq,const string & lbound,const string & ubound)197 encode_valuestats(Xapian::doccount freq,
198 const string & lbound, const string & ubound)
199 {
200 string value;
201 pack_uint(value, freq);
202 pack_string(value, lbound);
203 // We don't store or count empty values, so neither of the bounds
204 // can be empty. So we can safely store an empty upper bound when
205 // the bounds are equal.
206 if (lbound != ubound) value += ubound;
207 return value;
208 }
209
/** Merge the postlist tables from a set of source databases into @a out.
 *
 *  Handles every entry type stored in the postlist table, in key order:
 *  the METAINFO entry, user metadata, value statistics, value chunks and
 *  finally the postlist chunks themselves.
 *
 *  @param compactor  Used to resolve duplicate user metadata entries.
 *  @param out        Output postlist table.
 *  @param offset     Iterator over the docid offset for each source.
 *  @param b          Iterator to the first source table path.
 *  @param e          End iterator for the source table paths.
 *  @param rev        Iterator over the revision to open each source at.
 *  @param last_docid Highest docid in the merged database (0 means the
 *                    output is empty, so no METAINFO entry is written).
 */
static void
merge_postlists(Xapian::Compactor & compactor,
		ChertTable * out, vector<Xapian::docid>::const_iterator offset,
		vector<string>::const_iterator b,
		vector<string>::const_iterator e,
		vector<chert_revision_number_t>::const_iterator rev,
		Xapian::docid last_docid)
{
    totlen_t tot_totlen = 0;
    // Start the doclen lower bound at the maximum representable value so
    // min() below converges on the smallest bound from the sources.
    Xapian::termcount doclen_lbound = static_cast<Xapian::termcount>(-1);
    Xapian::termcount wdf_ubound = 0;
    Xapian::termcount doclen_ubound = 0;
    priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
    for ( ; b != e; ++b, ++offset, ++rev) {
	ChertTable *in = new ChertTable("postlist", *b, true);
	if (!in->open(*rev)) {
	    failed_to_open_at_rev(*b, *rev);
	}
	if (in->empty()) {
	    // Skip empty tables.
	    delete in;
	    continue;
	}

	// PostlistCursor takes ownership of ChertTable in and is
	// responsible for deleting it.
	PostlistCursor * cur = new PostlistCursor(in, *offset);
	// Merge the METAINFO tags from each database into one.
	// They have a key consisting of a single zero byte.
	// They may be absent, if the database contains no documents.  If it
	// has user metadata we'll still get here.
	if (is_metainfo_key(cur->key)) {
	    const char * data = cur->tag.data();
	    const char * end = data + cur->tag.size();
	    // This source's last docid - unused, as the caller passes in
	    // last_docid for the merged output.
	    Xapian::docid dummy_did = 0;
	    if (!unpack_uint(&data, end, &dummy_did)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }

	    Xapian::termcount doclen_lbound_tmp;
	    if (!unpack_uint(&data, end, &doclen_lbound_tmp)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    doclen_lbound = min(doclen_lbound, doclen_lbound_tmp);

	    Xapian::termcount wdf_ubound_tmp;
	    if (!unpack_uint(&data, end, &wdf_ubound_tmp)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    wdf_ubound = max(wdf_ubound, wdf_ubound_tmp);

	    Xapian::termcount doclen_ubound_tmp;
	    if (!unpack_uint(&data, end, &doclen_ubound_tmp)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    // The doclen upper bound is stored with the wdf upper bound
	    // subtracted (see where it's packed below), so add it back.
	    doclen_ubound_tmp += wdf_ubound_tmp;
	    doclen_ubound = max(doclen_ubound, doclen_ubound_tmp);

	    totlen_t totlen = 0;
	    if (!unpack_uint_last(&data, end, &totlen)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    tot_totlen += totlen;
	    if (tot_totlen < totlen) {
		// NOTE(review): throws a string literal rather than a
		// Xapian error class, so callers catching Xapian::Error
		// won't catch this overflow case.
		throw "totlen wrapped!";
	    }
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	} else {
	    pq.push(cur);
	}
    }

    // Don't write the metainfo key for a totally empty database.
    if (last_docid) {
	if (doclen_lbound > doclen_ubound)
	    doclen_lbound = doclen_ubound;
	string tag;
	pack_uint(tag, last_docid);
	pack_uint(tag, doclen_lbound);
	pack_uint(tag, wdf_ubound);
	// Store the doclen upper bound with wdf_ubound subtracted - the
	// reader adds it back (as we did above).
	pack_uint(tag, doclen_ubound - wdf_ubound);
	pack_uint_last(tag, tot_totlen);
	out->add(string(1, '\0'), tag);
    }

    string last_key;
    {
	// Merge user metadata.
	vector<string> tags;
	while (!pq.empty()) {
	    PostlistCursor * cur = pq.top();
	    const string& key = cur->key;
	    if (!is_user_metadata_key(key)) break;

	    if (key != last_key) {
		// Key changed, so write out the entry for the previous key,
		// resolving duplicates via the compactor if necessary.
		if (tags.size() > 1) {
		    Assert(!last_key.empty());
		    // FIXME: It would be better to merge all duplicates for a
		    // key in one call, but currently we don't in multipass
		    // mode.
		    out->add(last_key,
			     compactor.resolve_duplicate_metadata(last_key,
								  tags.size(),
								  &tags[0]));
		} else if (tags.size() == 1) {
		    Assert(!last_key.empty());
		    out->add(last_key, tags[0]);
		}
		tags.resize(0);
		last_key = key;
	    }
	    tags.push_back(cur->tag);

	    pq.pop();
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}
	// Flush the entry for the final user metadata key.
	if (tags.size() > 1) {
	    Assert(!last_key.empty());
	    out->add(last_key,
		     compactor.resolve_duplicate_metadata(last_key,
							  tags.size(),
							  &tags[0]));
	} else if (tags.size() == 1) {
	    Assert(!last_key.empty());
	    out->add(last_key, tags[0]);
	}
    }

    {
	// Merge valuestats.
	Xapian::doccount freq = 0;
	string lbound, ubound;

	while (!pq.empty()) {
	    PostlistCursor * cur = pq.top();
	    const string& key = cur->key;
	    if (!is_valuestats_key(key)) break;
	    if (key != last_key) {
		// For the first valuestats key, last_key will be the previous
		// key we wrote, which we don't want to overwrite.  This is the
		// only time that freq will be 0, so check that.
		if (freq) {
		    out->add(last_key, encode_valuestats(freq, lbound, ubound));
		    freq = 0;
		}
		last_key = key;
	    }

	    const string & tag = cur->tag;

	    const char * pos = tag.data();
	    const char * end = pos + tag.size();

	    Xapian::doccount f;
	    string l, u;
	    if (!unpack_uint(&pos, end, &f)) {
		// NOTE(review): presumably this distinguishes "ran out of
		// data" from "value too large to represent" - confirm
		// against pack.h that dereferencing pos is safe on the
		// truncation path (looks like pos == 0 may be intended).
		if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
		throw Xapian::RangeError("Frequency statistic in value table is too large");
	    }
	    if (!unpack_string(&pos, end, l)) {
		// NOTE(review): same concern as above.
		if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
		throw Xapian::RangeError("Lower bound in value table is too large");
	    }
	    size_t len = end - pos;
	    if (len == 0) {
		// An empty upper bound encodes "same as the lower bound"
		// (see encode_valuestats()).
		u = l;
	    } else {
		u.assign(pos, len);
	    }
	    if (freq == 0) {
		freq = f;
		lbound = l;
		ubound = u;
	    } else {
		// Accumulate: sum the frequencies and widen the bounds.
		freq += f;
		if (l < lbound) lbound = l;
		if (u > ubound) ubound = u;
	    }

	    pq.pop();
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}

	// Flush the stats for the final value slot.
	if (freq) {
	    out->add(last_key, encode_valuestats(freq, lbound, ubound));
	}
    }

    // Merge valuestream chunks.  Their keys embed the (already offset
    // adjusted) docid, so entries can simply be copied across in order.
    while (!pq.empty()) {
	PostlistCursor * cur = pq.top();
	const string & key = cur->key;
	if (!is_valuechunk_key(key)) break;
	Assert(!is_user_metadata_key(key));
	out->add(key, cur->tag);
	pq.pop();
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }

    // Finally merge the postlist chunks.  Collect all chunks for a term
    // across the sources, then write them out with a rebuilt initial chunk.
    Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
    vector<pair<Xapian::docid, string> > tags;
    while (true) {
	PostlistCursor * cur = NULL;
	if (!pq.empty()) {
	    cur = pq.top();
	    pq.pop();
	}
	Assert(cur == NULL || !is_user_metadata_key(cur->key));
	if (cur == NULL || cur->key != last_key) {
	    // We've seen all the chunks for last_key, so write them out.
	    if (!tags.empty()) {
		// Rebuild the initial chunk's tag header: term frequency,
		// collection frequency, then (first docid - 1).
		string first_tag;
		pack_uint(first_tag, tf);
		pack_uint(first_tag, cf);
		// PostlistCursor::next() added 1 to firstdid; undo that.
		pack_uint(first_tag, tags[0].first - 1);
		string tag = tags[0].second;
		// The first byte of each chunk's tag flags whether it's the
		// last chunk for this term: '1' if so, '0' otherwise.
		tag[0] = (tags.size() == 1) ? '1' : '0';
		first_tag += tag;
		out->add(last_key, first_tag);

		string term;
		if (!is_doclenchunk_key(last_key)) {
		    const char * p = last_key.data();
		    const char * end = p + last_key.size();
		    if (!unpack_string_preserving_sort(&p, end, term) || p != end)
			throw Xapian::DatabaseCorruptError("Bad postlist chunk key");
		}

		// Write out the non-initial chunks with freshly packed keys.
		vector<pair<Xapian::docid, string> >::const_iterator i;
		i = tags.begin();
		while (++i != tags.end()) {
		    tag = i->second;
		    tag[0] = (i + 1 == tags.end()) ? '1' : '0';
		    out->add(pack_chert_postlist_key(term, i->first), tag);
		}
	    }
	    tags.clear();
	    if (cur == NULL) break;
	    tf = cf = 0;
	    last_key = cur->key;
	}
	// Sum tf/cf across the sources and collect this chunk.
	tf += cur->tf;
	cf += cur->cf;
	tags.push_back(make_pair(cur->firstdid, cur->tag));
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }
}
476
/** Cursor over a source table, positioned on the first entry.
 *
 *  Takes ownership of the table it's constructed with and deletes it on
 *  destruction.
 */
struct MergeCursor : public ChertCursor {
    MergeCursor(ChertTable *in) : ChertCursor(in) {
	// Position on the first entry ready for merging.
	find_entry(string());
	next();
    }

    ~MergeCursor() {
	// We own the table we were constructed with.
	delete ChertCursor::get_table();
    }
};
487
488 struct CursorGt {
489 /// Return true if and only if a's key is strictly greater than b's key.
operator ()ChertCompact::CursorGt490 bool operator()(const ChertCursor *a, const ChertCursor *b) {
491 if (b->after_end()) return false;
492 if (a->after_end()) return true;
493 return (a->current_key > b->current_key);
494 }
495 };
496
/** Merge the spelling tables from a set of source databases into @a out.
 *
 *  @param out  Output spelling table.
 *  @param b    Iterator to the first source table path.
 *  @param e    End iterator for the source table paths.
 *  @param rev  Iterator over the revision to open each source at.
 */
static void
merge_spellings(ChertTable * out,
		vector<string>::const_iterator b,
		vector<string>::const_iterator e,
		vector<chert_revision_number_t>::const_iterator rev)
{
    priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
    for ( ; b != e; ++b, ++rev) {
	ChertTable *in = new ChertTable("spelling", *b, true, DONT_COMPRESS, true);
	if (!in->open(*rev)) {
	    failed_to_open_at_rev(*b, *rev);
	}
	if (!in->empty()) {
	    // The MergeCursor takes ownership of ChertTable in and is
	    // responsible for deleting it.
	    pq.push(new MergeCursor(in));
	} else {
	    delete in;
	}
    }

    while (!pq.empty()) {
	MergeCursor * cur = pq.top();
	pq.pop();

	string key = cur->current_key;
	if (pq.empty() || pq.top()->current_key > key) {
	    // No need to merge the tags, just copy the (possibly compressed)
	    // tag value.
	    bool compressed = cur->read_tag(true);
	    out->add(key, cur->current_tag, compressed);
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	    continue;
	}

	// Merge tag values with the same key:
	string tag;
	if (key[0] != 'W') {
	    // We just want the union of words, so copy over the first instance
	    // and skip any identical ones.
	    priority_queue<PrefixCompressedStringItor *,
			   vector<PrefixCompressedStringItor *>,
			   PrefixCompressedStringItorGt> pqtag;
	    // Stick all the MergeCursor pointers in a vector because their
	    // current_tag members must remain valid while we're merging their
	    // tags, but we need to call next() on them all afterwards.
	    vector<MergeCursor *> vec;
	    vec.reserve(pq.size());

	    // Pull every cursor positioned on this key off the queue.
	    while (true) {
		cur->read_tag();
		pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
		vec.push_back(cur);
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }

	    // N-way merge of the sorted word lists, dropping duplicates.
	    PrefixCompressedStringWriter wr(tag);
	    string lastword;
	    while (!pqtag.empty()) {
		PrefixCompressedStringItor * it = pqtag.top();
		pqtag.pop();
		string word = **it;
		if (word != lastword) {
		    lastword = word;
		    wr.append(lastword);
		}
		++*it;
		if (!it->at_end()) {
		    pqtag.push(it);
		} else {
		    delete it;
		}
	    }

	    // Now advance all the cursors we consumed for this key.
	    vector<MergeCursor *>::const_iterator i;
	    for (i = vec.begin(); i != vec.end(); ++i) {
		cur = *i;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
	    }
	} else {
	    // We want to sum the frequencies from tags for the same key.
	    Xapian::termcount tot_freq = 0;
	    while (true) {
		cur->read_tag();
		Xapian::termcount freq;
		const char * p = cur->current_tag.data();
		const char * end = p + cur->current_tag.size();
		if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
		    throw Xapian::DatabaseCorruptError("Bad spelling word freq");
		}
		tot_freq += freq;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }
	    tag.resize(0);
	    pack_uint_last(tag, tot_freq);
	}
	out->add(key, tag);
    }
}
613
/** Merge the synonym tables from a set of source databases into @a out.
 *
 *  @param out  Output synonym table.
 *  @param b    Iterator to the first source table path.
 *  @param e    End iterator for the source table paths.
 *  @param rev  Iterator over the revision to open each source at.
 */
static void
merge_synonyms(ChertTable * out,
	       vector<string>::const_iterator b,
	       vector<string>::const_iterator e,
	       vector<chert_revision_number_t>::const_iterator rev)
{
    priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
    for ( ; b != e; ++b, ++rev) {
	ChertTable *in = new ChertTable("synonym", *b, true, DONT_COMPRESS, true);
	if (!in->open(*rev)) {
	    failed_to_open_at_rev(*b, *rev);
	}
	if (!in->empty()) {
	    // The MergeCursor takes ownership of ChertTable in and is
	    // responsible for deleting it.
	    pq.push(new MergeCursor(in));
	} else {
	    delete in;
	}
    }

    while (!pq.empty()) {
	MergeCursor * cur = pq.top();
	pq.pop();

	string key = cur->current_key;
	if (pq.empty() || pq.top()->current_key > key) {
	    // No need to merge the tags, just copy the (possibly compressed)
	    // tag value.
	    bool compressed = cur->read_tag(true);
	    out->add(key, cur->current_tag, compressed);
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	    continue;
	}

	// Merge tag values with the same key:
	string tag;

	// We just want the union of words, so copy over the first instance
	// and skip any identical ones.
	priority_queue<ByteLengthPrefixedStringItor *,
		       vector<ByteLengthPrefixedStringItor *>,
		       ByteLengthPrefixedStringItorGt> pqtag;
	// Keep the consumed cursors alive in vec while we iterate over their
	// tags; we call next() on them all afterwards.
	vector<MergeCursor *> vec;

	// Pull every cursor positioned on this key off the queue.
	while (true) {
	    cur->read_tag();
	    pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
	    vec.push_back(cur);
	    if (pq.empty() || pq.top()->current_key != key) break;
	    cur = pq.top();
	    pq.pop();
	}

	// N-way merge of the sorted entries, dropping duplicates.
	string lastword;
	while (!pqtag.empty()) {
	    ByteLengthPrefixedStringItor * it = pqtag.top();
	    pqtag.pop();
	    if (**it != lastword) {
		lastword = **it;
		// Re-encode as a length byte (XORed with the magic value)
		// followed by the word itself.
		tag += byte(lastword.size() ^ MAGIC_XOR_VALUE);
		tag += lastword;
	    }
	    ++*it;
	    if (!it->at_end()) {
		pqtag.push(it);
	    } else {
		delete it;
	    }
	}

	// Now advance all the cursors we consumed for this key.
	vector<MergeCursor *>::const_iterator i;
	for (i = vec.begin(); i != vec.end(); ++i) {
	    cur = *i;
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}

	out->add(key, tag);
    }
}
702
/** Merge postlist tables in multiple passes.
 *
 *  Repeatedly merges the sources in pairs into temporary tables under
 *  @a tmpdir until at most 3 remain, then merges those into @a out.
 *  This keeps the number of cursors open at once small.
 *
 *  Note: tmp, revs and off are deliberately taken by value - each pass
 *  replaces them with the list of temporary tables it produced.
 */
static void
multimerge_postlists(Xapian::Compactor & compactor,
		     ChertTable * out, const char * tmpdir,
		     Xapian::docid last_docid,
		     vector<string> tmp, vector<chert_revision_number_t> revs,
		     vector<Xapian::docid> off)
{
    unsigned int c = 0; // Pass counter, used to name the temporary tables.
    while (tmp.size() > 3) {
	vector<string> tmpout;
	tmpout.reserve(tmp.size() / 2);
	vector<Xapian::docid> newoff;
	// resize() (not reserve()) so the new offsets are all zero - the
	// docid offsets get applied in the first pass, so later passes
	// mustn't shift docids again.
	newoff.resize(tmp.size() / 2);
	vector<chert_revision_number_t> newrevs;
	newrevs.reserve(tmp.size() / 2);
	for (unsigned int i = 0, j; i < tmp.size(); i = j) {
	    j = i + 2;
	    // If a single table would be left over, fold it into this merge
	    // rather than copying it through on its own.
	    if (j == tmp.size() - 1) ++j;

	    string dest = tmpdir;
	    char buf[64];
	    sprintf(buf, "/tmp%u_%u.", c, i / 2);
	    dest += buf;

	    // Don't compress temporary tables, even if the final table would
	    // be.
	    ChertTable tmptab("postlist", dest, false);
	    // Use maximum blocksize for temporary tables.
	    tmptab.create_and_open(65536);

	    merge_postlists(compactor, &tmptab, off.begin() + i,
			    tmp.begin() + i, tmp.begin() + j,
			    revs.begin() + i, last_docid);
	    if (c > 0) {
		// Unlink temporary tables from the previous pass (never the
		// real input tables, which are only read when c == 0).
		for (unsigned int k = i; k < j; ++k) {
		    unlink((tmp[k] + "DB").c_str());
		    unlink((tmp[k] + "baseA").c_str());
		    unlink((tmp[k] + "baseB").c_str());
		}
	    }
	    tmpout.push_back(dest);
	    tmptab.flush_db();
	    // Temporary tables are committed at revision 1.
	    tmptab.commit(1);
	    newrevs.push_back(1);
	}
	swap(tmp, tmpout);
	swap(off, newoff);
	swap(revs, newrevs);
	++c;
    }
    merge_postlists(compactor,
		    out, off.begin(), tmp.begin(), tmp.end(), revs.begin(),
		    last_docid);
    if (c > 0) {
	// Clean up the temporary tables left from the final pass.
	for (size_t k = 0; k < tmp.size(); ++k) {
	    unlink((tmp[k] + "DB").c_str());
	    unlink((tmp[k] + "baseA").c_str());
	    unlink((tmp[k] + "baseB").c_str());
	}
    }
}
764
/** Merge tables whose keys start with a packed docid.
 *
 *  Used for the record, termlist and position tables.  Entries are copied
 *  source by source; since each source's docids (plus its offset) form a
 *  disjoint ascending range, the output stays in key order.
 *
 *  @param tablename  The "base name" of the table (e.g. "record").
 *  @param out        Output table.
 *  @param inputs     Source table paths.
 *  @param revs       Revision to open each source at.
 *  @param offset     Docid offset to apply to each source.
 *  @param lazy       Open the source tables lazily (used for position)?
 */
static void
merge_docid_keyed(const char * tablename,
		  ChertTable *out, const vector<string> & inputs,
		  const vector<chert_revision_number_t> & revs,
		  const vector<Xapian::docid> & offset, bool lazy)
{
    for (size_t i = 0; i < inputs.size(); ++i) {
	Xapian::docid off = offset[i];

	ChertTable in(tablename, inputs[i], true, DONT_COMPRESS, lazy);
	if (!in.open(revs[i])) {
	    failed_to_open_at_rev(inputs[i], revs[i]);
	}
	if (in.empty()) continue;

	ChertCursor cur(&in);
	cur.find_entry(string());

	string key;
	while (cur.next()) {
	    // Adjust the key if this isn't the first database.
	    if (off) {
		Xapian::docid did;
		const char * d = cur.current_key.data();
		const char * e = d + cur.current_key.size();
		if (!unpack_uint_preserving_sort(&d, e, &did)) {
		    string msg = "Bad key in ";
		    msg += inputs[i];
		    throw Xapian::DatabaseCorruptError(msg);
		}
		// Repack the key with the offset-adjusted docid.
		did += off;
		key.resize(0);
		pack_uint_preserving_sort(key, did);
		if (d != e) {
		    // Copy over the termname for the position table.
		    key.append(d, e - d);
		}
	    } else {
		key = cur.current_key;
	    }
	    // Copy the tag across without recompressing.
	    bool compressed = cur.read_tag(true);
	    out->add(key, cur.current_tag, compressed);
	}
    }
}
810
811 }
812
813 using namespace ChertCompact;
814
/** Compact a chert database, or merge and compact several.
 *
 *  @param compactor   Provides the status-reporting and duplicate-metadata
 *                     resolution callbacks.
 *  @param destdir     Directory to write the output database into.
 *  @param sources     Paths of the source databases.
 *  @param offset      Docid offset to apply to each source.
 *  @param block_size  Block size for the output tables.
 *  @param compaction  Compaction level (STANDARD / FULL / FULLER).
 *  @param multipass   Merge postlists in multiple passes?
 *  @param last_docid  Highest docid in the merged output.
 */
void
compact_chert(Xapian::Compactor & compactor,
	      const char * destdir, const vector<string> & sources,
	      const vector<Xapian::docid> & offset, size_t block_size,
	      Xapian::Compactor::compaction_level compaction, bool multipass,
	      Xapian::docid last_docid) {
    // Get the revisions of each database to use to ensure we don't read tables
    // at different revisions from any of them.
    vector<chert_revision_number_t> revs;
    revs.reserve(sources.size());
    for (vector<string>::const_iterator i = sources.begin();
	 i != sources.end(); ++i) {
	ChertDatabase db(*i);
	revs.push_back(db.get_revision_number());
    }

    enum table_type {
	POSTLIST, RECORD, TERMLIST, POSITION, VALUE, SPELLING, SYNONYM
    };
    struct table_list {
	// The "base name" of the table.
	const char * name;
	// The type.
	table_type type;
	// zlib compression strategy to use on tags.
	int compress_strategy;
	// Create tables after position lazily.
	bool lazy;
    };

    static const table_list tables[] = {
	// name		type		compress_strategy	lazy
	{ "postlist",	POSTLIST,	DONT_COMPRESS,		false },
	{ "record",	RECORD,		Z_DEFAULT_STRATEGY,	false },
	{ "termlist",	TERMLIST,	Z_DEFAULT_STRATEGY,	false },
	{ "position",	POSITION,	DONT_COMPRESS,		true },
	{ "spelling",	SPELLING,	Z_DEFAULT_STRATEGY,	true },
	{ "synonym",	SYNONYM,	Z_DEFAULT_STRATEGY,	true }
    };
    const table_list * tables_end = tables +
	(sizeof(tables) / sizeof(tables[0]));

    for (const table_list * t = tables; t < tables_end; ++t) {
	// The postlist table requires an N-way merge, adjusting the
	// headers of various blocks.  The spelling and synonym tables also
	// need special handling.  The other tables have keys sorted in
	// docid order, so we can merge them by simply copying all the keys
	// from each source table in turn.
	compactor.set_status(t->name, string());

	string dest = destdir;
	dest += '/';
	dest += t->name;
	dest += '.';

	// Lazily-created tables may legitimately not exist in any source.
	bool output_will_exist = !t->lazy;

	// Sometimes stat can fail for benign reasons (e.g. >= 2GB file
	// on certain systems).
	bool bad_stat = false;

	// Total size of this table across the inputs, in KiB.
	off_t in_size = 0;

	vector<string> inputs;
	inputs.reserve(sources.size());
	size_t inputs_present = 0;
	for (vector<string>::const_iterator src = sources.begin();
	     src != sources.end(); ++src) {
	    string s(*src);
	    s += t->name;
	    s += '.';

	    struct stat sb;
	    if (stat(s + "DB", &sb) == 0) {
		in_size += sb.st_size / 1024;
		output_will_exist = true;
		++inputs_present;
	    } else if (errno != ENOENT) {
		// We get ENOENT for an optional table.
		bad_stat = true;
		output_will_exist = true;
		++inputs_present;
	    }
	    inputs.push_back(s);
	}

	// If any inputs lack a termlist table, suppress it in the output.
	if (t->type == TERMLIST && inputs_present != sources.size()) {
	    if (inputs_present != 0) {
		// A mix of present and absent termlist tables - report
		// that we're suppressing the output.
		string m = str(inputs_present);
		m += " of ";
		m += str(sources.size());
		m += " inputs present, so suppressing output";
		compactor.set_status(t->name, m);
		continue;
	    }
	    output_will_exist = false;
	}

	if (!output_will_exist) {
	    compactor.set_status(t->name, "doesn't exist");
	    continue;
	}

	ChertTable out(t->name, dest, false, t->compress_strategy, t->lazy);
	if (!t->lazy) {
	    out.create_and_open(block_size);
	} else {
	    // Lazy tables are only created if something is added to them.
	    out.erase();
	    out.set_block_size(block_size);
	}

	out.set_full_compaction(compaction != compactor.STANDARD);
	// FULLER packs items as tightly as possible.
	if (compaction == compactor.FULLER) out.set_max_item_size(1);

	switch (t->type) {
	    case POSTLIST:
		if (multipass && inputs.size() > 3) {
		    multimerge_postlists(compactor, &out, destdir, last_docid,
					 inputs, revs, offset);
		} else {
		    merge_postlists(compactor, &out, offset.begin(),
				    inputs.begin(), inputs.end(),
				    revs.begin(), last_docid);
		}
		break;
	    case SPELLING:
		merge_spellings(&out, inputs.begin(), inputs.end(),
				revs.begin());
		break;
	    case SYNONYM:
		merge_synonyms(&out, inputs.begin(), inputs.end(),
			       revs.begin());
		break;
	    default:
		// Position, Record, Termlist
		merge_docid_keyed(t->name, &out, inputs, revs, offset, t->lazy);
		break;
	}

	// Commit as revision 1.
	out.flush_db();
	out.commit(1);

	// Work out the output size so we can report the change.
	off_t out_size = 0;
	if (!bad_stat) {
	    struct stat sb;
	    if (stat(dest + "DB", &sb) == 0) {
		out_size = sb.st_size / 1024;
	    } else {
		// A lazy table may not have been created - ENOENT is OK.
		bad_stat = (errno != ENOENT);
	    }
	}
	if (bad_stat) {
	    compactor.set_status(t->name, "Done (couldn't stat all the DB files)");
	} else {
	    // Build a human-readable summary of the size change.
	    string status;
	    if (out_size == in_size) {
		status = "Size unchanged (";
	    } else {
		off_t delta;
		if (out_size < in_size) {
		    delta = in_size - out_size;
		    status = "Reduced by ";
		} else {
		    delta = out_size - in_size;
		    status = "INCREASED by ";
		}
		if (in_size) {
		    status += str(100 * delta / in_size);
		    status += "% ";
		}
		status += str(delta);
		status += "K (";
		status += str(in_size);
		status += "K -> ";
	    }
	    status += str(out_size);
	    status += "K)";
	    compactor.set_status(t->name, status);
	}
    }
}
998