1 /** @file flint_compact.cc
2 * @brief Compact a flint database, or merge and compact several.
3 */
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2013,2015 Olly Betts
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation; either version 2 of the
9 * License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
19 * USA
20 */
21
22 #include <config.h>
23
24 #include <xapian/compactor.h>
25
26 #include <algorithm>
27 #include <queue>
28
29 #include <cstdio>
30
31 #include "safeerrno.h"
32 #include <sys/types.h>
33 #include "safesysstat.h"
34
35 #include "flint_table.h"
36 #include "flint_compact.h"
37 #include "flint_cursor.h"
38 #include "flint_utils.h"
39 #include "flint_database.h"
40 #include "internaltypes.h"
41 #include "noreturn.h"
42 #include "utils.h"
43
44 #include "../byte_length_strings.h"
45 #include "../prefix_compressed_strings.h"
46 #include <xapian.h>
47
48 using namespace std;
49
50 XAPIAN_NORETURN(
51 static void failed_to_open_at_rev(string, flint_revision_number_t));
52 static void
failed_to_open_at_rev(string m,flint_revision_number_t rev)53 failed_to_open_at_rev(string m, flint_revision_number_t rev)
54 {
55 m += ": Couldn't open at revision ";
56 m += str(rev);
57 throw Xapian::DatabaseError(m);
58 }
59
60 // Put all the helpers in a namespace to avoid symbols colliding with those of
61 // the same name in chert_compact.cc.
62 namespace FlintCompact {
63
/// Return true if @a key is exactly one zero byte (the METAINFO key).
static inline bool
is_metainfo_key(const string & key)
{
    if (key.size() != 1) return false;
    return key[0] == '\0';
}
69
/// Return true if @a key is at least two bytes long and starts with the
/// bytes 0x00 0xc0 (the prefix used for user metadata entries).
static inline bool
is_user_metadata_key(const string & key)
{
    if (key.size() <= 1) return false;
    if (key[0] != '\0') return false;
    return key[1] == '\xc0';
}
75
76 class PostlistCursor : private FlintCursor {
77 Xapian::docid offset;
78
79 public:
80 string key, tag;
81 Xapian::docid firstdid;
82 Xapian::termcount tf, cf;
83
PostlistCursor(FlintTable * in,Xapian::docid offset_)84 PostlistCursor(FlintTable *in, Xapian::docid offset_)
85 : FlintCursor(in), offset(offset_), firstdid(0)
86 {
87 find_entry(string());
88 next();
89 }
90
~PostlistCursor()91 ~PostlistCursor()
92 {
93 delete FlintCursor::get_table();
94 }
95
next()96 bool next() {
97 if (!FlintCursor::next()) return false;
98 // We put all chunks into the non-initial chunk form here, then fix up
99 // the first chunk for each term in the merged database as we merge.
100 read_tag();
101 key = current_key;
102 tag = current_tag;
103 tf = cf = 0;
104 if (is_metainfo_key(key)) return true;
105 if (is_user_metadata_key(key)) return true;
106 // Adjust key if this is *NOT* an initial chunk.
107 // key is: F_pack_string_preserving_sort(tname)
108 // plus optionally: F_pack_uint_preserving_sort(did)
109 const char * d = key.data();
110 const char * e = d + key.size();
111 string tname;
112 if (!F_unpack_string_preserving_sort(&d, e, tname))
113 throw Xapian::DatabaseCorruptError("Bad postlist key");
114 if (d == e) {
115 // This is an initial chunk for a term, so adjust tag header.
116 d = tag.data();
117 e = d + tag.size();
118 if (!F_unpack_uint(&d, e, &tf) ||
119 !F_unpack_uint(&d, e, &cf) ||
120 !F_unpack_uint(&d, e, &firstdid)) {
121 throw Xapian::DatabaseCorruptError("Bad postlist tag");
122 }
123 ++firstdid;
124 tag.erase(0, d - tag.data());
125 } else {
126 // Not an initial chunk, so adjust key.
127 size_t tmp = d - key.data();
128 if (!F_unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
129 throw Xapian::DatabaseCorruptError("Bad postlist key");
130 key.erase(tmp);
131 }
132 firstdid += offset;
133 return true;
134 }
135 };
136
137 class PostlistCursorGt {
138 public:
139 /** Return true if and only if a's key is strictly greater than b's key.
140 */
operator ()(const PostlistCursor * a,const PostlistCursor * b)141 bool operator()(const PostlistCursor *a, const PostlistCursor *b) {
142 if (a->key > b->key) return true;
143 if (a->key != b->key) return false;
144 return (a->firstdid > b->firstdid);
145 }
146 };
147
148 static void
merge_postlists(Xapian::Compactor & compactor,FlintTable * out,vector<Xapian::docid>::const_iterator offset,vector<string>::const_iterator b,vector<string>::const_iterator e,vector<flint_revision_number_t>::const_iterator rev,Xapian::docid last_docid)149 merge_postlists(Xapian::Compactor & compactor,
150 FlintTable * out, vector<Xapian::docid>::const_iterator offset,
151 vector<string>::const_iterator b,
152 vector<string>::const_iterator e,
153 vector<flint_revision_number_t>::const_iterator rev,
154 Xapian::docid last_docid)
155 {
156 totlen_t tot_totlen = 0;
157 priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
158 for ( ; b != e; ++b, ++offset, ++rev) {
159 FlintTable *in = new FlintTable("postlist", *b, true);
160 if (!in->open(*rev)) {
161 failed_to_open_at_rev(*b, *rev);
162 }
163 if (in->empty()) {
164 // Skip empty tables.
165 delete in;
166 continue;
167 }
168
169 // PostlistCursor takes ownership of FlintTable in and is
170 // responsible for deleting it.
171 PostlistCursor * cur = new PostlistCursor(in, *offset);
172 // Merge the METAINFO tags from each database into one.
173 // They have a key consisting of a single zero byte.
174 // They may be absent, if the database contains no documents. If it
175 // has user metadata we'll still get here.
176 if (is_metainfo_key(cur->key)) {
177 const char * data = cur->tag.data();
178 const char * end = data + cur->tag.size();
179 Xapian::docid dummy_did = 0;
180 if (!F_unpack_uint(&data, end, &dummy_did)) {
181 throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
182 }
183 totlen_t totlen = 0;
184 if (!F_unpack_uint_last(&data, end, &totlen)) {
185 throw Xapian::DatabaseCorruptError("Tag containing meta information is corrupt.");
186 }
187 tot_totlen += totlen;
188 if (tot_totlen < totlen) {
189 throw "totlen wrapped!";
190 }
191 if (cur->next()) {
192 pq.push(cur);
193 } else {
194 delete cur;
195 }
196 } else {
197 pq.push(cur);
198 }
199 }
200
201 {
202 string tag = F_pack_uint(last_docid);
203 tag += F_pack_uint_last(tot_totlen);
204 out->add(string(1, '\0'), tag);
205 }
206
207 string last_key;
208 {
209 // Merge user metadata.
210 vector<string> tags;
211 while (!pq.empty()) {
212 PostlistCursor * cur = pq.top();
213 const string& key = cur->key;
214 if (!is_user_metadata_key(key)) break;
215
216 if (key != last_key) {
217 if (tags.size() > 1) {
218 Assert(!last_key.empty());
219 out->add(last_key,
220 compactor.resolve_duplicate_metadata(last_key,
221 tags.size(),
222 &tags[0]));
223 } else if (tags.size() == 1) {
224 Assert(!last_key.empty());
225 out->add(last_key, tags[0]);
226 }
227 tags.resize(0);
228 last_key = key;
229 }
230 tags.push_back(cur->tag);
231
232 pq.pop();
233 if (cur->next()) {
234 pq.push(cur);
235 } else {
236 delete cur;
237 }
238 }
239 if (tags.size() > 1) {
240 Assert(!last_key.empty());
241 out->add(last_key,
242 compactor.resolve_duplicate_metadata(last_key,
243 tags.size(),
244 &tags[0]));
245 } else if (tags.size() == 1) {
246 Assert(!last_key.empty());
247 out->add(last_key, tags[0]);
248 }
249 }
250
251 Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
252 vector<pair<Xapian::docid, string> > tags;
253 while (true) {
254 PostlistCursor * cur = NULL;
255 if (!pq.empty()) {
256 cur = pq.top();
257 pq.pop();
258 }
259 Assert(cur == NULL || !is_user_metadata_key(cur->key));
260 if (cur == NULL || cur->key != last_key) {
261 if (!tags.empty()) {
262 string first_tag = F_pack_uint(tf);
263 first_tag += F_pack_uint(cf);
264 first_tag += F_pack_uint(tags[0].first - 1);
265 string tag = tags[0].second;
266 tag[0] = (tags.size() == 1) ? '1' : '0';
267 first_tag += tag;
268 out->add(last_key, first_tag);
269 vector<pair<Xapian::docid, string> >::const_iterator i;
270 i = tags.begin();
271 while (++i != tags.end()) {
272 string new_key = last_key;
273 new_key += F_pack_uint_preserving_sort(i->first);
274 tag = i->second;
275 tag[0] = (i + 1 == tags.end()) ? '1' : '0';
276 out->add(new_key, tag);
277 }
278 }
279 tags.clear();
280 if (cur == NULL) break;
281 tf = cf = 0;
282 last_key = cur->key;
283 }
284 tf += cur->tf;
285 cf += cur->cf;
286 tags.push_back(make_pair(cur->firstdid, cur->tag));
287 if (cur->next()) {
288 pq.push(cur);
289 } else {
290 delete cur;
291 }
292 }
293 }
294
295 struct MergeCursor : public FlintCursor {
MergeCursorFlintCompact::MergeCursor296 MergeCursor(FlintTable *in) : FlintCursor(in) {
297 find_entry(string());
298 next();
299 }
300
~MergeCursorFlintCompact::MergeCursor301 ~MergeCursor() {
302 delete FlintCursor::get_table();
303 }
304 };
305
306 struct CursorGt {
307 /// Return true if and only if a's key is strictly greater than b's key.
operator ()FlintCompact::CursorGt308 bool operator()(const FlintCursor *a, const FlintCursor *b) {
309 if (b->after_end()) return false;
310 if (a->after_end()) return true;
311 return (a->current_key > b->current_key);
312 }
313 };
314
315 static void
merge_spellings(FlintTable * out,vector<string>::const_iterator b,vector<string>::const_iterator e,vector<flint_revision_number_t>::const_iterator rev)316 merge_spellings(FlintTable * out,
317 vector<string>::const_iterator b,
318 vector<string>::const_iterator e,
319 vector<flint_revision_number_t>::const_iterator rev)
320 {
321 priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
322 for ( ; b != e; ++b, ++rev) {
323 FlintTable *in = new FlintTable("spelling", *b, true, DONT_COMPRESS, true);
324 if (!in->open(*rev)) {
325 failed_to_open_at_rev(*b, *rev);
326 }
327 if (!in->empty()) {
328 // The MergeCursor takes ownership of FlintTable in and is
329 // responsible for deleting it.
330 pq.push(new MergeCursor(in));
331 } else {
332 delete in;
333 }
334 }
335
336 while (!pq.empty()) {
337 MergeCursor * cur = pq.top();
338 pq.pop();
339
340 string key = cur->current_key;
341 if (pq.empty() || pq.top()->current_key > key) {
342 // No need to merge the tags, just copy the (possibly compressed)
343 // tag value.
344 bool compressed = cur->read_tag(true);
345 out->add(key, cur->current_tag, compressed);
346 if (cur->next()) {
347 pq.push(cur);
348 } else {
349 delete cur;
350 }
351 continue;
352 }
353
354 // Merge tag values with the same key:
355 string tag;
356 if (key[0] != 'W') {
357 // We just want the union of words, so copy over the first instance
358 // and skip any identical ones.
359 priority_queue<PrefixCompressedStringItor *,
360 vector<PrefixCompressedStringItor *>,
361 PrefixCompressedStringItorGt> pqtag;
362 // Stick all the MergeCursor pointers in a vector because their
363 // current_tag members must remain valid while we're merging their
364 // tags, but we need to call next() on them all afterwards.
365 vector<MergeCursor *> vec;
366 vec.reserve(pq.size());
367
368 while (true) {
369 cur->read_tag();
370 pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
371 vec.push_back(cur);
372 if (pq.empty() || pq.top()->current_key != key) break;
373 cur = pq.top();
374 pq.pop();
375 }
376
377 PrefixCompressedStringWriter wr(tag);
378 string lastword;
379 while (!pqtag.empty()) {
380 PrefixCompressedStringItor * it = pqtag.top();
381 string word = **it;
382 if (word != lastword) {
383 lastword = word;
384 wr.append(lastword);
385 }
386 ++*it;
387 pqtag.pop();
388 if (!it->at_end()) {
389 pqtag.push(it);
390 } else {
391 delete it;
392 }
393 }
394
395 vector<MergeCursor *>::const_iterator i;
396 for (i = vec.begin(); i != vec.end(); ++i) {
397 cur = *i;
398 if (cur->next()) {
399 pq.push(cur);
400 } else {
401 delete cur;
402 }
403 }
404 } else {
405 // We want to sum the frequencies from tags for the same key.
406 Xapian::termcount tot_freq = 0;
407 while (true) {
408 cur->read_tag();
409 Xapian::termcount freq;
410 const char * p = cur->current_tag.data();
411 const char * end = p + cur->current_tag.size();
412 if (!F_unpack_uint_last(&p, end, &freq) || freq == 0) {
413 throw Xapian::DatabaseCorruptError("Bad spelling word freq");
414 }
415 tot_freq += freq;
416 if (cur->next()) {
417 pq.push(cur);
418 } else {
419 delete cur;
420 }
421 if (pq.empty() || pq.top()->current_key != key) break;
422 cur = pq.top();
423 pq.pop();
424 }
425 tag = F_pack_uint_last(tot_freq);
426 }
427 out->add(key, tag);
428 }
429 }
430
431 static void
merge_synonyms(FlintTable * out,vector<string>::const_iterator b,vector<string>::const_iterator e,vector<flint_revision_number_t>::const_iterator rev)432 merge_synonyms(FlintTable * out,
433 vector<string>::const_iterator b,
434 vector<string>::const_iterator e,
435 vector<flint_revision_number_t>::const_iterator rev)
436 {
437 priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
438 for ( ; b != e; ++b, ++rev) {
439 FlintTable *in = new FlintTable("synonym", *b, true, DONT_COMPRESS, true);
440 if (!in->open(*rev)) {
441 failed_to_open_at_rev(*b, *rev);
442 }
443 if (!in->empty()) {
444 // The MergeCursor takes ownership of FlintTable in and is
445 // responsible for deleting it.
446 pq.push(new MergeCursor(in));
447 } else {
448 delete in;
449 }
450 }
451
452 while (!pq.empty()) {
453 MergeCursor * cur = pq.top();
454 pq.pop();
455
456 string key = cur->current_key;
457 if (pq.empty() || pq.top()->current_key > key) {
458 // No need to merge the tags, just copy the (possibly compressed)
459 // tag value.
460 bool compressed = cur->read_tag(true);
461 out->add(key, cur->current_tag, compressed);
462 if (cur->next()) {
463 pq.push(cur);
464 } else {
465 delete cur;
466 }
467 continue;
468 }
469
470 // Merge tag values with the same key:
471 string tag;
472
473 // We just want the union of words, so copy over the first instance
474 // and skip any identical ones.
475 priority_queue<ByteLengthPrefixedStringItor *,
476 vector<ByteLengthPrefixedStringItor *>,
477 ByteLengthPrefixedStringItorGt> pqtag;
478 vector<MergeCursor *> vec;
479
480 while (true) {
481 cur->read_tag();
482 pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
483 vec.push_back(cur);
484 if (pq.empty() || pq.top()->current_key != key) break;
485 cur = pq.top();
486 pq.pop();
487 }
488
489 string lastword;
490 while (!pqtag.empty()) {
491 ByteLengthPrefixedStringItor * it = pqtag.top();
492 if (**it != lastword) {
493 lastword = **it;
494 tag += byte(lastword.size() ^ MAGIC_XOR_VALUE);
495 tag += lastword;
496 }
497 ++*it;
498 pqtag.pop();
499 if (!it->at_end()) {
500 pqtag.push(it);
501 } else {
502 delete it;
503 }
504 }
505
506 vector<MergeCursor *>::const_iterator i;
507 for (i = vec.begin(); i != vec.end(); ++i) {
508 cur = *i;
509 if (cur->next()) {
510 pq.push(cur);
511 } else {
512 delete cur;
513 }
514 }
515
516 out->add(key, tag);
517 }
518 }
519
520 static void
multimerge_postlists(Xapian::Compactor & compactor,FlintTable * out,const char * tmpdir,Xapian::docid last_docid,vector<string> tmp,vector<flint_revision_number_t> revs,vector<Xapian::docid> off)521 multimerge_postlists(Xapian::Compactor & compactor,
522 FlintTable * out, const char * tmpdir,
523 Xapian::docid last_docid,
524 vector<string> tmp, vector<flint_revision_number_t> revs,
525 vector<Xapian::docid> off)
526 {
527 unsigned int c = 0;
528 while (tmp.size() > 3) {
529 vector<string> tmpout;
530 tmpout.reserve(tmp.size() / 2);
531 vector<Xapian::docid> newoff;
532 newoff.resize(tmp.size() / 2);
533 vector<flint_revision_number_t> newrevs;
534 newrevs.reserve(tmp.size() / 2);
535 for (unsigned int i = 0, j; i < tmp.size(); i = j) {
536 j = i + 2;
537 if (j == tmp.size() - 1) ++j;
538
539 string dest = tmpdir;
540 char buf[64];
541 sprintf(buf, "/tmp%u_%u.", c, i / 2);
542 dest += buf;
543
544 // Don't compress temporary tables, even if the final table would
545 // be.
546 FlintTable tmptab("postlist", dest, false);
547 // Use maximum blocksize for temporary tables.
548 tmptab.create_and_open(65536);
549
550 merge_postlists(compactor, &tmptab, off.begin() + i,
551 tmp.begin() + i, tmp.begin() + j,
552 revs.begin() + i, last_docid);
553 if (c > 0) {
554 for (unsigned int k = i; k < j; ++k) {
555 unlink((tmp[k] + "DB").c_str());
556 unlink((tmp[k] + "baseA").c_str());
557 unlink((tmp[k] + "baseB").c_str());
558 }
559 }
560 tmpout.push_back(dest);
561 tmptab.flush_db();
562 tmptab.commit(1);
563 newrevs.push_back(1);
564 }
565 swap(tmp, tmpout);
566 swap(off, newoff);
567 swap(revs, newrevs);
568 ++c;
569 }
570 merge_postlists(compactor,
571 out, off.begin(), tmp.begin(), tmp.end(), revs.begin(),
572 last_docid);
573 if (c > 0) {
574 for (size_t k = 0; k < tmp.size(); ++k) {
575 unlink((tmp[k] + "DB").c_str());
576 unlink((tmp[k] + "baseA").c_str());
577 unlink((tmp[k] + "baseB").c_str());
578 }
579 }
580 }
581
582 static void
merge_docid_keyed(const char * tablename,FlintTable * out,const vector<string> & inputs,const vector<flint_revision_number_t> & revs,const vector<Xapian::docid> & offset,bool lazy)583 merge_docid_keyed(const char * tablename,
584 FlintTable *out, const vector<string> & inputs,
585 const vector<flint_revision_number_t> & revs,
586 const vector<Xapian::docid> & offset, bool lazy)
587 {
588 for (size_t i = 0; i < inputs.size(); ++i) {
589 Xapian::docid off = offset[i];
590
591 FlintTable in(tablename, inputs[i], true, DONT_COMPRESS, lazy);
592 if (!in.open(revs[i])) {
593 failed_to_open_at_rev(inputs[i], revs[i]);
594 }
595 if (in.empty()) continue;
596
597 FlintCursor cur(&in);
598 cur.find_entry(string());
599
600 string key;
601 while (cur.next()) {
602 // Adjust the key if this isn't the first database.
603 if (off) {
604 Xapian::docid did;
605 const char * d = cur.current_key.data();
606 const char * e = d + cur.current_key.size();
607 if (!F_unpack_uint_preserving_sort(&d, e, &did)) {
608 string msg = "Bad key in ";
609 msg += inputs[i];
610 throw Xapian::DatabaseCorruptError(msg);
611 }
612 did += off;
613 key = F_pack_uint_preserving_sort(did);
614 if (d != e) {
615 // Copy over the termname for the position table.
616 key.append(d, e - d);
617 }
618 } else {
619 key = cur.current_key;
620 }
621 bool compressed = cur.read_tag(true);
622 out->add(key, cur.current_tag, compressed);
623 }
624 }
625 }
626
627 }
628
629 using namespace FlintCompact;
630
631 void
compact_flint(Xapian::Compactor & compactor,const char * destdir,const vector<string> & sources,const vector<Xapian::docid> & offset,size_t block_size,Xapian::Compactor::compaction_level compaction,bool multipass,Xapian::docid last_docid)632 compact_flint(Xapian::Compactor & compactor,
633 const char * destdir, const vector<string> & sources,
634 const vector<Xapian::docid> & offset, size_t block_size,
635 Xapian::Compactor::compaction_level compaction, bool multipass,
636 Xapian::docid last_docid) {
637 // Get the revisions of each database to use to ensure we don't read tables
638 // at different revisions from any of them.
639 vector<flint_revision_number_t> revs;
640 revs.reserve(sources.size());
641 for (vector<string>::const_iterator i = sources.begin();
642 i != sources.end(); ++i) {
643 FlintDatabase db(*i);
644 revs.push_back(db.get_revision_number());
645 }
646
647 enum table_type {
648 POSTLIST, RECORD, TERMLIST, POSITION, VALUE, SPELLING, SYNONYM
649 };
650 struct table_list {
651 // The "base name" of the table.
652 const char * name;
653 // The type.
654 table_type type;
655 // zlib compression strategy to use on tags.
656 int compress_strategy;
657 // Create tables after position lazily.
658 bool lazy;
659 };
660
661 static const table_list tables[] = {
662 // name type compress_strategy lazy
663 { "postlist", POSTLIST, DONT_COMPRESS, false },
664 { "record", RECORD, Z_DEFAULT_STRATEGY, false },
665 { "termlist", TERMLIST, Z_DEFAULT_STRATEGY, false },
666 { "position", POSITION, DONT_COMPRESS, true },
667 { "value", VALUE, DONT_COMPRESS, true },
668 { "spelling", SPELLING, Z_DEFAULT_STRATEGY, true },
669 { "synonym", SYNONYM, Z_DEFAULT_STRATEGY, true }
670 };
671 const table_list * tables_end = tables +
672 (sizeof(tables) / sizeof(tables[0]));
673
674 for (const table_list * t = tables; t < tables_end; ++t) {
675 // The postlist table requires an N-way merge, adjusting the
676 // headers of various blocks. The spelling and synonym tables also
677 // need special handling. The other tables have keys sorted in
678 // docid order, so we can merge them by simply copying all the keys
679 // from each source table in turn.
680 compactor.set_status(t->name, string());
681
682 string dest = destdir;
683 dest += '/';
684 dest += t->name;
685 dest += '.';
686
687 bool output_will_exist = !t->lazy;
688
689 // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
690 // on certain systems).
691 bool bad_stat = false;
692
693 off_t in_size = 0;
694
695 vector<string> inputs;
696 inputs.reserve(sources.size());
697 size_t inputs_present = 0;
698 for (vector<string>::const_iterator src = sources.begin();
699 src != sources.end(); ++src) {
700 string s(*src);
701 s += t->name;
702 s += '.';
703
704 struct stat sb;
705 if (stat(s + "DB", &sb) == 0) {
706 in_size += sb.st_size / 1024;
707 output_will_exist = true;
708 ++inputs_present;
709 } else if (errno != ENOENT) {
710 // We get ENOENT for an optional table.
711 bad_stat = true;
712 output_will_exist = true;
713 ++inputs_present;
714 }
715 inputs.push_back(s);
716 }
717
718 if (!output_will_exist) {
719 compactor.set_status(t->name, "doesn't exist");
720 continue;
721 }
722
723 FlintTable out(t->name, dest, false, t->compress_strategy, t->lazy);
724 if (!t->lazy) {
725 out.create_and_open(block_size);
726 } else {
727 out.erase();
728 out.set_block_size(block_size);
729 }
730
731 out.set_full_compaction(compaction != compactor.STANDARD);
732 if (compaction == compactor.FULLER) out.set_max_item_size(1);
733
734 switch (t->type) {
735 case POSTLIST:
736 if (multipass && inputs.size() > 3) {
737 multimerge_postlists(compactor, &out, destdir, last_docid,
738 inputs, revs, offset);
739 } else {
740 merge_postlists(compactor, &out, offset.begin(),
741 inputs.begin(), inputs.end(),
742 revs.begin(), last_docid);
743 }
744 break;
745 case SPELLING:
746 merge_spellings(&out, inputs.begin(), inputs.end(),
747 revs.begin());
748 break;
749 case SYNONYM:
750 merge_synonyms(&out, inputs.begin(), inputs.end(),
751 revs.begin());
752 break;
753 default:
754 // Position, Record, Termlist, Value.
755 merge_docid_keyed(t->name, &out, inputs, revs, offset, t->lazy);
756 break;
757 }
758
759 // Commit as revision 1.
760 out.flush_db();
761 out.commit(1);
762
763 off_t out_size = 0;
764 if (!bad_stat) {
765 struct stat sb;
766 if (stat(dest + "DB", &sb) == 0) {
767 out_size = sb.st_size / 1024;
768 } else {
769 bad_stat = (errno != ENOENT);
770 }
771 }
772 if (bad_stat) {
773 compactor.set_status(t->name, "Done (couldn't stat all the DB files)");
774 } else {
775 string status;
776 if (out_size == in_size) {
777 status = "Size unchanged (";
778 } else {
779 off_t delta;
780 if (out_size < in_size) {
781 delta = in_size - out_size;
782 status = "Reduced by ";
783 } else {
784 delta = out_size - in_size;
785 status = "INCREASED by ";
786 }
787 status += str(100 * delta / in_size);
788 status += "% ";
789 status += str(delta);
790 status += "K (";
791 status += str(in_size);
792 status += "K -> ";
793 }
794 status += str(out_size);
795 status += "K)";
796 compactor.set_status(t->name, status);
797 }
798 }
799 }
800