1 /** @file flint_compact.cc
2  * @brief Compact a flint database, or merge and compact several.
3  */
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2013,2015 Olly Betts
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License as
8  * published by the Free Software Foundation; either version 2 of the
9  * License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
19  * USA
20  */
21 
22 #include <config.h>
23 
24 #include <xapian/compactor.h>
25 
26 #include <algorithm>
27 #include <queue>
28 
29 #include <cstdio>
30 
31 #include "safeerrno.h"
32 #include <sys/types.h>
33 #include "safesysstat.h"
34 
35 #include "flint_table.h"
36 #include "flint_compact.h"
37 #include "flint_cursor.h"
38 #include "flint_utils.h"
39 #include "flint_database.h"
40 #include "internaltypes.h"
41 #include "noreturn.h"
42 #include "utils.h"
43 
44 #include "../byte_length_strings.h"
45 #include "../prefix_compressed_strings.h"
46 #include <xapian.h>
47 
48 using namespace std;
49 
50 XAPIAN_NORETURN(
51 static void failed_to_open_at_rev(string, flint_revision_number_t));
52 static void
failed_to_open_at_rev(string m,flint_revision_number_t rev)53 failed_to_open_at_rev(string m, flint_revision_number_t rev)
54 {
55     m += ": Couldn't open at revision ";
56     m += str(rev);
57     throw Xapian::DatabaseError(m);
58 }
59 
60 // Put all the helpers in a namespace to avoid symbols colliding with those of
61 // the same name in chert_compact.cc.
62 namespace FlintCompact {
63 
64 static inline bool
is_metainfo_key(const string & key)65 is_metainfo_key(const string & key)
66 {
67     return key.size() == 1 && key[0] == '\0';
68 }
69 
70 static inline bool
is_user_metadata_key(const string & key)71 is_user_metadata_key(const string & key)
72 {
73     return key.size() > 1 && key[0] == '\0' && key[1] == '\xc0';
74 }
75 
76 class PostlistCursor : private FlintCursor {
77     Xapian::docid offset;
78 
79   public:
80     string key, tag;
81     Xapian::docid firstdid;
82     Xapian::termcount tf, cf;
83 
PostlistCursor(FlintTable * in,Xapian::docid offset_)84     PostlistCursor(FlintTable *in, Xapian::docid offset_)
85 	: FlintCursor(in), offset(offset_), firstdid(0)
86     {
87 	find_entry(string());
88 	next();
89     }
90 
~PostlistCursor()91     ~PostlistCursor()
92     {
93 	delete FlintCursor::get_table();
94     }
95 
next()96     bool next() {
97 	if (!FlintCursor::next()) return false;
98 	// We put all chunks into the non-initial chunk form here, then fix up
99 	// the first chunk for each term in the merged database as we merge.
100 	read_tag();
101 	key = current_key;
102 	tag = current_tag;
103 	tf = cf = 0;
104 	if (is_metainfo_key(key)) return true;
105 	if (is_user_metadata_key(key)) return true;
106 	// Adjust key if this is *NOT* an initial chunk.
107 	// key is: F_pack_string_preserving_sort(tname)
108 	// plus optionally: F_pack_uint_preserving_sort(did)
109 	const char * d = key.data();
110 	const char * e = d + key.size();
111 	string tname;
112 	if (!F_unpack_string_preserving_sort(&d, e, tname))
113 	    throw Xapian::DatabaseCorruptError("Bad postlist key");
114 	if (d == e) {
115 	    // This is an initial chunk for a term, so adjust tag header.
116 	    d = tag.data();
117 	    e = d + tag.size();
118 	    if (!F_unpack_uint(&d, e, &tf) ||
119 		!F_unpack_uint(&d, e, &cf) ||
120 		!F_unpack_uint(&d, e, &firstdid)) {
121 		throw Xapian::DatabaseCorruptError("Bad postlist tag");
122 	    }
123 	    ++firstdid;
124 	    tag.erase(0, d - tag.data());
125 	} else {
126 	    // Not an initial chunk, so adjust key.
127 	    size_t tmp = d - key.data();
128 	    if (!F_unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
129 		throw Xapian::DatabaseCorruptError("Bad postlist key");
130 	    key.erase(tmp);
131 	}
132 	firstdid += offset;
133 	return true;
134     }
135 };
136 
137 class PostlistCursorGt {
138   public:
139     /** Return true if and only if a's key is strictly greater than b's key.
140      */
operator ()(const PostlistCursor * a,const PostlistCursor * b)141     bool operator()(const PostlistCursor *a, const PostlistCursor *b) {
142 	if (a->key > b->key) return true;
143 	if (a->key != b->key) return false;
144 	return (a->firstdid > b->firstdid);
145     }
146 };
147 
148 static void
merge_postlists(Xapian::Compactor & compactor,FlintTable * out,vector<Xapian::docid>::const_iterator offset,vector<string>::const_iterator b,vector<string>::const_iterator e,vector<flint_revision_number_t>::const_iterator rev,Xapian::docid last_docid)149 merge_postlists(Xapian::Compactor & compactor,
150 		FlintTable * out, vector<Xapian::docid>::const_iterator offset,
151 		vector<string>::const_iterator b,
152 		vector<string>::const_iterator e,
153 		vector<flint_revision_number_t>::const_iterator rev,
154 		Xapian::docid last_docid)
155 {
156     totlen_t tot_totlen = 0;
157     priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
158     for ( ; b != e; ++b, ++offset, ++rev) {
159 	FlintTable *in = new FlintTable("postlist", *b, true);
160 	if (!in->open(*rev)) {
161 	    failed_to_open_at_rev(*b, *rev);
162 	}
163 	if (in->empty()) {
164 	    // Skip empty tables.
165 	    delete in;
166 	    continue;
167 	}
168 
169 	// PostlistCursor takes ownership of FlintTable in and is
170 	// responsible for deleting it.
171 	PostlistCursor * cur = new PostlistCursor(in, *offset);
172 	// Merge the METAINFO tags from each database into one.
173 	// They have a key consisting of a single zero byte.
174 	// They may be absent, if the database contains no documents.  If it
175 	// has user metadata we'll still get here.
176 	if (is_metainfo_key(cur->key)) {
177 	    const char * data = cur->tag.data();
178 	    const char * end = data + cur->tag.size();
179 	    Xapian::docid dummy_did = 0;
180 	    if (!F_unpack_uint(&data, end, &dummy_did)) {
181 		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
182 	    }
183 	    totlen_t totlen = 0;
184 	    if (!F_unpack_uint_last(&data, end, &totlen)) {
185 		throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
186 	    }
187 	    tot_totlen += totlen;
188 	    if (tot_totlen < totlen) {
189 		throw "totlen wrapped!";
190 	    }
191 	    if (cur->next()) {
192 		pq.push(cur);
193 	    } else {
194 		delete cur;
195 	    }
196 	} else {
197 	    pq.push(cur);
198 	}
199     }
200 
201     {
202 	string tag = F_pack_uint(last_docid);
203 	tag += F_pack_uint_last(tot_totlen);
204 	out->add(string(1, '\0'), tag);
205     }
206 
207     string last_key;
208     {
209 	// Merge user metadata.
210 	vector<string> tags;
211 	while (!pq.empty()) {
212 	    PostlistCursor * cur = pq.top();
213 	    const string& key = cur->key;
214 	    if (!is_user_metadata_key(key)) break;
215 
216 	    if (key != last_key) {
217 		if (tags.size() > 1) {
218 		    Assert(!last_key.empty());
219 		    out->add(last_key,
220 			     compactor.resolve_duplicate_metadata(last_key,
221 								  tags.size(),
222 								  &tags[0]));
223 		} else if (tags.size() == 1) {
224 		    Assert(!last_key.empty());
225 		    out->add(last_key, tags[0]);
226 		}
227 		tags.resize(0);
228 		last_key = key;
229 	    }
230 	    tags.push_back(cur->tag);
231 
232 	    pq.pop();
233 	    if (cur->next()) {
234 		pq.push(cur);
235 	    } else {
236 		delete cur;
237 	    }
238 	}
239 	if (tags.size() > 1) {
240 	    Assert(!last_key.empty());
241 	    out->add(last_key,
242 		     compactor.resolve_duplicate_metadata(last_key,
243 							  tags.size(),
244 							  &tags[0]));
245 	} else if (tags.size() == 1) {
246 	    Assert(!last_key.empty());
247 	    out->add(last_key, tags[0]);
248 	}
249     }
250 
251     Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
252     vector<pair<Xapian::docid, string> > tags;
253     while (true) {
254 	PostlistCursor * cur = NULL;
255 	if (!pq.empty()) {
256 	    cur = pq.top();
257 	    pq.pop();
258 	}
259 	Assert(cur == NULL || !is_user_metadata_key(cur->key));
260 	if (cur == NULL || cur->key != last_key) {
261 	    if (!tags.empty()) {
262 		string first_tag = F_pack_uint(tf);
263 		first_tag += F_pack_uint(cf);
264 		first_tag += F_pack_uint(tags[0].first - 1);
265 		string tag = tags[0].second;
266 		tag[0] = (tags.size() == 1) ? '1' : '0';
267 		first_tag += tag;
268 		out->add(last_key, first_tag);
269 		vector<pair<Xapian::docid, string> >::const_iterator i;
270 		i = tags.begin();
271 		while (++i != tags.end()) {
272 		    string new_key = last_key;
273 		    new_key += F_pack_uint_preserving_sort(i->first);
274 		    tag = i->second;
275 		    tag[0] = (i + 1 == tags.end()) ? '1' : '0';
276 		    out->add(new_key, tag);
277 		}
278 	    }
279 	    tags.clear();
280 	    if (cur == NULL) break;
281 	    tf = cf = 0;
282 	    last_key = cur->key;
283 	}
284 	tf += cur->tf;
285 	cf += cur->cf;
286 	tags.push_back(make_pair(cur->firstdid, cur->tag));
287 	if (cur->next()) {
288 	    pq.push(cur);
289 	} else {
290 	    delete cur;
291 	}
292     }
293 }
294 
295 struct MergeCursor : public FlintCursor {
MergeCursorFlintCompact::MergeCursor296     MergeCursor(FlintTable *in) : FlintCursor(in) {
297 	find_entry(string());
298 	next();
299     }
300 
~MergeCursorFlintCompact::MergeCursor301     ~MergeCursor() {
302 	delete FlintCursor::get_table();
303     }
304 };
305 
306 struct CursorGt {
307     /// Return true if and only if a's key is strictly greater than b's key.
operator ()FlintCompact::CursorGt308     bool operator()(const FlintCursor *a, const FlintCursor *b) {
309 	if (b->after_end()) return false;
310 	if (a->after_end()) return true;
311 	return (a->current_key > b->current_key);
312     }
313 };
314 
315 static void
merge_spellings(FlintTable * out,vector<string>::const_iterator b,vector<string>::const_iterator e,vector<flint_revision_number_t>::const_iterator rev)316 merge_spellings(FlintTable * out,
317 		vector<string>::const_iterator b,
318 		vector<string>::const_iterator e,
319 		vector<flint_revision_number_t>::const_iterator rev)
320 {
321     priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
322     for ( ; b != e; ++b, ++rev) {
323 	FlintTable *in = new FlintTable("spelling", *b, true, DONT_COMPRESS, true);
324 	if (!in->open(*rev)) {
325 	    failed_to_open_at_rev(*b, *rev);
326 	}
327 	if (!in->empty()) {
328 	    // The MergeCursor takes ownership of FlintTable in and is
329 	    // responsible for deleting it.
330 	    pq.push(new MergeCursor(in));
331 	} else {
332 	    delete in;
333 	}
334     }
335 
336     while (!pq.empty()) {
337 	MergeCursor * cur = pq.top();
338 	pq.pop();
339 
340 	string key = cur->current_key;
341 	if (pq.empty() || pq.top()->current_key > key) {
342 	    // No need to merge the tags, just copy the (possibly compressed)
343 	    // tag value.
344 	    bool compressed = cur->read_tag(true);
345 	    out->add(key, cur->current_tag, compressed);
346 	    if (cur->next()) {
347 		pq.push(cur);
348 	    } else {
349 		delete cur;
350 	    }
351 	    continue;
352 	}
353 
354 	// Merge tag values with the same key:
355 	string tag;
356 	if (key[0] != 'W') {
357 	    // We just want the union of words, so copy over the first instance
358 	    // and skip any identical ones.
359 	    priority_queue<PrefixCompressedStringItor *,
360 			   vector<PrefixCompressedStringItor *>,
361 			   PrefixCompressedStringItorGt> pqtag;
362 	    // Stick all the MergeCursor pointers in a vector because their
363 	    // current_tag members must remain valid while we're merging their
364 	    // tags, but we need to call next() on them all afterwards.
365 	    vector<MergeCursor *> vec;
366 	    vec.reserve(pq.size());
367 
368 	    while (true) {
369 		cur->read_tag();
370 		pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
371 		vec.push_back(cur);
372 		if (pq.empty() || pq.top()->current_key != key) break;
373 		cur = pq.top();
374 		pq.pop();
375 	    }
376 
377 	    PrefixCompressedStringWriter wr(tag);
378 	    string lastword;
379 	    while (!pqtag.empty()) {
380 		PrefixCompressedStringItor * it = pqtag.top();
381 		string word = **it;
382 		if (word != lastword) {
383 		    lastword = word;
384 		    wr.append(lastword);
385 		}
386 		++*it;
387 		pqtag.pop();
388 		if (!it->at_end()) {
389 		    pqtag.push(it);
390 		} else {
391 		    delete it;
392 		}
393 	    }
394 
395 	    vector<MergeCursor *>::const_iterator i;
396 	    for (i = vec.begin(); i != vec.end(); ++i) {
397 		cur = *i;
398 		if (cur->next()) {
399 		    pq.push(cur);
400 		} else {
401 		    delete cur;
402 		}
403 	    }
404 	} else {
405 	    // We want to sum the frequencies from tags for the same key.
406 	    Xapian::termcount tot_freq = 0;
407 	    while (true) {
408 		cur->read_tag();
409 		Xapian::termcount freq;
410 		const char * p = cur->current_tag.data();
411 		const char * end = p + cur->current_tag.size();
412 		if (!F_unpack_uint_last(&p, end, &freq) || freq == 0) {
413 		    throw Xapian::DatabaseCorruptError("Bad spelling word freq");
414 		}
415 		tot_freq += freq;
416 		if (cur->next()) {
417 		    pq.push(cur);
418 		} else {
419 		    delete cur;
420 		}
421 		if (pq.empty() || pq.top()->current_key != key) break;
422 		cur = pq.top();
423 		pq.pop();
424 	    }
425 	    tag = F_pack_uint_last(tot_freq);
426 	}
427 	out->add(key, tag);
428     }
429 }
430 
431 static void
merge_synonyms(FlintTable * out,vector<string>::const_iterator b,vector<string>::const_iterator e,vector<flint_revision_number_t>::const_iterator rev)432 merge_synonyms(FlintTable * out,
433 	       vector<string>::const_iterator b,
434 	       vector<string>::const_iterator e,
435 	       vector<flint_revision_number_t>::const_iterator rev)
436 {
437     priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
438     for ( ; b != e; ++b, ++rev) {
439 	FlintTable *in = new FlintTable("synonym", *b, true, DONT_COMPRESS, true);
440 	if (!in->open(*rev)) {
441 	    failed_to_open_at_rev(*b, *rev);
442 	}
443 	if (!in->empty()) {
444 	    // The MergeCursor takes ownership of FlintTable in and is
445 	    // responsible for deleting it.
446 	    pq.push(new MergeCursor(in));
447 	} else {
448 	    delete in;
449 	}
450     }
451 
452     while (!pq.empty()) {
453 	MergeCursor * cur = pq.top();
454 	pq.pop();
455 
456 	string key = cur->current_key;
457 	if (pq.empty() || pq.top()->current_key > key) {
458 	    // No need to merge the tags, just copy the (possibly compressed)
459 	    // tag value.
460 	    bool compressed = cur->read_tag(true);
461 	    out->add(key, cur->current_tag, compressed);
462 	    if (cur->next()) {
463 		pq.push(cur);
464 	    } else {
465 		delete cur;
466 	    }
467 	    continue;
468 	}
469 
470 	// Merge tag values with the same key:
471 	string tag;
472 
473 	// We just want the union of words, so copy over the first instance
474 	// and skip any identical ones.
475 	priority_queue<ByteLengthPrefixedStringItor *,
476 		       vector<ByteLengthPrefixedStringItor *>,
477 		       ByteLengthPrefixedStringItorGt> pqtag;
478 	vector<MergeCursor *> vec;
479 
480 	while (true) {
481 	    cur->read_tag();
482 	    pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
483 	    vec.push_back(cur);
484 	    if (pq.empty() || pq.top()->current_key != key) break;
485 	    cur = pq.top();
486 	    pq.pop();
487 	}
488 
489 	string lastword;
490 	while (!pqtag.empty()) {
491 	    ByteLengthPrefixedStringItor * it = pqtag.top();
492 	    if (**it != lastword) {
493 		lastword = **it;
494 		tag += byte(lastword.size() ^ MAGIC_XOR_VALUE);
495 		tag += lastword;
496 	    }
497 	    ++*it;
498 	    pqtag.pop();
499 	    if (!it->at_end()) {
500 		pqtag.push(it);
501 	    } else {
502 		delete it;
503 	    }
504 	}
505 
506 	vector<MergeCursor *>::const_iterator i;
507 	for (i = vec.begin(); i != vec.end(); ++i) {
508 	    cur = *i;
509 	    if (cur->next()) {
510 		pq.push(cur);
511 	    } else {
512 		delete cur;
513 	    }
514 	}
515 
516 	out->add(key, tag);
517     }
518 }
519 
520 static void
multimerge_postlists(Xapian::Compactor & compactor,FlintTable * out,const char * tmpdir,Xapian::docid last_docid,vector<string> tmp,vector<flint_revision_number_t> revs,vector<Xapian::docid> off)521 multimerge_postlists(Xapian::Compactor & compactor,
522 		     FlintTable * out, const char * tmpdir,
523 		     Xapian::docid last_docid,
524 		     vector<string> tmp, vector<flint_revision_number_t> revs,
525 		     vector<Xapian::docid> off)
526 {
527     unsigned int c = 0;
528     while (tmp.size() > 3) {
529 	vector<string> tmpout;
530 	tmpout.reserve(tmp.size() / 2);
531 	vector<Xapian::docid> newoff;
532 	newoff.resize(tmp.size() / 2);
533 	vector<flint_revision_number_t> newrevs;
534 	newrevs.reserve(tmp.size() / 2);
535 	for (unsigned int i = 0, j; i < tmp.size(); i = j) {
536 	    j = i + 2;
537 	    if (j == tmp.size() - 1) ++j;
538 
539 	    string dest = tmpdir;
540 	    char buf[64];
541 	    sprintf(buf, "/tmp%u_%u.", c, i / 2);
542 	    dest += buf;
543 
544 	    // Don't compress temporary tables, even if the final table would
545 	    // be.
546 	    FlintTable tmptab("postlist", dest, false);
547 	    // Use maximum blocksize for temporary tables.
548 	    tmptab.create_and_open(65536);
549 
550 	    merge_postlists(compactor, &tmptab, off.begin() + i,
551 			    tmp.begin() + i, tmp.begin() + j,
552 			    revs.begin() + i, last_docid);
553 	    if (c > 0) {
554 		for (unsigned int k = i; k < j; ++k) {
555 		    unlink((tmp[k] + "DB").c_str());
556 		    unlink((tmp[k] + "baseA").c_str());
557 		    unlink((tmp[k] + "baseB").c_str());
558 		}
559 	    }
560 	    tmpout.push_back(dest);
561 	    tmptab.flush_db();
562 	    tmptab.commit(1);
563 	    newrevs.push_back(1);
564 	}
565 	swap(tmp, tmpout);
566 	swap(off, newoff);
567 	swap(revs, newrevs);
568 	++c;
569     }
570     merge_postlists(compactor,
571 		    out, off.begin(), tmp.begin(), tmp.end(), revs.begin(),
572 		    last_docid);
573     if (c > 0) {
574 	for (size_t k = 0; k < tmp.size(); ++k) {
575 	    unlink((tmp[k] + "DB").c_str());
576 	    unlink((tmp[k] + "baseA").c_str());
577 	    unlink((tmp[k] + "baseB").c_str());
578 	}
579     }
580 }
581 
582 static void
merge_docid_keyed(const char * tablename,FlintTable * out,const vector<string> & inputs,const vector<flint_revision_number_t> & revs,const vector<Xapian::docid> & offset,bool lazy)583 merge_docid_keyed(const char * tablename,
584 		  FlintTable *out, const vector<string> & inputs,
585 		  const vector<flint_revision_number_t> & revs,
586 		  const vector<Xapian::docid> & offset, bool lazy)
587 {
588     for (size_t i = 0; i < inputs.size(); ++i) {
589 	Xapian::docid off = offset[i];
590 
591 	FlintTable in(tablename, inputs[i], true, DONT_COMPRESS, lazy);
592 	if (!in.open(revs[i])) {
593 	    failed_to_open_at_rev(inputs[i], revs[i]);
594 	}
595 	if (in.empty()) continue;
596 
597 	FlintCursor cur(&in);
598 	cur.find_entry(string());
599 
600 	string key;
601 	while (cur.next()) {
602 	    // Adjust the key if this isn't the first database.
603 	    if (off) {
604 		Xapian::docid did;
605 		const char * d = cur.current_key.data();
606 		const char * e = d + cur.current_key.size();
607 		if (!F_unpack_uint_preserving_sort(&d, e, &did)) {
608 		    string msg = "Bad key in ";
609 		    msg += inputs[i];
610 		    throw Xapian::DatabaseCorruptError(msg);
611 		}
612 		did += off;
613 		key = F_pack_uint_preserving_sort(did);
614 		if (d != e) {
615 		    // Copy over the termname for the position table.
616 		    key.append(d, e - d);
617 		}
618 	    } else {
619 		key = cur.current_key;
620 	    }
621 	    bool compressed = cur.read_tag(true);
622 	    out->add(key, cur.current_tag, compressed);
623 	}
624     }
625 }
626 
627 }
628 
629 using namespace FlintCompact;
630 
631 void
compact_flint(Xapian::Compactor & compactor,const char * destdir,const vector<string> & sources,const vector<Xapian::docid> & offset,size_t block_size,Xapian::Compactor::compaction_level compaction,bool multipass,Xapian::docid last_docid)632 compact_flint(Xapian::Compactor & compactor,
633 	      const char * destdir, const vector<string> & sources,
634 	      const vector<Xapian::docid> & offset, size_t block_size,
635 	      Xapian::Compactor::compaction_level compaction, bool multipass,
636 	      Xapian::docid last_docid) {
637     // Get the revisions of each database to use to ensure we don't read tables
638     // at different revisions from any of them.
639     vector<flint_revision_number_t> revs;
640     revs.reserve(sources.size());
641     for (vector<string>::const_iterator i = sources.begin();
642 	 i != sources.end(); ++i) {
643 	FlintDatabase db(*i);
644 	revs.push_back(db.get_revision_number());
645     }
646 
647     enum table_type {
648 	POSTLIST, RECORD, TERMLIST, POSITION, VALUE, SPELLING, SYNONYM
649     };
650     struct table_list {
651 	// The "base name" of the table.
652 	const char * name;
653 	// The type.
654 	table_type type;
655 	// zlib compression strategy to use on tags.
656 	int compress_strategy;
657 	// Create tables after position lazily.
658 	bool lazy;
659     };
660 
661     static const table_list tables[] = {
662 	// name		type		compress_strategy	lazy
663 	{ "postlist",	POSTLIST,	DONT_COMPRESS,		false },
664 	{ "record",	RECORD,		Z_DEFAULT_STRATEGY,	false },
665 	{ "termlist",	TERMLIST,	Z_DEFAULT_STRATEGY,	false },
666 	{ "position",	POSITION,	DONT_COMPRESS,		true },
667 	{ "value",	VALUE,		DONT_COMPRESS,		true },
668 	{ "spelling",	SPELLING,	Z_DEFAULT_STRATEGY,	true },
669 	{ "synonym",	SYNONYM,	Z_DEFAULT_STRATEGY,	true }
670     };
671     const table_list * tables_end = tables +
672 	(sizeof(tables) / sizeof(tables[0]));
673 
674     for (const table_list * t = tables; t < tables_end; ++t) {
675 	// The postlist table requires an N-way merge, adjusting the
676 	// headers of various blocks.  The spelling and synonym tables also
677 	// need special handling.  The other tables have keys sorted in
678 	// docid order, so we can merge them by simply copying all the keys
679 	// from each source table in turn.
680 	compactor.set_status(t->name, string());
681 
682 	string dest = destdir;
683 	dest += '/';
684 	dest += t->name;
685 	dest += '.';
686 
687 	bool output_will_exist = !t->lazy;
688 
689 	// Sometimes stat can fail for benign reasons (e.g. >= 2GB file
690 	// on certain systems).
691 	bool bad_stat = false;
692 
693 	off_t in_size = 0;
694 
695 	vector<string> inputs;
696 	inputs.reserve(sources.size());
697 	size_t inputs_present = 0;
698 	for (vector<string>::const_iterator src = sources.begin();
699 	     src != sources.end(); ++src) {
700 	    string s(*src);
701 	    s += t->name;
702 	    s += '.';
703 
704 	    struct stat sb;
705 	    if (stat(s + "DB", &sb) == 0) {
706 		in_size += sb.st_size / 1024;
707 		output_will_exist = true;
708 		++inputs_present;
709 	    } else if (errno != ENOENT) {
710 		// We get ENOENT for an optional table.
711 		bad_stat = true;
712 		output_will_exist = true;
713 		++inputs_present;
714 	    }
715 	    inputs.push_back(s);
716 	}
717 
718 	if (!output_will_exist) {
719 	    compactor.set_status(t->name, "doesn't exist");
720 	    continue;
721 	}
722 
723 	FlintTable out(t->name, dest, false, t->compress_strategy, t->lazy);
724 	if (!t->lazy) {
725 	    out.create_and_open(block_size);
726 	} else {
727 	    out.erase();
728 	    out.set_block_size(block_size);
729 	}
730 
731 	out.set_full_compaction(compaction != compactor.STANDARD);
732 	if (compaction == compactor.FULLER) out.set_max_item_size(1);
733 
734 	switch (t->type) {
735 	    case POSTLIST:
736 		if (multipass && inputs.size() > 3) {
737 		    multimerge_postlists(compactor, &out, destdir, last_docid,
738 					 inputs, revs, offset);
739 		} else {
740 		    merge_postlists(compactor, &out, offset.begin(),
741 				    inputs.begin(), inputs.end(),
742 				    revs.begin(), last_docid);
743 		}
744 		break;
745 	    case SPELLING:
746 		merge_spellings(&out, inputs.begin(), inputs.end(),
747 				revs.begin());
748 		break;
749 	    case SYNONYM:
750 		merge_synonyms(&out, inputs.begin(), inputs.end(),
751 			       revs.begin());
752 		break;
753 	    default:
754 		// Position, Record, Termlist, Value.
755 		merge_docid_keyed(t->name, &out, inputs, revs, offset, t->lazy);
756 		break;
757 	}
758 
759 	// Commit as revision 1.
760 	out.flush_db();
761 	out.commit(1);
762 
763 	off_t out_size = 0;
764 	if (!bad_stat) {
765 	    struct stat sb;
766 	    if (stat(dest + "DB", &sb) == 0) {
767 		out_size = sb.st_size / 1024;
768 	    } else {
769 		bad_stat = (errno != ENOENT);
770 	    }
771 	}
772 	if (bad_stat) {
773 	    compactor.set_status(t->name, "Done (couldn't stat all the DB files)");
774 	} else {
775 	    string status;
776 	    if (out_size == in_size) {
777 		status = "Size unchanged (";
778 	    } else {
779 		off_t delta;
780 		if (out_size < in_size) {
781 		    delta = in_size - out_size;
782 		    status = "Reduced by ";
783 		} else {
784 		    delta = out_size - in_size;
785 		    status = "INCREASED by ";
786 		}
787 		status += str(100 * delta / in_size);
788 		status += "% ";
789 		status += str(delta);
790 		status += "K (";
791 		status += str(in_size);
792 		status += "K -> ";
793 	    }
794 	    status += str(out_size);
795 	    status += "K)";
796 	    compactor.set_status(t->name, status);
797 	}
798     }
799 }
800