/** @file chert_compact.cc
 * @brief Compact a chert database, or merge and compact several.
 */
/* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2013,2015 Olly Betts
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
 * USA
 */

#include <config.h>

#include <xapian/compactor.h>

#include <algorithm>
#include <queue>

#include <cstdio>

#include "safeerrno.h"
#include <sys/types.h>
#include "safesysstat.h"
#include "safeunistd.h"

#include "chert_table.h"
#include "chert_compact.h"
#include "chert_cursor.h"
#include "chert_database.h"
#include "internaltypes.h"
#include "noreturn.h"
#include "pack.h"
#include "utils.h"
#include "valuestats.h"

#include "../byte_length_strings.h"
#include "../prefix_compressed_strings.h"
#include <xapian.h>

using namespace std;

XAPIAN_NORETURN(
static void failed_to_open_at_rev(string, chert_revision_number_t));
static void
failed_to_open_at_rev(string m, chert_revision_number_t rev)
{
    m += ": Couldn't open at revision ";
    m += str(rev);
    throw Xapian::DatabaseError(m);
}

// Put all the helpers in a namespace to avoid symbols colliding with those of
// the same name in xapian-compact-flint.cc.
namespace ChertCompact {

static inline bool
is_metainfo_key(const string & key)
{
    return key.size() == 1 && key[0] == '\0';
}

static inline bool
is_user_metadata_key(const string & key)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == '\xc0';
}

static inline bool
is_valuestats_key(const string & key)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == '\xd0';
}

static inline bool
is_valuechunk_key(const string & key)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == '\xd8';
}

static inline bool
is_doclenchunk_key(const string & key)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == '\xe0';
}

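// The helpers above carve out the reserved key namespace of the postlist
// table: a lone zero byte is the METAINFO entry, and a zero byte followed by
// 0xc0, 0xd0, 0xd8 or 0xe0 marks user metadata, value statistics, value
// stream chunks and document length chunks respectively.

// PostlistCursor iterates one source postlist table, applying the docid
// offset to value chunk keys and to the first docid of each postlist chunk,
// and stripping the (termfreq, collfreq, firstdid) header from initial
// chunks into the tf/cf/firstdid members so that the merged initial chunk
// can be rebuilt with combined statistics.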
class PostlistCursor : private ChertCursor {
    Xapian::docid offset;

  public:
    string key, tag;
    Xapian::docid firstdid;
    Xapian::termcount tf, cf;

    PostlistCursor(ChertTable *in, Xapian::docid offset_)
	: ChertCursor(in), offset(offset_), firstdid(0)
    {
	find_entry(string());
	next();
    }

    ~PostlistCursor()
    {
	delete ChertCursor::get_table();
    }

    bool next() {
	if (!ChertCursor::next()) return false;
	// We put all chunks into the non-initial chunk form here, then fix up
	// the first chunk for each term in the merged database as we merge.
	read_tag();
	key = current_key;
	tag = current_tag;
	tf = cf = 0;
	if (is_metainfo_key(key)) return true;
	if (is_user_metadata_key(key)) return true;
	if (is_valuestats_key(key)) return true;
	if (is_valuechunk_key(key)) {
	    const char * p = key.data();
	    const char * end = p + key.length();
	    p += 2;
	    Xapian::valueno slot;
	    if (!unpack_uint(&p, end, &slot))
		throw Xapian::DatabaseCorruptError("bad value key");
	    Xapian::docid did;
	    if (!unpack_uint_preserving_sort(&p, end, &did))
		throw Xapian::DatabaseCorruptError("bad value key");
	    did += offset;

	    key.assign("\0\xd8", 2);
	    pack_uint(key, slot);
	    pack_uint_preserving_sort(key, did);
	    return true;
	}

	// Adjust key if this is *NOT* an initial chunk.
	// key is: pack_string_preserving_sort(key, tname)
	// plus optionally: pack_uint_preserving_sort(key, did)
	const char * d = key.data();
	const char * e = d + key.size();
	if (is_doclenchunk_key(key)) {
	    d += 2;
	} else {
	    string tname;
	    if (!unpack_string_preserving_sort(&d, e, tname))
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	}

	if (d == e) {
	    // This is an initial chunk for a term, so adjust tag header.
	    d = tag.data();
	    e = d + tag.size();
	    if (!unpack_uint(&d, e, &tf) ||
		!unpack_uint(&d, e, &cf) ||
		!unpack_uint(&d, e, &firstdid)) {
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	    }
	    ++firstdid;
	    tag.erase(0, d - tag.data());
	} else {
	    // Not an initial chunk, so adjust key.
	    size_t tmp = d - key.data();
	    if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	    if (is_doclenchunk_key(key)) {
		key.erase(tmp);
	    } else {
		key.erase(tmp - 1);
	    }
	}
	firstdid += offset;
	return true;
    }
};

class PostlistCursorGt {
  public:
    /** Return true if and only if a's key is strictly greater than b's key.
     */
    bool operator()(const PostlistCursor *a, const PostlistCursor *b) {
	if (a->key > b->key) return true;
	if (a->key != b->key) return false;
	return (a->firstdid > b->firstdid);
    }
};

static string
encode_valuestats(Xapian::doccount freq,
		  const string & lbound, const string & ubound)
{
    string value;
    pack_uint(value, freq);
    pack_string(value, lbound);
    // We don't store or count empty values, so neither of the bounds
    // can be empty.  So we can safely store an empty upper bound when
    // the bounds are equal.
    if (lbound != ubound) value += ubound;
    return value;
}
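
// The decode side of this format (used when merging valuestats below) reads
// the packed freq, then the length-prefixed lower bound; any bytes left over
// are the upper bound, and an empty remainder means the bounds are equal.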

static void
merge_postlists(Xapian::Compactor & compactor,
		ChertTable * out, vector<Xapian::docid>::const_iterator offset,
		vector<string>::const_iterator b,
		vector<string>::const_iterator e,
		vector<chert_revision_number_t>::const_iterator rev,
		Xapian::docid last_docid)
{
    totlen_t tot_totlen = 0;
    Xapian::termcount doclen_lbound = static_cast<Xapian::termcount>(-1);
    Xapian::termcount wdf_ubound = 0;
    Xapian::termcount doclen_ubound = 0;
    priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
    for ( ; b != e; ++b, ++offset, ++rev) {
	ChertTable *in = new ChertTable("postlist", *b, true);
	if (!in->open(*rev)) {
	    failed_to_open_at_rev(*b, *rev);
	}
	if (in->empty()) {
	    // Skip empty tables.
	    delete in;
	    continue;
	}

	// PostlistCursor takes ownership of ChertTable in and is
	// responsible for deleting it.
	PostlistCursor * cur = new PostlistCursor(in, *offset);
	// Merge the METAINFO tags from each database into one.
	// They have a key consisting of a single zero byte.
	// They may be absent, if the database contains no documents.  If it
	// has user metadata we'll still get here.
	if (is_metainfo_key(cur->key)) {
	    const char * data = cur->tag.data();
	    const char * end = data + cur->tag.size();
	    Xapian::docid dummy_did = 0;
	    if (!unpack_uint(&data, end, &dummy_did)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }

	    Xapian::termcount doclen_lbound_tmp;
	    if (!unpack_uint(&data, end, &doclen_lbound_tmp)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    doclen_lbound = min(doclen_lbound, doclen_lbound_tmp);

	    Xapian::termcount wdf_ubound_tmp;
	    if (!unpack_uint(&data, end, &wdf_ubound_tmp)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    wdf_ubound = max(wdf_ubound, wdf_ubound_tmp);

	    Xapian::termcount doclen_ubound_tmp;
	    if (!unpack_uint(&data, end, &doclen_ubound_tmp)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    doclen_ubound_tmp += wdf_ubound_tmp;
	    doclen_ubound = max(doclen_ubound, doclen_ubound_tmp);

	    totlen_t totlen = 0;
	    if (!unpack_uint_last(&data, end, &totlen)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    tot_totlen += totlen;
	    if (tot_totlen < totlen) {
		throw "totlen wrapped!";
	    }
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	} else {
	    pq.push(cur);
	}
    }

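    // Write a combined METAINFO entry.  Note that the doclen upper bound is
    // stored relative to the wdf upper bound (doclen_ubound - wdf_ubound),
    // mirroring the "doclen_ubound_tmp += wdf_ubound_tmp" in the decode
    // above.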
    // Don't write the metainfo key for a totally empty database.
    if (last_docid) {
	if (doclen_lbound > doclen_ubound)
	    doclen_lbound = doclen_ubound;
	string tag;
	pack_uint(tag, last_docid);
	pack_uint(tag, doclen_lbound);
	pack_uint(tag, wdf_ubound);
	pack_uint(tag, doclen_ubound - wdf_ubound);
	pack_uint_last(tag, tot_totlen);
	out->add(string(1, '\0'), tag);
    }

    string last_key;
    {
	// Merge user metadata.
	vector<string> tags;
	while (!pq.empty()) {
	    PostlistCursor * cur = pq.top();
	    const string& key = cur->key;
	    if (!is_user_metadata_key(key)) break;

	    if (key != last_key) {
		if (tags.size() > 1) {
		    Assert(!last_key.empty());
		    // FIXME: It would be better to merge all duplicates for a
		    // key in one call, but currently we don't in multipass
		    // mode.
		    out->add(last_key,
			     compactor.resolve_duplicate_metadata(last_key,
								  tags.size(),
								  &tags[0]));
		} else if (tags.size() == 1) {
		    Assert(!last_key.empty());
		    out->add(last_key, tags[0]);
		}
		tags.resize(0);
		last_key = key;
	    }
	    tags.push_back(cur->tag);

	    pq.pop();
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}
	if (tags.size() > 1) {
	    Assert(!last_key.empty());
	    out->add(last_key,
		     compactor.resolve_duplicate_metadata(last_key,
							  tags.size(),
							  &tags[0]));
	} else if (tags.size() == 1) {
	    Assert(!last_key.empty());
	    out->add(last_key, tags[0]);
	}
    }

    {
	// Merge valuestats.
	Xapian::doccount freq = 0;
	string lbound, ubound;

	while (!pq.empty()) {
	    PostlistCursor * cur = pq.top();
	    const string& key = cur->key;
	    if (!is_valuestats_key(key)) break;
	    if (key != last_key) {
		// For the first valuestats key, last_key will be the previous
		// key we wrote, which we don't want to overwrite.  This is the
		// only time that freq will be 0, so check that.
		if (freq) {
		    out->add(last_key, encode_valuestats(freq, lbound, ubound));
		    freq = 0;
		}
		last_key = key;
	    }

	    const string & tag = cur->tag;

	    const char * pos = tag.data();
	    const char * end = pos + tag.size();

	    Xapian::doccount f;
	    string l, u;
	    if (!unpack_uint(&pos, end, &f)) {
		if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
		throw Xapian::RangeError("Frequency statistic in value table is too large");
	    }
	    if (!unpack_string(&pos, end, l)) {
		if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
		throw Xapian::RangeError("Lower bound in value table is too large");
	    }
	    size_t len = end - pos;
	    if (len == 0) {
		u = l;
	    } else {
		u.assign(pos, len);
	    }
	    if (freq == 0) {
		freq = f;
		lbound = l;
		ubound = u;
	    } else {
		freq += f;
		if (l < lbound) lbound = l;
		if (u > ubound) ubound = u;
	    }

	    pq.pop();
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}

	if (freq) {
	    out->add(last_key, encode_valuestats(freq, lbound, ubound));
	}
    }

    // Merge valuestream chunks.
    while (!pq.empty()) {
	PostlistCursor * cur = pq.top();
	const string & key = cur->key;
	if (!is_valuechunk_key(key)) break;
	Assert(!is_user_metadata_key(key));
	out->add(key, cur->tag);
	pq.pop();
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }

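    // Finally merge the postlist chunks themselves.  For each term (and for
    // the doclen chunks) we gather every chunk from the sources, sum tf and
    // cf, rebuild the initial chunk's header from the smallest first docid,
    // and re-key the remaining chunks with pack_chert_postlist_key().  The
    // first byte of each chunk's tag is rewritten to '1' for the final chunk
    // of a term and '0' otherwise.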
    Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
    vector<pair<Xapian::docid, string> > tags;
    while (true) {
	PostlistCursor * cur = NULL;
	if (!pq.empty()) {
	    cur = pq.top();
	    pq.pop();
	}
	Assert(cur == NULL || !is_user_metadata_key(cur->key));
	if (cur == NULL || cur->key != last_key) {
	    if (!tags.empty()) {
		string first_tag;
		pack_uint(first_tag, tf);
		pack_uint(first_tag, cf);
		pack_uint(first_tag, tags[0].first - 1);
		string tag = tags[0].second;
		tag[0] = (tags.size() == 1) ? '1' : '0';
		first_tag += tag;
		out->add(last_key, first_tag);

		string term;
		if (!is_doclenchunk_key(last_key)) {
		    const char * p = last_key.data();
		    const char * end = p + last_key.size();
		    if (!unpack_string_preserving_sort(&p, end, term) || p != end)
			throw Xapian::DatabaseCorruptError("Bad postlist chunk key");
		}

		vector<pair<Xapian::docid, string> >::const_iterator i;
		i = tags.begin();
		while (++i != tags.end()) {
		    tag = i->second;
		    tag[0] = (i + 1 == tags.end()) ? '1' : '0';
		    out->add(pack_chert_postlist_key(term, i->first), tag);
		}
	    }
	    tags.clear();
	    if (cur == NULL) break;
	    tf = cf = 0;
	    last_key = cur->key;
	}
	tf += cur->tf;
	cf += cur->cf;
	tags.push_back(make_pair(cur->firstdid, cur->tag));
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }
}

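// MergeCursor owns the table it is given and positions itself on the first
// entry; CursorGt orders cursors so that a priority_queue yields the
// smallest current key first.  These drive the simpler merges of the
// spelling and synonym tables below.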
struct MergeCursor : public ChertCursor {
    MergeCursor(ChertTable *in) : ChertCursor(in) {
	find_entry(string());
	next();
    }

    ~MergeCursor() {
	delete ChertCursor::get_table();
    }
};

struct CursorGt {
    /// Return true if and only if a's key is strictly greater than b's key.
    bool operator()(const ChertCursor *a, const ChertCursor *b) {
	if (b->after_end()) return false;
	if (a->after_end()) return true;
	return (a->current_key > b->current_key);
    }
};

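// Merge the spelling tables.  Keys starting with 'W' hold a packed word
// frequency, which is summed across sources; all other keys hold
// prefix-compressed word lists, which are unioned.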
static void
merge_spellings(ChertTable * out,
		vector<string>::const_iterator b,
		vector<string>::const_iterator e,
		vector<chert_revision_number_t>::const_iterator rev)
{
    priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
    for ( ; b != e; ++b, ++rev) {
	ChertTable *in = new ChertTable("spelling", *b, true, DONT_COMPRESS, true);
	if (!in->open(*rev)) {
	    failed_to_open_at_rev(*b, *rev);
	}
	if (!in->empty()) {
	    // The MergeCursor takes ownership of ChertTable in and is
	    // responsible for deleting it.
	    pq.push(new MergeCursor(in));
	} else {
	    delete in;
	}
    }

    while (!pq.empty()) {
	MergeCursor * cur = pq.top();
	pq.pop();

	string key = cur->current_key;
	if (pq.empty() || pq.top()->current_key > key) {
	    // No need to merge the tags, just copy the (possibly compressed)
	    // tag value.
	    bool compressed = cur->read_tag(true);
	    out->add(key, cur->current_tag, compressed);
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	    continue;
	}

	// Merge tag values with the same key:
	string tag;
	if (key[0] != 'W') {
	    // We just want the union of words, so copy over the first instance
	    // and skip any identical ones.
	    priority_queue<PrefixCompressedStringItor *,
			   vector<PrefixCompressedStringItor *>,
			   PrefixCompressedStringItorGt> pqtag;
	    // Stick all the MergeCursor pointers in a vector because their
	    // current_tag members must remain valid while we're merging their
	    // tags, but we need to call next() on them all afterwards.
	    vector<MergeCursor *> vec;
	    vec.reserve(pq.size());

	    while (true) {
		cur->read_tag();
		pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
		vec.push_back(cur);
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }

	    PrefixCompressedStringWriter wr(tag);
	    string lastword;
	    while (!pqtag.empty()) {
		PrefixCompressedStringItor * it = pqtag.top();
		pqtag.pop();
		string word = **it;
		if (word != lastword) {
		    lastword = word;
		    wr.append(lastword);
		}
		++*it;
		if (!it->at_end()) {
		    pqtag.push(it);
		} else {
		    delete it;
		}
	    }

	    vector<MergeCursor *>::const_iterator i;
	    for (i = vec.begin(); i != vec.end(); ++i) {
		cur = *i;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
	    }
	} else {
	    // We want to sum the frequencies from tags for the same key.
	    Xapian::termcount tot_freq = 0;
	    while (true) {
		cur->read_tag();
		Xapian::termcount freq;
		const char * p = cur->current_tag.data();
		const char * end = p + cur->current_tag.size();
		if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
		    throw Xapian::DatabaseCorruptError("Bad spelling word freq");
		}
		tot_freq += freq;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }
	    tag.resize(0);
	    pack_uint_last(tag, tot_freq);
	}
	out->add(key, tag);
    }
}

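// Merge the synonym tables.  Each tag is a list of byte-length-prefixed
// strings (with the length byte XORed with MAGIC_XOR_VALUE), and we take the
// union of the strings for each key.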
static void
merge_synonyms(ChertTable * out,
	       vector<string>::const_iterator b,
	       vector<string>::const_iterator e,
	       vector<chert_revision_number_t>::const_iterator rev)
{
    priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
    for ( ; b != e; ++b, ++rev) {
	ChertTable *in = new ChertTable("synonym", *b, true, DONT_COMPRESS, true);
	if (!in->open(*rev)) {
	    failed_to_open_at_rev(*b, *rev);
	}
	if (!in->empty()) {
	    // The MergeCursor takes ownership of ChertTable in and is
	    // responsible for deleting it.
	    pq.push(new MergeCursor(in));
	} else {
	    delete in;
	}
    }

    while (!pq.empty()) {
	MergeCursor * cur = pq.top();
	pq.pop();

	string key = cur->current_key;
	if (pq.empty() || pq.top()->current_key > key) {
	    // No need to merge the tags, just copy the (possibly compressed)
	    // tag value.
	    bool compressed = cur->read_tag(true);
	    out->add(key, cur->current_tag, compressed);
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	    continue;
	}

	// Merge tag values with the same key:
	string tag;

	// We just want the union of words, so copy over the first instance
	// and skip any identical ones.
	priority_queue<ByteLengthPrefixedStringItor *,
		       vector<ByteLengthPrefixedStringItor *>,
		       ByteLengthPrefixedStringItorGt> pqtag;
	vector<MergeCursor *> vec;

	while (true) {
	    cur->read_tag();
	    pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
	    vec.push_back(cur);
	    if (pq.empty() || pq.top()->current_key != key) break;
	    cur = pq.top();
	    pq.pop();
	}

	string lastword;
	while (!pqtag.empty()) {
	    ByteLengthPrefixedStringItor * it = pqtag.top();
	    pqtag.pop();
	    if (**it != lastword) {
		lastword = **it;
		tag += byte(lastword.size() ^ MAGIC_XOR_VALUE);
		tag += lastword;
	    }
	    ++*it;
	    if (!it->at_end()) {
		pqtag.push(it);
	    } else {
		delete it;
	    }
	}

	vector<MergeCursor *>::const_iterator i;
	for (i = vec.begin(); i != vec.end(); ++i) {
	    cur = *i;
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}

	out->add(key, tag);
    }
}

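// Multipass merge: repeatedly merge the source postlist tables in small
// groups (pairs, or three at the end of a pass) into uncompressed temporary
// tables under tmpdir, until at most three remain, then merge those into the
// final output table.  Temporary tables from earlier passes are unlinked as
// soon as they have been consumed.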
static void
multimerge_postlists(Xapian::Compactor & compactor,
		     ChertTable * out, const char * tmpdir,
		     Xapian::docid last_docid,
		     vector<string> tmp, vector<chert_revision_number_t> revs,
		     vector<Xapian::docid> off)
{
    unsigned int c = 0;
    while (tmp.size() > 3) {
	vector<string> tmpout;
	tmpout.reserve(tmp.size() / 2);
	vector<Xapian::docid> newoff;
	newoff.resize(tmp.size() / 2);
	vector<chert_revision_number_t> newrevs;
	newrevs.reserve(tmp.size() / 2);
	for (unsigned int i = 0, j; i < tmp.size(); i = j) {
	    j = i + 2;
	    if (j == tmp.size() - 1) ++j;

	    string dest = tmpdir;
	    char buf[64];
	    sprintf(buf, "/tmp%u_%u.", c, i / 2);
	    dest += buf;

	    // Don't compress temporary tables, even if the final table would
	    // be.
	    ChertTable tmptab("postlist", dest, false);
	    // Use maximum blocksize for temporary tables.
	    tmptab.create_and_open(65536);

	    merge_postlists(compactor, &tmptab, off.begin() + i,
			    tmp.begin() + i, tmp.begin() + j,
			    revs.begin() + i, last_docid);
	    if (c > 0) {
		for (unsigned int k = i; k < j; ++k) {
		    unlink((tmp[k] + "DB").c_str());
		    unlink((tmp[k] + "baseA").c_str());
		    unlink((tmp[k] + "baseB").c_str());
		}
	    }
	    tmpout.push_back(dest);
	    tmptab.flush_db();
	    tmptab.commit(1);
	    newrevs.push_back(1);
	}
	swap(tmp, tmpout);
	swap(off, newoff);
	swap(revs, newrevs);
	++c;
    }
    merge_postlists(compactor,
		    out, off.begin(), tmp.begin(), tmp.end(), revs.begin(),
		    last_docid);
    if (c > 0) {
	for (size_t k = 0; k < tmp.size(); ++k) {
	    unlink((tmp[k] + "DB").c_str());
	    unlink((tmp[k] + "baseA").c_str());
	    unlink((tmp[k] + "baseB").c_str());
	}
    }
}

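// Merge tables whose keys start with a packed docid (record, termlist and
// position).  If this source has a docid offset, re-pack the docid with the
// offset added (keeping any trailing termname, as used by the position
// table); tags are copied through verbatim, preserving compression.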
static void
merge_docid_keyed(const char * tablename,
		  ChertTable *out, const vector<string> & inputs,
		  const vector<chert_revision_number_t> & revs,
		  const vector<Xapian::docid> & offset, bool lazy)
{
    for (size_t i = 0; i < inputs.size(); ++i) {
	Xapian::docid off = offset[i];

	ChertTable in(tablename, inputs[i], true, DONT_COMPRESS, lazy);
	if (!in.open(revs[i])) {
	    failed_to_open_at_rev(inputs[i], revs[i]);
	}
	if (in.empty()) continue;

	ChertCursor cur(&in);
	cur.find_entry(string());

	string key;
	while (cur.next()) {
	    // Adjust the key if this isn't the first database.
	    if (off) {
		Xapian::docid did;
		const char * d = cur.current_key.data();
		const char * e = d + cur.current_key.size();
		if (!unpack_uint_preserving_sort(&d, e, &did)) {
		    string msg = "Bad key in ";
		    msg += inputs[i];
		    throw Xapian::DatabaseCorruptError(msg);
		}
		did += off;
		key.resize(0);
		pack_uint_preserving_sort(key, did);
		if (d != e) {
		    // Copy over the termname for the position table.
		    key.append(d, e - d);
		}
	    } else {
		key = cur.current_key;
	    }
	    bool compressed = cur.read_tag(true);
	    out->add(key, cur.current_tag, compressed);
	}
    }
}

}

using namespace ChertCompact;

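// A rough sketch of how this entry point is typically reached via the public
// Xapian::Compactor API (illustrative only; the paths are hypothetical, and
// the compactor dispatches to this backend-specific function for chert
// source databases):
//
//     Xapian::Compactor c;
//     c.set_destdir("/path/to/dest");
//     c.add_source("/path/to/src1");
//     c.add_source("/path/to/src2");
//     c.set_multipass(true);
//     c.compact();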
void
compact_chert(Xapian::Compactor & compactor,
	      const char * destdir, const vector<string> & sources,
	      const vector<Xapian::docid> & offset, size_t block_size,
	      Xapian::Compactor::compaction_level compaction, bool multipass,
	      Xapian::docid last_docid) {
    // Get the revisions of each database to use to ensure we don't read tables
    // at different revisions from any of them.
    vector<chert_revision_number_t> revs;
    revs.reserve(sources.size());
    for (vector<string>::const_iterator i = sources.begin();
	 i != sources.end(); ++i) {
	ChertDatabase db(*i);
	revs.push_back(db.get_revision_number());
    }

    enum table_type {
	POSTLIST, RECORD, TERMLIST, POSITION, VALUE, SPELLING, SYNONYM
    };
    struct table_list {
	// The "base name" of the table.
	const char * name;
	// The type.
	table_type type;
	// zlib compression strategy to use on tags.
	int compress_strategy;
	// Create tables after position lazily.
	bool lazy;
    };

    static const table_list tables[] = {
	// name		type		compress_strategy	lazy
	{ "postlist",	POSTLIST,	DONT_COMPRESS,		false },
	{ "record",	RECORD,		Z_DEFAULT_STRATEGY,	false },
	{ "termlist",	TERMLIST,	Z_DEFAULT_STRATEGY,	false },
	{ "position",	POSITION,	DONT_COMPRESS,		true },
	{ "spelling",	SPELLING,	Z_DEFAULT_STRATEGY,	true },
	{ "synonym",	SYNONYM,	Z_DEFAULT_STRATEGY,	true }
    };
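    // Note there's no separate "value" table entry here: chert stores value
    // statistics and value stream chunks inside the postlist table, so they
    // are handled by the POSTLIST merge (the VALUE enumerator is unused).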
    const table_list * tables_end = tables +
	(sizeof(tables) / sizeof(tables[0]));

    for (const table_list * t = tables; t < tables_end; ++t) {
	// The postlist table requires an N-way merge, adjusting the
	// headers of various blocks.  The spelling and synonym tables also
	// need special handling.  The other tables have keys sorted in
	// docid order, so we can merge them by simply copying all the keys
	// from each source table in turn.
	compactor.set_status(t->name, string());

	string dest = destdir;
	dest += '/';
	dest += t->name;
	dest += '.';

	bool output_will_exist = !t->lazy;

	// Sometimes stat can fail for benign reasons (e.g. >= 2GB file
	// on certain systems).
	bool bad_stat = false;

	off_t in_size = 0;

	vector<string> inputs;
	inputs.reserve(sources.size());
	size_t inputs_present = 0;
	for (vector<string>::const_iterator src = sources.begin();
	     src != sources.end(); ++src) {
	    string s(*src);
	    s += t->name;
	    s += '.';

	    struct stat sb;
	    if (stat(s + "DB", &sb) == 0) {
		in_size += sb.st_size / 1024;
		output_will_exist = true;
		++inputs_present;
	    } else if (errno != ENOENT) {
		// We get ENOENT for an optional table.
		bad_stat = true;
		output_will_exist = true;
		++inputs_present;
	    }
	    inputs.push_back(s);
	}

	// If any inputs lack a termlist table, suppress it in the output.
	if (t->type == TERMLIST && inputs_present != sources.size()) {
	    if (inputs_present != 0) {
		string m = str(inputs_present);
		m += " of ";
		m += str(sources.size());
		m += " inputs present, so suppressing output";
		compactor.set_status(t->name, m);
		continue;
	    }
	    output_will_exist = false;
	}

	if (!output_will_exist) {
	    compactor.set_status(t->name, "doesn't exist");
	    continue;
	}

	ChertTable out(t->name, dest, false, t->compress_strategy, t->lazy);
	if (!t->lazy) {
	    out.create_and_open(block_size);
	} else {
	    out.erase();
	    out.set_block_size(block_size);
	}

	out.set_full_compaction(compaction != compactor.STANDARD);
	if (compaction == compactor.FULLER) out.set_max_item_size(1);

	switch (t->type) {
	    case POSTLIST:
		if (multipass && inputs.size() > 3) {
		    multimerge_postlists(compactor, &out, destdir, last_docid,
					 inputs, revs, offset);
		} else {
		    merge_postlists(compactor, &out, offset.begin(),
				    inputs.begin(), inputs.end(),
				    revs.begin(), last_docid);
		}
		break;
	    case SPELLING:
		merge_spellings(&out, inputs.begin(), inputs.end(),
				revs.begin());
		break;
	    case SYNONYM:
		merge_synonyms(&out, inputs.begin(), inputs.end(),
			       revs.begin());
		break;
	    default:
		// Position, Record, Termlist
		merge_docid_keyed(t->name, &out, inputs, revs, offset, t->lazy);
		break;
	}

	// Commit as revision 1.
	out.flush_db();
	out.commit(1);

	off_t out_size = 0;
	if (!bad_stat) {
	    struct stat sb;
	    if (stat(dest + "DB", &sb) == 0) {
		out_size = sb.st_size / 1024;
	    } else {
		bad_stat = (errno != ENOENT);
	    }
	}
	if (bad_stat) {
	    compactor.set_status(t->name, "Done (couldn't stat all the DB files)");
	} else {
	    string status;
	    if (out_size == in_size) {
		status = "Size unchanged (";
	    } else {
		off_t delta;
		if (out_size < in_size) {
		    delta = in_size - out_size;
		    status = "Reduced by ";
		} else {
		    delta = out_size - in_size;
		    status = "INCREASED by ";
		}
		if (in_size) {
		    status += str(100 * delta / in_size);
		    status += "% ";
		}
		status += str(delta);
		status += "K (";
		status += str(in_size);
		status += "K -> ";
	    }
	    status += str(out_size);
	    status += "K)";
	    compactor.set_status(t->name, status);
	}
    }
}