1 /** @file
2  * @brief Compact a glass database, or merge and compact several.
3  */
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License as
8  * published by the Free Software Foundation; either version 2 of the
9  * License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
19  * USA
20  */
21 
22 #include <config.h>
23 
24 #include "xapian/compactor.h"
25 #include "xapian/constants.h"
26 #include "xapian/error.h"
27 #include "xapian/types.h"
28 
29 #include "autoptr.h"
30 #include <algorithm>
31 #include <queue>
32 
33 #include <cerrno>
34 #include <cstdio>
35 
36 #include "backends/flint_lock.h"
37 #include "glass_database.h"
38 #include "glass_defs.h"
39 #include "glass_table.h"
40 #include "glass_cursor.h"
41 #include "glass_version.h"
42 #include "filetests.h"
43 #include "internaltypes.h"
44 #include "pack.h"
45 #include "backends/valuestats.h"
46 
47 #include "../byte_length_strings.h"
48 #include "../prefix_compressed_strings.h"
49 
50 using namespace std;
51 
52 // Put all the helpers in a namespace to avoid symbols colliding with those of
53 // the same name in other flint-derived backends.
54 namespace GlassCompact {
55 
/** Test whether a postlist table key belongs to a user metadata entry.
 *
 *  @param key  The raw key to inspect.
 *
 *  @return true if @a key is at least two bytes long and starts with the
 *	    bytes 0x00 0xc0.
 */
static inline bool
is_user_metadata_key(const string & key)
{
    if (key.size() < 2) return false;
    return key[0] == '\0' && key[1] == '\xc0';
}
61 
/** Test whether a postlist table key belongs to a value statistics entry.
 *
 *  @param key  The raw key to inspect.
 *
 *  @return true if @a key is at least two bytes long and starts with the
 *	    bytes 0x00 0xd0.
 */
static inline bool
is_valuestats_key(const string & key)
{
    if (key.size() < 2) return false;
    return key[0] == '\0' && key[1] == '\xd0';
}
67 
/** Test whether a postlist table key belongs to a value stream chunk.
 *
 *  @param key  The raw key to inspect.
 *
 *  @return true if @a key is at least two bytes long and starts with the
 *	    bytes 0x00 0xd8.
 */
static inline bool
is_valuechunk_key(const string & key)
{
    if (key.size() < 2) return false;
    return key[0] == '\0' && key[1] == '\xd8';
}
73 
/** Test whether a postlist table key belongs to a document length chunk.
 *
 *  @param key  The raw key to inspect.
 *
 *  @return true if @a key is at least two bytes long and starts with the
 *	    bytes 0x00 0xe0.
 */
static inline bool
is_doclenchunk_key(const string & key)
{
    if (key.size() < 2) return false;
    return key[0] == '\0' && key[1] == '\xe0';
}
79 
80 class PostlistCursor : private GlassCursor {
81     Xapian::docid offset;
82 
83   public:
84     string key, tag;
85     Xapian::docid firstdid;
86     Xapian::termcount tf, cf;
87 
PostlistCursor(GlassTable * in,Xapian::docid offset_)88     PostlistCursor(GlassTable *in, Xapian::docid offset_)
89 	: GlassCursor(in), offset(offset_), firstdid(0)
90     {
91 	find_entry(string());
92 	next();
93     }
94 
next()95     bool next() {
96 	if (!GlassCursor::next()) return false;
97 	// We put all chunks into the non-initial chunk form here, then fix up
98 	// the first chunk for each term in the merged database as we merge.
99 	read_tag();
100 	key = current_key;
101 	tag = current_tag;
102 	tf = cf = 0;
103 	if (is_user_metadata_key(key)) return true;
104 	if (is_valuestats_key(key)) return true;
105 	if (is_valuechunk_key(key)) {
106 	    const char * p = key.data();
107 	    const char * end = p + key.length();
108 	    p += 2;
109 	    Xapian::valueno slot;
110 	    if (!unpack_uint(&p, end, &slot))
111 		throw Xapian::DatabaseCorruptError("bad value key");
112 	    Xapian::docid did;
113 	    if (!unpack_uint_preserving_sort(&p, end, &did))
114 		throw Xapian::DatabaseCorruptError("bad value key");
115 	    did += offset;
116 
117 	    key.assign("\0\xd8", 2);
118 	    pack_uint(key, slot);
119 	    pack_uint_preserving_sort(key, did);
120 	    return true;
121 	}
122 
123 	// Adjust key if this is *NOT* an initial chunk.
124 	// key is: pack_string_preserving_sort(key, tname)
125 	// plus optionally: pack_uint_preserving_sort(key, did)
126 	const char * d = key.data();
127 	const char * e = d + key.size();
128 	if (is_doclenchunk_key(key)) {
129 	    d += 2;
130 	} else {
131 	    string tname;
132 	    if (!unpack_string_preserving_sort(&d, e, tname))
133 		throw Xapian::DatabaseCorruptError("Bad postlist key");
134 	}
135 
136 	if (d == e) {
137 	    // This is an initial chunk for a term, so adjust tag header.
138 	    d = tag.data();
139 	    e = d + tag.size();
140 	    if (!unpack_uint(&d, e, &tf) ||
141 		!unpack_uint(&d, e, &cf) ||
142 		!unpack_uint(&d, e, &firstdid)) {
143 		throw Xapian::DatabaseCorruptError("Bad postlist key");
144 	    }
145 	    ++firstdid;
146 	    tag.erase(0, d - tag.data());
147 	} else {
148 	    // Not an initial chunk, so adjust key.
149 	    size_t tmp = d - key.data();
150 	    if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
151 		throw Xapian::DatabaseCorruptError("Bad postlist key");
152 	    if (is_doclenchunk_key(key)) {
153 		key.erase(tmp);
154 	    } else {
155 		key.erase(tmp - 1);
156 	    }
157 	}
158 	firstdid += offset;
159 	return true;
160     }
161 };
162 
163 class PostlistCursorGt {
164   public:
165     /** Return true if and only if a's key is strictly greater than b's key.
166      */
operator ()(const PostlistCursor * a,const PostlistCursor * b) const167     bool operator()(const PostlistCursor *a, const PostlistCursor *b) const {
168 	if (a->key > b->key) return true;
169 	if (a->key != b->key) return false;
170 	return (a->firstdid > b->firstdid);
171     }
172 };
173 
174 static string
encode_valuestats(Xapian::doccount freq,const string & lbound,const string & ubound)175 encode_valuestats(Xapian::doccount freq,
176 		  const string & lbound, const string & ubound)
177 {
178     string value;
179     pack_uint(value, freq);
180     pack_string(value, lbound);
181     // We don't store or count empty values, so neither of the bounds
182     // can be empty.  So we can safely store an empty upper bound when
183     // the bounds are equal.
184     if (lbound != ubound) value += ubound;
185     return value;
186 }
187 
188 static void
merge_postlists(Xapian::Compactor * compactor,GlassTable * out,vector<Xapian::docid>::const_iterator offset,vector<GlassTable * >::const_iterator b,vector<GlassTable * >::const_iterator e)189 merge_postlists(Xapian::Compactor * compactor,
190 		GlassTable * out, vector<Xapian::docid>::const_iterator offset,
191 		vector<GlassTable*>::const_iterator b,
192 		vector<GlassTable*>::const_iterator e)
193 {
194     priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
195     for ( ; b != e; ++b, ++offset) {
196 	GlassTable *in = *b;
197 	if (in->empty()) {
198 	    // Skip empty tables.
199 	    continue;
200 	}
201 
202 	pq.push(new PostlistCursor(in, *offset));
203     }
204 
205     string last_key;
206     {
207 	// Merge user metadata.
208 	vector<string> tags;
209 	while (!pq.empty()) {
210 	    PostlistCursor * cur = pq.top();
211 	    const string& key = cur->key;
212 	    if (!is_user_metadata_key(key)) break;
213 
214 	    if (key != last_key) {
215 		if (!tags.empty()) {
216 		    if (tags.size() > 1 && compactor) {
217 			Assert(!last_key.empty());
218 			// FIXME: It would be better to merge all duplicates
219 			// for a key in one call, but currently we don't in
220 			// multipass mode.
221 			const string & resolved_tag =
222 			    compactor->resolve_duplicate_metadata(last_key,
223 								  tags.size(),
224 								  &tags[0]);
225 			if (!resolved_tag.empty())
226 			    out->add(last_key, resolved_tag);
227 		    } else {
228 			Assert(!last_key.empty());
229 			out->add(last_key, tags[0]);
230 		    }
231 		    tags.resize(0);
232 		}
233 		last_key = key;
234 	    }
235 	    tags.push_back(cur->tag);
236 
237 	    pq.pop();
238 	    if (cur->next()) {
239 		pq.push(cur);
240 	    } else {
241 		delete cur;
242 	    }
243 	}
244 	if (!tags.empty()) {
245 	    if (tags.size() > 1 && compactor) {
246 		Assert(!last_key.empty());
247 		const string & resolved_tag =
248 		    compactor->resolve_duplicate_metadata(last_key,
249 							  tags.size(),
250 							  &tags[0]);
251 		if (!resolved_tag.empty())
252 		    out->add(last_key, resolved_tag);
253 	    } else {
254 		Assert(!last_key.empty());
255 		out->add(last_key, tags[0]);
256 	    }
257 	}
258     }
259 
260     {
261 	// Merge valuestats.
262 	Xapian::doccount freq = 0;
263 	string lbound, ubound;
264 
265 	while (!pq.empty()) {
266 	    PostlistCursor * cur = pq.top();
267 	    const string& key = cur->key;
268 	    if (!is_valuestats_key(key)) break;
269 	    if (key != last_key) {
270 		// For the first valuestats key, last_key will be the previous
271 		// key we wrote, which we don't want to overwrite.  This is the
272 		// only time that freq will be 0, so check that.
273 		if (freq) {
274 		    out->add(last_key, encode_valuestats(freq, lbound, ubound));
275 		    freq = 0;
276 		}
277 		last_key = key;
278 	    }
279 
280 	    const string & tag = cur->tag;
281 
282 	    const char * pos = tag.data();
283 	    const char * end = pos + tag.size();
284 
285 	    Xapian::doccount f;
286 	    string l, u;
287 	    if (!unpack_uint(&pos, end, &f)) {
288 		if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
289 		throw Xapian::RangeError("Frequency statistic in value table is too large");
290 	    }
291 	    if (!unpack_string(&pos, end, l)) {
292 		if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
293 		throw Xapian::RangeError("Lower bound in value table is too large");
294 	    }
295 	    size_t len = end - pos;
296 	    if (len == 0) {
297 		u = l;
298 	    } else {
299 		u.assign(pos, len);
300 	    }
301 	    if (freq == 0) {
302 		freq = f;
303 		lbound = l;
304 		ubound = u;
305 	    } else {
306 		freq += f;
307 		if (l < lbound) lbound = l;
308 		if (u > ubound) ubound = u;
309 	    }
310 
311 	    pq.pop();
312 	    if (cur->next()) {
313 		pq.push(cur);
314 	    } else {
315 		delete cur;
316 	    }
317 	}
318 
319 	if (freq) {
320 	    out->add(last_key, encode_valuestats(freq, lbound, ubound));
321 	}
322     }
323 
324     // Merge valuestream chunks.
325     while (!pq.empty()) {
326 	PostlistCursor * cur = pq.top();
327 	const string & key = cur->key;
328 	if (!is_valuechunk_key(key)) break;
329 	Assert(!is_user_metadata_key(key));
330 	out->add(key, cur->tag);
331 	pq.pop();
332 	if (cur->next()) {
333 	    pq.push(cur);
334 	} else {
335 	    delete cur;
336 	}
337     }
338 
339     Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
340     vector<pair<Xapian::docid, string>> tags;
341     while (true) {
342 	PostlistCursor * cur = NULL;
343 	if (!pq.empty()) {
344 	    cur = pq.top();
345 	    pq.pop();
346 	}
347 	Assert(cur == NULL || !is_user_metadata_key(cur->key));
348 	if (cur == NULL || cur->key != last_key) {
349 	    if (!tags.empty()) {
350 		string first_tag;
351 		pack_uint(first_tag, tf);
352 		pack_uint(first_tag, cf);
353 		pack_uint(first_tag, tags[0].first - 1);
354 		string tag = tags[0].second;
355 		tag[0] = (tags.size() == 1) ? '1' : '0';
356 		first_tag += tag;
357 		out->add(last_key, first_tag);
358 
359 		string term;
360 		if (!is_doclenchunk_key(last_key)) {
361 		    const char * p = last_key.data();
362 		    const char * end = p + last_key.size();
363 		    if (!unpack_string_preserving_sort(&p, end, term) || p != end)
364 			throw Xapian::DatabaseCorruptError("Bad postlist chunk key");
365 		}
366 
367 		auto i = tags.begin();
368 		while (++i != tags.end()) {
369 		    tag = i->second;
370 		    tag[0] = (i + 1 == tags.end()) ? '1' : '0';
371 		    out->add(pack_glass_postlist_key(term, i->first), tag);
372 		}
373 	    }
374 	    tags.clear();
375 	    if (cur == NULL) break;
376 	    tf = cf = 0;
377 	    last_key = cur->key;
378 	}
379 	tf += cur->tf;
380 	cf += cur->cf;
381 	tags.push_back(make_pair(cur->firstdid, cur->tag));
382 	if (cur->next()) {
383 	    pq.push(cur);
384 	} else {
385 	    delete cur;
386 	}
387     }
388 }
389 
390 struct MergeCursor : public GlassCursor {
MergeCursorGlassCompact::MergeCursor391     explicit MergeCursor(GlassTable *in) : GlassCursor(in) {
392 	find_entry(string());
393 	next();
394     }
395 };
396 
397 struct CursorGt {
398     /// Return true if and only if a's key is strictly greater than b's key.
operator ()GlassCompact::CursorGt399     bool operator()(const GlassCursor *a, const GlassCursor *b) const {
400 	if (b->after_end()) return false;
401 	if (a->after_end()) return true;
402 	return (a->current_key > b->current_key);
403     }
404 };
405 
406 static void
merge_spellings(GlassTable * out,vector<GlassTable * >::const_iterator b,vector<GlassTable * >::const_iterator e)407 merge_spellings(GlassTable * out,
408 		vector<GlassTable*>::const_iterator b,
409 		vector<GlassTable*>::const_iterator e)
410 {
411     priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
412     for ( ; b != e; ++b) {
413 	GlassTable *in = *b;
414 	if (!in->empty()) {
415 	    pq.push(new MergeCursor(in));
416 	}
417     }
418 
419     while (!pq.empty()) {
420 	MergeCursor * cur = pq.top();
421 	pq.pop();
422 
423 	string key = cur->current_key;
424 	if (pq.empty() || pq.top()->current_key > key) {
425 	    // No need to merge the tags, just copy the (possibly compressed)
426 	    // tag value.
427 	    bool compressed = cur->read_tag(true);
428 	    out->add(key, cur->current_tag, compressed);
429 	    if (cur->next()) {
430 		pq.push(cur);
431 	    } else {
432 		delete cur;
433 	    }
434 	    continue;
435 	}
436 
437 	// Merge tag values with the same key:
438 	string tag;
439 	if (key[0] != 'W') {
440 	    // We just want the union of words, so copy over the first instance
441 	    // and skip any identical ones.
442 	    priority_queue<PrefixCompressedStringItor *,
443 			   vector<PrefixCompressedStringItor *>,
444 			   PrefixCompressedStringItorGt> pqtag;
445 	    // Stick all the MergeCursor pointers in a vector because their
446 	    // current_tag members must remain valid while we're merging their
447 	    // tags, but we need to call next() on them all afterwards.
448 	    vector<MergeCursor *> vec;
449 	    vec.reserve(pq.size());
450 
451 	    while (true) {
452 		cur->read_tag();
453 		pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
454 		vec.push_back(cur);
455 		if (pq.empty() || pq.top()->current_key != key) break;
456 		cur = pq.top();
457 		pq.pop();
458 	    }
459 
460 	    PrefixCompressedStringWriter wr(tag);
461 	    string lastword;
462 	    while (!pqtag.empty()) {
463 		PrefixCompressedStringItor * it = pqtag.top();
464 		pqtag.pop();
465 		string word = **it;
466 		if (word != lastword) {
467 		    lastword = word;
468 		    wr.append(lastword);
469 		}
470 		++*it;
471 		if (!it->at_end()) {
472 		    pqtag.push(it);
473 		} else {
474 		    delete it;
475 		}
476 	    }
477 
478 	    vector<MergeCursor *>::const_iterator i;
479 	    for (i = vec.begin(); i != vec.end(); ++i) {
480 		cur = *i;
481 		if (cur->next()) {
482 		    pq.push(cur);
483 		} else {
484 		    delete cur;
485 		}
486 	    }
487 	} else {
488 	    // We want to sum the frequencies from tags for the same key.
489 	    Xapian::termcount tot_freq = 0;
490 	    while (true) {
491 		cur->read_tag();
492 		Xapian::termcount freq;
493 		const char * p = cur->current_tag.data();
494 		const char * end = p + cur->current_tag.size();
495 		if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
496 		    throw Xapian::DatabaseCorruptError("Bad spelling word freq");
497 		}
498 		tot_freq += freq;
499 		if (cur->next()) {
500 		    pq.push(cur);
501 		} else {
502 		    delete cur;
503 		}
504 		if (pq.empty() || pq.top()->current_key != key) break;
505 		cur = pq.top();
506 		pq.pop();
507 	    }
508 	    tag.resize(0);
509 	    pack_uint_last(tag, tot_freq);
510 	}
511 	out->add(key, tag);
512     }
513 }
514 
515 static void
merge_synonyms(GlassTable * out,vector<GlassTable * >::const_iterator b,vector<GlassTable * >::const_iterator e)516 merge_synonyms(GlassTable * out,
517 	       vector<GlassTable*>::const_iterator b,
518 	       vector<GlassTable*>::const_iterator e)
519 {
520     priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
521     for ( ; b != e; ++b) {
522 	GlassTable *in = *b;
523 	if (!in->empty()) {
524 	    pq.push(new MergeCursor(in));
525 	}
526     }
527 
528     while (!pq.empty()) {
529 	MergeCursor * cur = pq.top();
530 	pq.pop();
531 
532 	string key = cur->current_key;
533 	if (pq.empty() || pq.top()->current_key > key) {
534 	    // No need to merge the tags, just copy the (possibly compressed)
535 	    // tag value.
536 	    bool compressed = cur->read_tag(true);
537 	    out->add(key, cur->current_tag, compressed);
538 	    if (cur->next()) {
539 		pq.push(cur);
540 	    } else {
541 		delete cur;
542 	    }
543 	    continue;
544 	}
545 
546 	// Merge tag values with the same key:
547 	string tag;
548 
549 	// We just want the union of words, so copy over the first instance
550 	// and skip any identical ones.
551 	priority_queue<ByteLengthPrefixedStringItor *,
552 		       vector<ByteLengthPrefixedStringItor *>,
553 		       ByteLengthPrefixedStringItorGt> pqtag;
554 	vector<MergeCursor *> vec;
555 
556 	while (true) {
557 	    cur->read_tag();
558 	    pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
559 	    vec.push_back(cur);
560 	    if (pq.empty() || pq.top()->current_key != key) break;
561 	    cur = pq.top();
562 	    pq.pop();
563 	}
564 
565 	string lastword;
566 	while (!pqtag.empty()) {
567 	    ByteLengthPrefixedStringItor * it = pqtag.top();
568 	    pqtag.pop();
569 	    if (**it != lastword) {
570 		lastword = **it;
571 		tag += uint8_t(lastword.size() ^ MAGIC_XOR_VALUE);
572 		tag += lastword;
573 	    }
574 	    ++*it;
575 	    if (!it->at_end()) {
576 		pqtag.push(it);
577 	    } else {
578 		delete it;
579 	    }
580 	}
581 
582 	vector<MergeCursor *>::const_iterator i;
583 	for (i = vec.begin(); i != vec.end(); ++i) {
584 	    cur = *i;
585 	    if (cur->next()) {
586 		pq.push(cur);
587 	    } else {
588 		delete cur;
589 	    }
590 	}
591 
592 	out->add(key, tag);
593     }
594 }
595 
596 static void
multimerge_postlists(Xapian::Compactor * compactor,GlassTable * out,const char * tmpdir,vector<GlassTable * > tmp,vector<Xapian::docid> off)597 multimerge_postlists(Xapian::Compactor * compactor,
598 		     GlassTable * out, const char * tmpdir,
599 		     vector<GlassTable *> tmp,
600 		     vector<Xapian::docid> off)
601 {
602     unsigned int c = 0;
603     while (tmp.size() > 3) {
604 	vector<GlassTable *> tmpout;
605 	tmpout.reserve(tmp.size() / 2);
606 	vector<Xapian::docid> newoff;
607 	newoff.resize(tmp.size() / 2);
608 	for (unsigned int i = 0, j; i < tmp.size(); i = j) {
609 	    j = i + 2;
610 	    if (j == tmp.size() - 1) ++j;
611 
612 	    string dest = tmpdir;
613 	    char buf[64];
614 	    sprintf(buf, "/tmp%u_%u.", c, i / 2);
615 	    dest += buf;
616 
617 	    GlassTable * tmptab = new GlassTable("postlist", dest, false);
618 
619 	    // Use maximum blocksize for temporary tables.  And don't compress
620 	    // entries in temporary tables, even if the final table would do
621 	    // so.  Any already compressed entries will get copied in
622 	    // compressed form.
623 	    RootInfo root_info;
624 	    root_info.init(65536, 0);
625 	    const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
626 	    tmptab->create_and_open(flags, root_info);
627 
628 	    merge_postlists(compactor, tmptab, off.begin() + i,
629 			    tmp.begin() + i, tmp.begin() + j);
630 	    if (c > 0) {
631 		for (unsigned int k = i; k < j; ++k) {
632 		    unlink(tmp[k]->get_path().c_str());
633 		    delete tmp[k];
634 		    tmp[k] = NULL;
635 		}
636 	    }
637 	    tmpout.push_back(tmptab);
638 	    tmptab->flush_db();
639 	    tmptab->commit(1, &root_info);
640 	    AssertRel(root_info.get_blocksize(),==,65536);
641 	}
642 	swap(tmp, tmpout);
643 	swap(off, newoff);
644 	++c;
645     }
646     merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end());
647     if (c > 0) {
648 	for (size_t k = 0; k < tmp.size(); ++k) {
649 	    unlink(tmp[k]->get_path().c_str());
650 	    delete tmp[k];
651 	    tmp[k] = NULL;
652 	}
653     }
654 }
655 
656 class PositionCursor : private GlassCursor {
657     Xapian::docid offset;
658 
659   public:
660     string key;
661     Xapian::docid firstdid;
662 
PositionCursor(GlassTable * in,Xapian::docid offset_)663     PositionCursor(GlassTable *in, Xapian::docid offset_)
664 	: GlassCursor(in), offset(offset_), firstdid(0) {
665 	find_entry(string());
666 	next();
667     }
668 
next()669     bool next() {
670 	if (!GlassCursor::next()) return false;
671 	read_tag();
672 	const char * d = current_key.data();
673 	const char * e = d + current_key.size();
674 	string term;
675 	Xapian::docid did;
676 	if (!unpack_string_preserving_sort(&d, e, term) ||
677 	    !unpack_uint_preserving_sort(&d, e, &did) ||
678 	    d != e) {
679 	    throw Xapian::DatabaseCorruptError("Bad position key");
680 	}
681 
682 	key.resize(0);
683 	pack_string_preserving_sort(key, term);
684 	pack_uint_preserving_sort(key, did + offset);
685 	return true;
686     }
687 
get_tag() const688     const string & get_tag() const {
689 	return current_tag;
690     }
691 };
692 
693 class PositionCursorGt {
694   public:
695     /** Return true if and only if a's key is strictly greater than b's key.
696      */
operator ()(const PositionCursor * a,const PositionCursor * b) const697     bool operator()(const PositionCursor *a, const PositionCursor *b) const {
698 	return a->key > b->key;
699     }
700 };
701 
702 static void
merge_positions(GlassTable * out,const vector<GlassTable * > & inputs,const vector<Xapian::docid> & offset)703 merge_positions(GlassTable *out, const vector<GlassTable*> & inputs,
704 		const vector<Xapian::docid> & offset)
705 {
706     priority_queue<PositionCursor *, vector<PositionCursor *>, PositionCursorGt> pq;
707     for (size_t i = 0; i < inputs.size(); ++i) {
708 	GlassTable *in = inputs[i];
709 	if (in->empty()) {
710 	    // Skip empty tables.
711 	    continue;
712 	}
713 
714 	pq.push(new PositionCursor(in, offset[i]));
715     }
716 
717     while (!pq.empty()) {
718 	PositionCursor * cur = pq.top();
719 	pq.pop();
720 	out->add(cur->key, cur->get_tag());
721 	if (cur->next()) {
722 	    pq.push(cur);
723 	} else {
724 	    delete cur;
725 	}
726     }
727 }
728 
729 static void
merge_docid_keyed(GlassTable * out,const vector<GlassTable * > & inputs,const vector<Xapian::docid> & offset)730 merge_docid_keyed(GlassTable *out, const vector<GlassTable*> & inputs,
731 		  const vector<Xapian::docid> & offset)
732 {
733     for (size_t i = 0; i < inputs.size(); ++i) {
734 	Xapian::docid off = offset[i];
735 
736 	GlassTable * in = inputs[i];
737 	if (in->empty()) continue;
738 
739 	GlassCursor cur(in);
740 	cur.find_entry(string());
741 
742 	string key;
743 	while (cur.next()) {
744 	    // Adjust the key if this isn't the first database.
745 	    if (off) {
746 		Xapian::docid did;
747 		const char * d = cur.current_key.data();
748 		const char * e = d + cur.current_key.size();
749 		if (!unpack_uint_preserving_sort(&d, e, &did)) {
750 		    string msg = "Bad key in ";
751 		    msg += inputs[i]->get_path();
752 		    throw Xapian::DatabaseCorruptError(msg);
753 		}
754 		did += off;
755 		key.resize(0);
756 		pack_uint_preserving_sort(key, did);
757 		if (d != e) {
758 		    // Copy over anything extra in the key (e.g. the zero byte
759 		    // at the end of "used value slots" in the termlist table).
760 		    key.append(d, e - d);
761 		}
762 	    } else {
763 		key = cur.current_key;
764 	    }
765 	    bool compressed = cur.read_tag(true);
766 	    out->add(key, cur.current_tag, compressed);
767 	}
768     }
769 }
770 
771 }
772 
773 using namespace GlassCompact;
774 
775 void
compact(Xapian::Compactor * compactor,const char * destdir,int fd,const vector<Xapian::Database::Internal * > & sources,const vector<Xapian::docid> & offset,size_t block_size,Xapian::Compactor::compaction_level compaction,unsigned flags,Xapian::docid last_docid)776 GlassDatabase::compact(Xapian::Compactor * compactor,
777 		       const char * destdir,
778 		       int fd,
779 		       const vector<Xapian::Database::Internal*> & sources,
780 		       const vector<Xapian::docid> & offset,
781 		       size_t block_size,
782 		       Xapian::Compactor::compaction_level compaction,
783 		       unsigned flags,
784 		       Xapian::docid last_docid)
785 {
786     struct table_list {
787 	// The "base name" of the table.
788 	char name[9];
789 	// The type.
790 	Glass::table_type type;
791 	// Create tables after position lazily.
792 	bool lazy;
793     };
794 
795     static const table_list tables[] = {
796 	// name		type			lazy
797 	{ "postlist",	Glass::POSTLIST,	false },
798 	{ "docdata",	Glass::DOCDATA,		true },
799 	{ "termlist",	Glass::TERMLIST,	false },
800 	{ "position",	Glass::POSITION,	true },
801 	{ "spelling",	Glass::SPELLING,	true },
802 	{ "synonym",	Glass::SYNONYM,		true }
803     };
804     const table_list * tables_end = tables +
805 	(sizeof(tables) / sizeof(tables[0]));
806 
807     const int FLAGS = Xapian::DB_DANGEROUS;
808 
809     bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
810     bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
811     if (single_file) {
812 	// FIXME: Support this combination - we need to put temporary files
813 	// somewhere.
814 	multipass = false;
815     }
816 
817     for (size_t i = 0; i != sources.size(); ++i) {
818 	auto db = static_cast<const GlassDatabase*>(sources[i]);
819 	if (db->has_uncommitted_changes()) {
820 	    const char * m =
821 		"Can't compact from a WritableDatabase with uncommitted "
822 		"changes - either call commit() first, or create a new "
823 		"Database object from the filename on disk";
824 	    throw Xapian::InvalidOperationError(m);
825 	}
826     }
827 
828     if (block_size < 2048 || block_size > 65536 ||
829 	(block_size & (block_size - 1)) != 0) {
830 	block_size = GLASS_DEFAULT_BLOCKSIZE;
831     }
832 
833     FlintLock lock(destdir ? destdir : "");
834     if (!single_file) {
835 	string explanation;
836 	FlintLock::reason why = lock.lock(true, false, explanation);
837 	if (why != FlintLock::SUCCESS) {
838 	    lock.throw_databaselockerror(why, destdir, explanation);
839 	}
840     }
841 
842     AutoPtr<GlassVersion> version_file_out;
843     if (single_file) {
844 	if (destdir) {
845 	    fd = open(destdir, O_RDWR|O_CREAT|O_TRUNC|O_BINARY|O_CLOEXEC, 0666);
846 	    if (fd < 0) {
847 		throw Xapian::DatabaseCreateError("open() failed", errno);
848 	    }
849 	}
850 	version_file_out.reset(new GlassVersion(fd));
851     } else {
852 	fd = -1;
853 	version_file_out.reset(new GlassVersion(destdir));
854     }
855 
856     version_file_out->create(block_size);
857     for (size_t i = 0; i != sources.size(); ++i) {
858 	GlassDatabase * db = static_cast<GlassDatabase*>(sources[i]);
859 	version_file_out->merge_stats(db->version_file);
860     }
861 
862     string fl_serialised;
863     if (single_file) {
864 	GlassFreeList fl;
865 	fl.set_first_unused_block(1); // FIXME: Assumption?
866 	fl.pack(fl_serialised);
867     }
868 
869     vector<GlassTable *> tabs;
870     tabs.reserve(tables_end - tables);
871     off_t prev_size = block_size;
872     for (const table_list * t = tables; t < tables_end; ++t) {
873 	// The postlist table requires an N-way merge, adjusting the
874 	// headers of various blocks.  The spelling and synonym tables also
875 	// need special handling.  The other tables have keys sorted in
876 	// docid order, so we can merge them by simply copying all the keys
877 	// from each source table in turn.
878 	if (compactor)
879 	    compactor->set_status(t->name, string());
880 
881 	string dest;
882 	if (!single_file) {
883 	    dest = destdir;
884 	    dest += '/';
885 	    dest += t->name;
886 	    dest += '.';
887 	}
888 
889 	bool output_will_exist = !t->lazy;
890 
891 	// Sometimes stat can fail for benign reasons (e.g. >= 2GB file
892 	// on certain systems).
893 	bool bad_stat = false;
894 
895 	// We can't currently report input sizes if there's a single file DB
896 	// amongst the inputs.
897 	bool single_file_in = false;
898 
899 	off_t in_size = 0;
900 
901 	vector<GlassTable*> inputs;
902 	inputs.reserve(sources.size());
903 	size_t inputs_present = 0;
904 	for (auto src : sources) {
905 	    GlassDatabase * db = static_cast<GlassDatabase*>(src);
906 	    GlassTable * table;
907 	    switch (t->type) {
908 		case Glass::POSTLIST:
909 		    table = &(db->postlist_table);
910 		    break;
911 		case Glass::DOCDATA:
912 		    table = &(db->docdata_table);
913 		    break;
914 		case Glass::TERMLIST:
915 		    table = &(db->termlist_table);
916 		    break;
917 		case Glass::POSITION:
918 		    table = &(db->position_table);
919 		    break;
920 		case Glass::SPELLING:
921 		    table = &(db->spelling_table);
922 		    break;
923 		case Glass::SYNONYM:
924 		    table = &(db->synonym_table);
925 		    break;
926 		default:
927 		    Assert(false);
928 		    return;
929 	    }
930 
931 	    if (db->single_file()) {
932 		if (t->lazy && table->empty()) {
933 		    // Essentially doesn't exist.
934 		} else {
935 		    // FIXME: Find actual size somehow?
936 		    // in_size += table->size() / 1024;
937 		    single_file_in = true;
938 		    output_will_exist = true;
939 		    ++inputs_present;
940 		}
941 	    } else {
942 		off_t db_size = file_size(table->get_path());
943 		if (errno == 0) {
944 		    in_size += db_size / 1024;
945 		    output_will_exist = true;
946 		    ++inputs_present;
947 		} else if (errno != ENOENT) {
948 		    // We get ENOENT for an optional table.
949 		    bad_stat = true;
950 		    output_will_exist = true;
951 		    ++inputs_present;
952 		}
953 	    }
954 	    inputs.push_back(table);
955 	}
956 
957 	// If any inputs lack a termlist table, suppress it in the output.
958 	if (t->type == Glass::TERMLIST && inputs_present != sources.size()) {
959 	    if (inputs_present != 0) {
960 		if (compactor) {
961 		    string m = str(inputs_present);
962 		    m += " of ";
963 		    m += str(sources.size());
964 		    m += " inputs present, so suppressing output";
965 		    compactor->set_status(t->name, m);
966 		}
967 		continue;
968 	    }
969 	    output_will_exist = false;
970 	}
971 
972 	if (!output_will_exist) {
973 	    if (compactor)
974 		compactor->set_status(t->name, "doesn't exist");
975 	    continue;
976 	}
977 
978 	GlassTable * out;
979 	if (single_file) {
980 	    out = new GlassTable(t->name, fd, version_file_out->get_offset(),
981 				 false, false);
982 	} else {
983 	    out = new GlassTable(t->name, dest, false, t->lazy);
984 	}
985 	tabs.push_back(out);
986 	RootInfo * root_info = version_file_out->root_to_set(t->type);
987 	if (single_file) {
988 	    root_info->set_free_list(fl_serialised);
989 	    out->open(FLAGS, version_file_out->get_root(t->type), version_file_out->get_revision());
990 	} else {
991 	    out->create_and_open(FLAGS, *root_info);
992 	}
993 
994 	out->set_full_compaction(compaction != compactor->STANDARD);
995 	if (compaction == compactor->FULLER) out->set_max_item_size(1);
996 
997 	switch (t->type) {
998 	    case Glass::POSTLIST: {
999 		if (multipass && inputs.size() > 3) {
1000 		    multimerge_postlists(compactor, out, destdir,
1001 					 inputs, offset);
1002 		} else {
1003 		    merge_postlists(compactor, out, offset.begin(),
1004 				    inputs.begin(), inputs.end());
1005 		}
1006 		break;
1007 	    }
1008 	    case Glass::SPELLING:
1009 		merge_spellings(out, inputs.begin(), inputs.end());
1010 		break;
1011 	    case Glass::SYNONYM:
1012 		merge_synonyms(out, inputs.begin(), inputs.end());
1013 		break;
1014 	    case Glass::POSITION:
1015 		merge_positions(out, inputs, offset);
1016 		break;
1017 	    default:
1018 		// DocData, Termlist
1019 		merge_docid_keyed(out, inputs, offset);
1020 		break;
1021 	}
1022 
1023 	// Commit as revision 1.
1024 	out->flush_db();
1025 	out->commit(1, root_info);
1026 	out->sync();
1027 	if (single_file) fl_serialised = root_info->get_free_list();
1028 
1029 	off_t out_size = 0;
1030 	if (!bad_stat && !single_file_in) {
1031 	    off_t db_size;
1032 	    if (single_file) {
1033 		db_size = file_size(fd);
1034 	    } else {
1035 		db_size = file_size(dest + GLASS_TABLE_EXTENSION);
1036 	    }
1037 	    if (errno == 0) {
1038 		if (single_file) {
1039 		    off_t old_prev_size = max(prev_size, off_t(block_size));
1040 		    prev_size = db_size;
1041 		    db_size -= old_prev_size;
1042 		}
1043 		out_size = db_size / 1024;
1044 	    } else {
1045 		bad_stat = (errno != ENOENT);
1046 	    }
1047 	}
1048 	if (bad_stat) {
1049 	    if (compactor)
1050 		compactor->set_status(t->name, "Done (couldn't stat all the DB files)");
1051 	} else if (single_file_in) {
1052 	    if (compactor)
1053 		compactor->set_status(t->name, "Done (table sizes unknown for single file DB input)");
1054 	} else {
1055 	    string status;
1056 	    if (out_size == in_size) {
1057 		status = "Size unchanged (";
1058 	    } else {
1059 		off_t delta;
1060 		if (out_size < in_size) {
1061 		    delta = in_size - out_size;
1062 		    status = "Reduced by ";
1063 		} else {
1064 		    delta = out_size - in_size;
1065 		    status = "INCREASED by ";
1066 		}
1067 		if (in_size) {
1068 		    status += str(100 * delta / in_size);
1069 		    status += "% ";
1070 		}
1071 		status += str(delta);
1072 		status += "K (";
1073 		status += str(in_size);
1074 		status += "K -> ";
1075 	    }
1076 	    status += str(out_size);
1077 	    status += "K)";
1078 	    if (compactor)
1079 		compactor->set_status(t->name, status);
1080 	}
1081     }
1082 
1083     // If compacting to a single file output and all the tables are empty, pad
1084     // the output so that it isn't mistaken for a stub database when we try to
1085     // open it.  For this it needs to be a multiple of 2KB in size.
1086     if (single_file && prev_size < off_t(block_size)) {
1087 #ifdef HAVE_FTRUNCATE
1088 	if (ftruncate(fd, block_size) < 0) {
1089 	    throw Xapian::DatabaseError("Failed to set size of output database", errno);
1090 	}
1091 #else
1092 	const off_t off = block_size - 1;
1093 	if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
1094 	    throw Xapian::DatabaseError("Failed to set size of output database", errno);
1095 	}
1096 #endif
1097     }
1098 
1099     if (single_file) {
1100 	if (lseek(fd, version_file_out->get_offset(), SEEK_SET) < 0) {
1101 	    throw Xapian::DatabaseError("lseek() failed", errno);
1102 	}
1103     }
1104     version_file_out->set_last_docid(last_docid);
1105     string tmpfile = version_file_out->write(1, FLAGS);
1106     for (unsigned j = 0; j != tabs.size(); ++j) {
1107 	tabs[j]->sync();
1108     }
1109     // Commit with revision 1.
1110     version_file_out->sync(tmpfile, 1, FLAGS);
1111     for (unsigned j = 0; j != tabs.size(); ++j) {
1112 	delete tabs[j];
1113     }
1114 
1115     if (!single_file) lock.release();
1116 }
1117