/** @file chert_compact.cc
 * @brief Compact a chert database, or merge and compact several.
 */
/* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2015 Olly Betts
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
 * USA
 */

#include <config.h>

#include "xapian/compactor.h"
#include "xapian/constants.h"
#include "xapian/error.h"
#include "xapian/types.h"

#include <algorithm>
#include <queue>

#include <cerrno>
#include <cstdio>

#include "safeunistd.h"

#include "chert_table.h"
#include "chert_cursor.h"
#include "chert_database.h"
#include "filetests.h"
#include "internaltypes.h"
#include "pack.h"
#include "backends/valuestats.h"

#include "../byte_length_strings.h"
#include "../prefix_compressed_strings.h"

using namespace std;

// Put all the helpers in a namespace to avoid symbols colliding with those of
// the same name in other flint-derived backends.
namespace ChertCompact {

static inline bool
is_metainfo_key(const string & key)
{
    return key.size() == 1 && key[0] == '\0';
}

static inline bool
is_user_metadata_key(const string & key)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == '\xc0';
}

static inline bool
is_valuestats_key(const string & key)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == '\xd0';
}

static inline bool
is_valuechunk_key(const string & key)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == '\xd8';
}

static inline bool
is_doclenchunk_key(const string & key)
{
    return key.size() > 1 && key[0] == '\0' && key[1] == '\xe0';
}

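// Summary of the postlist-table key forms distinguished above (as implied by
// these predicates and the handling in PostlistCursor::next() below):
//
//   "\0"                   - METAINFO entry (one per database)
//   "\0\xc0" + name        - user metadata
//   "\0\xd0" + ...         - value statistics for a slot
//   "\0\xd8" + slot + did  - value stream chunk
//   "\0\xe0" [+ did]       - document length chunk
//   packed term [+ did]    - term postlist chunk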
class PostlistCursor : private ChertCursor {
    Xapian::docid offset;

  public:
    string key, tag;
    Xapian::docid firstdid;
    Xapian::termcount tf, cf;

    PostlistCursor(ChertTable *in, Xapian::docid offset_)
	: ChertCursor(in), offset(offset_), firstdid(0)
    {
	find_entry(string());
	next();
    }

    bool next() {
	if (!ChertCursor::next()) return false;
	// We put all chunks into the non-initial chunk form here, then fix up
	// the first chunk for each term in the merged database as we merge.
	read_tag();
	key = current_key;
	tag = current_tag;
	tf = cf = 0;
	if (is_metainfo_key(key)) return true;
	if (is_user_metadata_key(key)) return true;
	if (is_valuestats_key(key)) return true;
	if (is_valuechunk_key(key)) {
	    const char * p = key.data();
	    const char * end = p + key.length();
	    p += 2;
	    Xapian::valueno slot;
	    if (!unpack_uint(&p, end, &slot))
		throw Xapian::DatabaseCorruptError("bad value key");
	    Xapian::docid did;
	    if (!C_unpack_uint_preserving_sort(&p, end, &did))
		throw Xapian::DatabaseCorruptError("bad value key");
	    did += offset;

	    key.assign("\0\xd8", 2);
	    pack_uint(key, slot);
	    C_pack_uint_preserving_sort(key, did);
	    return true;
	}

	// Adjust key if this is *NOT* an initial chunk.
	// key is: pack_string_preserving_sort(key, tname)
	// plus optionally: C_pack_uint_preserving_sort(key, did)
	const char * d = key.data();
	const char * e = d + key.size();
	if (is_doclenchunk_key(key)) {
	    d += 2;
	} else {
	    string tname;
	    if (!unpack_string_preserving_sort(&d, e, tname))
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	}

	if (d == e) {
	    // This is an initial chunk for a term, so adjust tag header.
	    d = tag.data();
	    e = d + tag.size();
	    if (!unpack_uint(&d, e, &tf) ||
		!unpack_uint(&d, e, &cf) ||
		!unpack_uint(&d, e, &firstdid)) {
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	    }
	    ++firstdid;
	    tag.erase(0, d - tag.data());
	} else {
	    // Not an initial chunk, so adjust key.
	    size_t tmp = d - key.data();
	    if (!C_unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
		throw Xapian::DatabaseCorruptError("Bad postlist key");
	    if (is_doclenchunk_key(key)) {
		key.erase(tmp);
	    } else {
		key.erase(tmp - 1);
	    }
	}
	firstdid += offset;
	return true;
    }
};
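
// Note on the normalisation done by PostlistCursor::next() above: an initial
// postlist chunk's tag starts with a packed header of (termfreq, collfreq,
// firstdid - 1), which next() strips off, recording the values in tf, cf and
// firstdid, so every queued chunk is in "non-initial" form.  merge_postlists()
// below rebuilds that header for the merged first chunk and rewrites each
// chunk's leading flag byte ('1' if it is the last chunk for the term, '0'
// otherwise).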

class PostlistCursorGt {
  public:
    /** Return true if and only if a's key is strictly greater than b's key.
     */
    bool operator()(const PostlistCursor *a, const PostlistCursor *b) const {
	if (a->key > b->key) return true;
	if (a->key != b->key) return false;
	return (a->firstdid > b->firstdid);
    }
};

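// Worked example of the encoding below (values chosen purely for
// illustration): a slot seen in 42 documents with bounds "apple" and "zebra"
// is encoded as pack_uint(42) + pack_string("apple") + "zebra"; when the two
// bounds are equal, only the lower bound is stored and the upper bound is
// implied.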
static string
encode_valuestats(Xapian::doccount freq,
		  const string & lbound, const string & ubound)
{
    string value;
    pack_uint(value, freq);
    pack_string(value, lbound);
    // We don't store or count empty values, so neither of the bounds
    // can be empty.  So we can safely store an empty upper bound when
    // the bounds are equal.
    if (lbound != ubound) value += ubound;
    return value;
}

static void
merge_postlists(Xapian::Compactor * compactor,
		ChertTable * out, vector<Xapian::docid>::const_iterator offset,
		vector<ChertTable*>::const_iterator b,
		vector<ChertTable*>::const_iterator e,
		Xapian::docid last_docid)
{
    Xapian::totallength tot_totlen = 0;
    Xapian::termcount doclen_lbound = static_cast<Xapian::termcount>(-1);
    Xapian::termcount wdf_ubound = 0;
    Xapian::termcount doclen_ubound = 0;
    priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
    for ( ; b != e; ++b, ++offset) {
	ChertTable *in = *b;
	if (in->empty()) {
	    // Skip empty tables.
	    continue;
	}

	PostlistCursor * cur = new PostlistCursor(in, *offset);
	// Merge the METAINFO tags from each database into one.
	// They have a key consisting of a single zero byte.
	// They may be absent, if the database contains no documents.  If it
	// has user metadata we'll still get here.
	if (is_metainfo_key(cur->key)) {
	    const char * data = cur->tag.data();
	    const char * end = data + cur->tag.size();
	    Xapian::docid dummy_did = 0;
	    if (!unpack_uint(&data, end, &dummy_did)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }

	    Xapian::termcount doclen_lbound_tmp;
	    if (!unpack_uint(&data, end, &doclen_lbound_tmp)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    doclen_lbound = min(doclen_lbound, doclen_lbound_tmp);

	    Xapian::termcount wdf_ubound_tmp;
	    if (!unpack_uint(&data, end, &wdf_ubound_tmp)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    wdf_ubound = max(wdf_ubound, wdf_ubound_tmp);

	    Xapian::termcount doclen_ubound_tmp;
	    if (!unpack_uint(&data, end, &doclen_ubound_tmp)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    doclen_ubound_tmp += wdf_ubound_tmp;
	    doclen_ubound = max(doclen_ubound, doclen_ubound_tmp);

	    Xapian::totallength totlen = 0;
	    if (!unpack_uint_last(&data, end, &totlen)) {
		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
	    }
	    tot_totlen += totlen;
	    if (tot_totlen < totlen) {
		throw "totlen wrapped!";
	    }
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	} else {
	    pq.push(cur);
	}
    }

    // Don't write the metainfo key for a totally empty database.
    if (last_docid) {
	if (doclen_lbound > doclen_ubound)
	    doclen_lbound = doclen_ubound;
	string tag;
	pack_uint(tag, last_docid);
	pack_uint(tag, doclen_lbound);
	pack_uint(tag, wdf_ubound);
	pack_uint(tag, doclen_ubound - wdf_ubound);
	pack_uint_last(tag, tot_totlen);
	out->add(string(1, '\0'), tag);
    }
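    // (The merged METAINFO tag written above packs: last_docid,
    // doclen_lbound, wdf_ubound, doclen_ubound - wdf_ubound, and finally
    // tot_totlen - the same layout the loop above unpacks from each input.)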

    string last_key;
    {
	// Merge user metadata.
	vector<string> tags;
	while (!pq.empty()) {
	    PostlistCursor * cur = pq.top();
	    const string& key = cur->key;
	    if (!is_user_metadata_key(key)) break;

	    if (key != last_key) {
		if (!tags.empty()) {
		    if (tags.size() > 1 && compactor) {
			Assert(!last_key.empty());
			// FIXME: It would be better to merge all duplicates
			// for a key in one call, but currently we don't in
			// multipass mode.
			const string & resolved_tag =
			    compactor->resolve_duplicate_metadata(last_key,
								  tags.size(),
								  &tags[0]);
			if (!resolved_tag.empty())
			    out->add(last_key, resolved_tag);
		    } else {
			Assert(!last_key.empty());
			out->add(last_key, tags[0]);
		    }
		    tags.resize(0);
		}
		last_key = key;
	    }
	    tags.push_back(cur->tag);

	    pq.pop();
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}
	if (!tags.empty()) {
	    if (tags.size() > 1 && compactor) {
		Assert(!last_key.empty());
		const string & resolved_tag =
		    compactor->resolve_duplicate_metadata(last_key,
							  tags.size(),
							  &tags[0]);
		if (!resolved_tag.empty())
		    out->add(last_key, resolved_tag);
	    } else {
		Assert(!last_key.empty());
		out->add(last_key, tags[0]);
	    }
	}
    }

    {
	// Merge valuestats.
	Xapian::doccount freq = 0;
	string lbound, ubound;

	while (!pq.empty()) {
	    PostlistCursor * cur = pq.top();
	    const string& key = cur->key;
	    if (!is_valuestats_key(key)) break;
	    if (key != last_key) {
		// For the first valuestats key, last_key will be the previous
		// key we wrote, which we don't want to overwrite.  This is the
		// only time that freq will be 0, so check that.
		if (freq) {
		    out->add(last_key, encode_valuestats(freq, lbound, ubound));
		    freq = 0;
		}
		last_key = key;
	    }

	    const string & tag = cur->tag;

	    const char * pos = tag.data();
	    const char * end = pos + tag.size();

	    Xapian::doccount f;
	    string l, u;
	    if (!unpack_uint(&pos, end, &f)) {
		if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
		throw Xapian::RangeError("Frequency statistic in value table is too large");
	    }
	    if (!unpack_string(&pos, end, l)) {
		if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
		throw Xapian::RangeError("Lower bound in value table is too large");
	    }
	    size_t len = end - pos;
	    if (len == 0) {
		u = l;
	    } else {
		u.assign(pos, len);
	    }
	    if (freq == 0) {
		freq = f;
		lbound = l;
		ubound = u;
	    } else {
		freq += f;
		if (l < lbound) lbound = l;
		if (u > ubound) ubound = u;
	    }

	    pq.pop();
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}

	if (freq) {
	    out->add(last_key, encode_valuestats(freq, lbound, ubound));
	}
    }

    // Merge valuestream chunks.
    while (!pq.empty()) {
	PostlistCursor * cur = pq.top();
	const string & key = cur->key;
	if (!is_valuechunk_key(key)) break;
	Assert(!is_user_metadata_key(key));
	out->add(key, cur->tag);
	pq.pop();
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }

    Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
    vector<pair<Xapian::docid, string> > tags;
    while (true) {
	PostlistCursor * cur = NULL;
	if (!pq.empty()) {
	    cur = pq.top();
	    pq.pop();
	}
	Assert(cur == NULL || !is_user_metadata_key(cur->key));
	if (cur == NULL || cur->key != last_key) {
	    if (!tags.empty()) {
		string first_tag;
		pack_uint(first_tag, tf);
		pack_uint(first_tag, cf);
		pack_uint(first_tag, tags[0].first - 1);
		string tag = tags[0].second;
		tag[0] = (tags.size() == 1) ? '1' : '0';
		first_tag += tag;
		out->add(last_key, first_tag);

		string term;
		if (!is_doclenchunk_key(last_key)) {
		    const char * p = last_key.data();
		    const char * end = p + last_key.size();
		    if (!unpack_string_preserving_sort(&p, end, term) || p != end)
			throw Xapian::DatabaseCorruptError("Bad postlist chunk key");
		}

		vector<pair<Xapian::docid, string> >::const_iterator i;
		i = tags.begin();
		while (++i != tags.end()) {
		    tag = i->second;
		    tag[0] = (i + 1 == tags.end()) ? '1' : '0';
		    out->add(pack_chert_postlist_key(term, i->first), tag);
		}
	    }
	    tags.clear();
	    if (cur == NULL) break;
	    tf = cf = 0;
	    last_key = cur->key;
	}
	tf += cur->tf;
	cf += cur->cf;
	tags.push_back(make_pair(cur->firstdid, cur->tag));
	if (cur->next()) {
	    pq.push(cur);
	} else {
	    delete cur;
	}
    }
}

struct MergeCursor : public ChertCursor {
    explicit MergeCursor(ChertTable *in) : ChertCursor(in) {
	find_entry(string());
	next();
    }
};

struct CursorGt {
    /// Return true if and only if a's key is strictly greater than b's key.
    bool operator()(const ChertCursor *a, const ChertCursor *b) const {
	if (b->after_end()) return false;
	if (a->after_end()) return true;
	return (a->current_key > b->current_key);
    }
};

static void
merge_spellings(ChertTable * out,
		vector<ChertTable*>::const_iterator b,
		vector<ChertTable*>::const_iterator e)
{
    priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
    for ( ; b != e; ++b) {
	ChertTable *in = *b;
	if (!in->empty()) {
	    pq.push(new MergeCursor(in));
	}
    }

    while (!pq.empty()) {
	MergeCursor * cur = pq.top();
	pq.pop();

	string key = cur->current_key;
	if (pq.empty() || pq.top()->current_key > key) {
	    // No need to merge the tags, just copy the (possibly compressed)
	    // tag value.
	    bool compressed = cur->read_tag(true);
	    out->add(key, cur->current_tag, compressed);
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	    continue;
	}

	// Merge tag values with the same key:
	string tag;
	if (key[0] != 'W') {
	    // We just want the union of words, so copy over the first instance
	    // and skip any identical ones.
	    priority_queue<PrefixCompressedStringItor *,
			   vector<PrefixCompressedStringItor *>,
			   PrefixCompressedStringItorGt> pqtag;
	    // Stick all the MergeCursor pointers in a vector because their
	    // current_tag members must remain valid while we're merging their
	    // tags, but we need to call next() on them all afterwards.
	    vector<MergeCursor *> vec;
	    vec.reserve(pq.size());

	    while (true) {
		cur->read_tag();
		pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
		vec.push_back(cur);
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }

	    PrefixCompressedStringWriter wr(tag);
	    string lastword;
	    while (!pqtag.empty()) {
		PrefixCompressedStringItor * it = pqtag.top();
		pqtag.pop();
		string word = **it;
		if (word != lastword) {
		    lastword = word;
		    wr.append(lastword);
		}
		++*it;
		if (!it->at_end()) {
		    pqtag.push(it);
		} else {
		    delete it;
		}
	    }

	    vector<MergeCursor *>::const_iterator i;
	    for (i = vec.begin(); i != vec.end(); ++i) {
		cur = *i;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
	    }
	} else {
	    // We want to sum the frequencies from tags for the same key.
	    Xapian::termcount tot_freq = 0;
	    while (true) {
		cur->read_tag();
		Xapian::termcount freq;
		const char * p = cur->current_tag.data();
		const char * end = p + cur->current_tag.size();
		if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
		    throw Xapian::DatabaseCorruptError("Bad spelling word freq");
		}
		tot_freq += freq;
		if (cur->next()) {
		    pq.push(cur);
		} else {
		    delete cur;
		}
		if (pq.empty() || pq.top()->current_key != key) break;
		cur = pq.top();
		pq.pop();
	    }
	    tag.resize(0);
	    pack_uint_last(tag, tot_freq);
	}
	out->add(key, tag);
    }
}

static void
merge_synonyms(ChertTable * out,
	       vector<ChertTable*>::const_iterator b,
	       vector<ChertTable*>::const_iterator e)
{
    priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
    for ( ; b != e; ++b) {
	ChertTable *in = *b;
	if (!in->empty()) {
	    pq.push(new MergeCursor(in));
	}
    }

    while (!pq.empty()) {
	MergeCursor * cur = pq.top();
	pq.pop();

	string key = cur->current_key;
	if (pq.empty() || pq.top()->current_key > key) {
	    // No need to merge the tags, just copy the (possibly compressed)
	    // tag value.
	    bool compressed = cur->read_tag(true);
	    out->add(key, cur->current_tag, compressed);
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	    continue;
	}

	// Merge tag values with the same key:
	string tag;

	// We just want the union of words, so copy over the first instance
	// and skip any identical ones.
	priority_queue<ByteLengthPrefixedStringItor *,
		       vector<ByteLengthPrefixedStringItor *>,
		       ByteLengthPrefixedStringItorGt> pqtag;
	vector<MergeCursor *> vec;

	while (true) {
	    cur->read_tag();
	    pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
	    vec.push_back(cur);
	    if (pq.empty() || pq.top()->current_key != key) break;
	    cur = pq.top();
	    pq.pop();
	}

	string lastword;
	while (!pqtag.empty()) {
	    ByteLengthPrefixedStringItor * it = pqtag.top();
	    pqtag.pop();
	    if (**it != lastword) {
		lastword = **it;
		tag += uint8_t(lastword.size() ^ MAGIC_XOR_VALUE);
		tag += lastword;
	    }
	    ++*it;
	    if (!it->at_end()) {
		pqtag.push(it);
	    } else {
		delete it;
	    }
	}

	vector<MergeCursor *>::const_iterator i;
	for (i = vec.begin(); i != vec.end(); ++i) {
	    cur = *i;
	    if (cur->next()) {
		pq.push(cur);
	    } else {
		delete cur;
	    }
	}

	out->add(key, tag);
    }
}

static void
multimerge_postlists(Xapian::Compactor * compactor,
		     ChertTable * out, const char * tmpdir,
		     vector<ChertTable *> tmp,
		     vector<Xapian::docid> off,
		     Xapian::docid last_docid)
{
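    // Multipass merge: while more than three input tables remain, merge them
    // in groups of two (the last group absorbs a third table if one would
    // otherwise be left over) into temporary postlist tables, then repeat on
    // the new set.  Each pass roughly halves the number of inputs; the final
    // merge into 'out' below therefore runs over at most three tables.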
    unsigned int c = 0;
    while (tmp.size() > 3) {
	vector<ChertTable *> tmpout;
	tmpout.reserve(tmp.size() / 2);
	vector<Xapian::docid> newoff;
	newoff.resize(tmp.size() / 2);
	for (unsigned int i = 0, j; i < tmp.size(); i = j) {
	    j = i + 2;
	    if (j == tmp.size() - 1) ++j;

	    string dest = tmpdir;
	    char buf[64];
	    sprintf(buf, "/tmp%u_%u.", c, i / 2);
	    dest += buf;

	    // Don't compress temporary tables, even if the final table would
	    // be.
	    ChertTable * tmptab = new ChertTable("postlist", dest, false);
	    // Use maximum blocksize for temporary tables.
	    tmptab->create_and_open(65536);

	    merge_postlists(compactor, tmptab, off.begin() + i,
			    tmp.begin() + i, tmp.begin() + j,
			    last_docid);
	    if (c > 0) {
		for (unsigned int k = i; k < j; ++k) {
		    unlink((tmp[k]->get_path() + "DB").c_str());
		    unlink((tmp[k]->get_path() + "baseA").c_str());
		    unlink((tmp[k]->get_path() + "baseB").c_str());
		    delete tmp[k];
		    tmp[k] = NULL;
		}
	    }
	    tmpout.push_back(tmptab);
	    tmptab->flush_db();
	    tmptab->commit(1);
	}
	swap(tmp, tmpout);
	swap(off, newoff);
	++c;
    }
    merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end(),
		    last_docid);
    if (c > 0) {
	for (size_t k = 0; k < tmp.size(); ++k) {
	    unlink((tmp[k]->get_path() + "DB").c_str());
	    unlink((tmp[k]->get_path() + "baseA").c_str());
	    unlink((tmp[k]->get_path() + "baseB").c_str());
	    delete tmp[k];
	    tmp[k] = NULL;
	}
    }
}

static void
merge_docid_keyed(ChertTable *out, const vector<ChertTable*> & inputs,
		  const vector<Xapian::docid> & offset)
{
    for (size_t i = 0; i < inputs.size(); ++i) {
	Xapian::docid off = offset[i];

	ChertTable * in = inputs[i];
	if (in->empty()) continue;

	ChertCursor cur(in);
	cur.find_entry(string());

	string key;
	while (cur.next()) {
	    // Adjust the key if this isn't the first database.
	    if (off) {
		Xapian::docid did;
		const char * d = cur.current_key.data();
		const char * e = d + cur.current_key.size();
		if (!C_unpack_uint_preserving_sort(&d, e, &did)) {
		    string msg = "Bad key in ";
		    msg += inputs[i]->get_path();
		    msg += "DB";
		    throw Xapian::DatabaseCorruptError(msg);
		}
		did += off;
		key.resize(0);
		C_pack_uint_preserving_sort(key, did);
		if (d != e) {
		    // Copy over the termname for the position table.
		    key.append(d, e - d);
		}
	    } else {
		key = cur.current_key;
	    }
	    bool compressed = cur.read_tag(true);
	    out->add(key, cur.current_tag, compressed);
	}
    }
}

}

using namespace ChertCompact;

void
ChertDatabase::compact(Xapian::Compactor * compactor,
		       const char * destdir,
		       const vector<Xapian::Database::Internal*> & sources,
		       const vector<Xapian::docid> & offset,
		       size_t block_size,
		       Xapian::Compactor::compaction_level compaction,
		       unsigned flags,
		       Xapian::docid last_docid)
{
    enum table_type {
	POSTLIST, RECORD, TERMLIST, POSITION, VALUE, SPELLING, SYNONYM
    };
    struct table_list {
	// The "base name" of the table.
	const char * name;
	// The type.
	table_type type;
	// zlib compression strategy to use on tags.
	int compress_strategy;
	// Create tables after position lazily.
	bool lazy;
    };

    static const table_list tables[] = {
	// name		type		compress_strategy	lazy
	{ "postlist",	POSTLIST,	DONT_COMPRESS,		false },
	{ "record",	RECORD,		Z_DEFAULT_STRATEGY,	false },
	{ "termlist",	TERMLIST,	Z_DEFAULT_STRATEGY,	false },
	{ "position",	POSITION,	DONT_COMPRESS,		true },
	{ "spelling",	SPELLING,	Z_DEFAULT_STRATEGY,	true },
	{ "synonym",	SYNONYM,	Z_DEFAULT_STRATEGY,	true }
    };
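
    // Note: table_type has a VALUE entry but tables[] has no corresponding
    // row, since chert keeps value streams and value statistics inside the
    // postlist table (the "\0\xd8" and "\0\xd0" keys handled in
    // merge_postlists() above).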
    const table_list * tables_end = tables +
	(sizeof(tables) / sizeof(tables[0]));

    bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
    bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
    if (single_file) {
	throw Xapian::InvalidOperationError("chert doesn't support single file databases");
    }

    for (size_t i = 0; i != sources.size(); ++i) {
	ChertDatabase * db = static_cast<ChertDatabase*>(sources[i]);
	if (db->has_uncommitted_changes()) {
	    const char * m =
		"Can't compact from a WritableDatabase with uncommitted "
		"changes - either call commit() first, or create a new "
		"Database object from the filename on disk";
	    throw Xapian::InvalidOperationError(m);
	}
    }

    if (block_size < 2048 || block_size > 65536 ||
	(block_size & (block_size - 1)) != 0) {
	block_size = CHERT_DEFAULT_BLOCK_SIZE;
    }

    FlintLock lock(destdir);
    {
	string explanation;
	FlintLock::reason why = lock.lock(true, false, explanation);
	if (why != FlintLock::SUCCESS) {
	    lock.throw_databaselockerror(why, destdir, explanation);
	}
    }

    vector<ChertTable *> tabs;
    tabs.reserve(tables_end - tables);
    for (const table_list * t = tables; t < tables_end; ++t) {
	// The postlist table requires an N-way merge, adjusting the
	// headers of various blocks.  The spelling and synonym tables also
	// need special handling.  The other tables have keys sorted in
	// docid order, so we can merge them by simply copying all the keys
	// from each source table in turn.
	if (compactor)
	    compactor->set_status(t->name, string());

	string dest = destdir;
	dest += '/';
	dest += t->name;
	dest += '.';

	bool output_will_exist = !t->lazy;

	// Sometimes stat can fail for benign reasons (e.g. >= 2GB file
	// on certain systems).
	bool bad_stat = false;

	off_t in_size = 0;

	vector<ChertTable*> inputs;
	inputs.reserve(sources.size());
	size_t inputs_present = 0;
	for (auto src : sources) {
	    ChertDatabase * db = static_cast<ChertDatabase*>(src);
	    ChertTable * table;
	    switch (t->type) {
		case POSTLIST:
		    table = &(db->postlist_table);
		    break;
		case RECORD:
		    table = &(db->record_table);
		    break;
		case TERMLIST:
		    table = &(db->termlist_table);
		    break;
		case POSITION:
		    table = &(db->position_table);
		    break;
		case SPELLING:
		    table = &(db->spelling_table);
		    break;
		case SYNONYM:
		    table = &(db->synonym_table);
		    break;
		default:
		    Assert(false);
		    return;
	    }

	    off_t db_size = file_size(table->get_path() + "DB");
	    if (errno == 0) {
		in_size += db_size / 1024;
		output_will_exist = true;
		++inputs_present;
	    } else if (errno != ENOENT) {
		// We get ENOENT for an optional table.
		bad_stat = true;
		output_will_exist = true;
		++inputs_present;
	    }
	    inputs.push_back(table);
	}

	// If any inputs lack a termlist table, suppress it in the output.
	if (t->type == TERMLIST && inputs_present != sources.size()) {
	    if (inputs_present != 0) {
		if (compactor) {
		    string m = str(inputs_present);
		    m += " of ";
		    m += str(sources.size());
		    m += " inputs present, so suppressing output";
		    compactor->set_status(t->name, m);
		}
		continue;
	    }
	    output_will_exist = false;
	}

	if (!output_will_exist) {
	    if (compactor)
		compactor->set_status(t->name, "doesn't exist");
	    continue;
	}

	ChertTable out(t->name, dest, false, t->compress_strategy, t->lazy);
	if (!t->lazy) {
	    out.create_and_open(block_size);
	} else {
	    out.erase();
	    out.set_block_size(block_size);
	}

	out.set_full_compaction(compaction != compactor->STANDARD);
	if (compaction == compactor->FULLER) out.set_max_item_size(1);

	switch (t->type) {
	    case POSTLIST:
		if (multipass && inputs.size() > 3) {
		    multimerge_postlists(compactor, &out, destdir, inputs,
					 offset, last_docid);
		} else {
		    merge_postlists(compactor, &out, offset.begin(),
				    inputs.begin(), inputs.end(),
				    last_docid);
		}
		break;
	    case SPELLING:
		merge_spellings(&out, inputs.begin(), inputs.end());
		break;
	    case SYNONYM:
		merge_synonyms(&out, inputs.begin(), inputs.end());
		break;
	    default:
		// Position, Record, Termlist
		merge_docid_keyed(&out, inputs, offset);
		break;
	}

	// Commit as revision 1.
	out.flush_db();
	out.commit(1);

	off_t out_size = 0;
	if (!bad_stat) {
	    off_t db_size = file_size(dest + "DB");
	    if (errno == 0) {
		out_size = db_size / 1024;
	    } else {
		bad_stat = (errno != ENOENT);
	    }
	}
	if (bad_stat) {
	    if (compactor)
		compactor->set_status(t->name, "Done (couldn't stat all the DB files)");
	} else {
	    string status;
	    if (out_size == in_size) {
		status = "Size unchanged (";
	    } else {
		off_t delta;
		if (out_size < in_size) {
		    delta = in_size - out_size;
		    status = "Reduced by ";
		} else {
		    delta = out_size - in_size;
		    status = "INCREASED by ";
		}
		if (in_size) {
		    status += str(100 * delta / in_size);
		    status += "% ";
		}
		status += str(delta);
		status += "K (";
		status += str(in_size);
		status += "K -> ";
	    }
	    status += str(out_size);
	    status += "K)";
	    if (compactor)
		compactor->set_status(t->name, status);
	}
    }
}