1 /** @file
2 * @brief Compact a glass database, or merge and compact several.
3 */
4 /* Copyright (C) 2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2017 Olly Betts
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation; either version 2 of the
9 * License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
19 * USA
20 */
21
22 #include <config.h>
23
24 #include "xapian/compactor.h"
25 #include "xapian/constants.h"
26 #include "xapian/error.h"
27 #include "xapian/types.h"
28
29 #include "autoptr.h"
30 #include <algorithm>
31 #include <queue>
32
33 #include <cerrno>
34 #include <cstdio>
35
36 #include "backends/flint_lock.h"
37 #include "glass_database.h"
38 #include "glass_defs.h"
39 #include "glass_table.h"
40 #include "glass_cursor.h"
41 #include "glass_version.h"
42 #include "filetests.h"
43 #include "internaltypes.h"
44 #include "pack.h"
45 #include "backends/valuestats.h"
46
47 #include "../byte_length_strings.h"
48 #include "../prefix_compressed_strings.h"
49
50 using namespace std;
51
52 // Put all the helpers in a namespace to avoid symbols colliding with those of
53 // the same name in other flint-derived backends.
54 namespace GlassCompact {
55
// Report whether key carries the two-byte prefix (0x00, 0xc0) which this file
// uses to recognise user metadata entries in the postlist table.
static inline bool
is_user_metadata_key(const string & key)
{
    // Too short to hold the two-byte prefix.
    if (key.size() < 2) return false;
    return key[0] == '\0' && key[1] == '\xc0';
}
61
// Report whether key carries the two-byte prefix (0x00, 0xd0) which this file
// uses to recognise value statistics entries in the postlist table.
static inline bool
is_valuestats_key(const string & key)
{
    // Too short to hold the two-byte prefix.
    if (key.size() < 2) return false;
    return key[0] == '\0' && key[1] == '\xd0';
}
67
// Report whether key carries the two-byte prefix (0x00, 0xd8) which this file
// uses to recognise value stream chunk entries in the postlist table.
static inline bool
is_valuechunk_key(const string & key)
{
    // Too short to hold the two-byte prefix.
    if (key.size() < 2) return false;
    return key[0] == '\0' && key[1] == '\xd8';
}
73
// Report whether key carries the two-byte prefix (0x00, 0xe0) which this file
// uses to recognise document length chunk entries in the postlist table.
static inline bool
is_doclenchunk_key(const string & key)
{
    // Too short to hold the two-byte prefix.
    if (key.size() < 2) return false;
    return key[0] == '\0' && key[1] == '\xe0';
}
79
80 class PostlistCursor : private GlassCursor {
81 Xapian::docid offset;
82
83 public:
84 string key, tag;
85 Xapian::docid firstdid;
86 Xapian::termcount tf, cf;
87
PostlistCursor(GlassTable * in,Xapian::docid offset_)88 PostlistCursor(GlassTable *in, Xapian::docid offset_)
89 : GlassCursor(in), offset(offset_), firstdid(0)
90 {
91 find_entry(string());
92 next();
93 }
94
next()95 bool next() {
96 if (!GlassCursor::next()) return false;
97 // We put all chunks into the non-initial chunk form here, then fix up
98 // the first chunk for each term in the merged database as we merge.
99 read_tag();
100 key = current_key;
101 tag = current_tag;
102 tf = cf = 0;
103 if (is_user_metadata_key(key)) return true;
104 if (is_valuestats_key(key)) return true;
105 if (is_valuechunk_key(key)) {
106 const char * p = key.data();
107 const char * end = p + key.length();
108 p += 2;
109 Xapian::valueno slot;
110 if (!unpack_uint(&p, end, &slot))
111 throw Xapian::DatabaseCorruptError("bad value key");
112 Xapian::docid did;
113 if (!unpack_uint_preserving_sort(&p, end, &did))
114 throw Xapian::DatabaseCorruptError("bad value key");
115 did += offset;
116
117 key.assign("\0\xd8", 2);
118 pack_uint(key, slot);
119 pack_uint_preserving_sort(key, did);
120 return true;
121 }
122
123 // Adjust key if this is *NOT* an initial chunk.
124 // key is: pack_string_preserving_sort(key, tname)
125 // plus optionally: pack_uint_preserving_sort(key, did)
126 const char * d = key.data();
127 const char * e = d + key.size();
128 if (is_doclenchunk_key(key)) {
129 d += 2;
130 } else {
131 string tname;
132 if (!unpack_string_preserving_sort(&d, e, tname))
133 throw Xapian::DatabaseCorruptError("Bad postlist key");
134 }
135
136 if (d == e) {
137 // This is an initial chunk for a term, so adjust tag header.
138 d = tag.data();
139 e = d + tag.size();
140 if (!unpack_uint(&d, e, &tf) ||
141 !unpack_uint(&d, e, &cf) ||
142 !unpack_uint(&d, e, &firstdid)) {
143 throw Xapian::DatabaseCorruptError("Bad postlist key");
144 }
145 ++firstdid;
146 tag.erase(0, d - tag.data());
147 } else {
148 // Not an initial chunk, so adjust key.
149 size_t tmp = d - key.data();
150 if (!unpack_uint_preserving_sort(&d, e, &firstdid) || d != e)
151 throw Xapian::DatabaseCorruptError("Bad postlist key");
152 if (is_doclenchunk_key(key)) {
153 key.erase(tmp);
154 } else {
155 key.erase(tmp - 1);
156 }
157 }
158 firstdid += offset;
159 return true;
160 }
161 };
162
163 class PostlistCursorGt {
164 public:
165 /** Return true if and only if a's key is strictly greater than b's key.
166 */
operator ()(const PostlistCursor * a,const PostlistCursor * b) const167 bool operator()(const PostlistCursor *a, const PostlistCursor *b) const {
168 if (a->key > b->key) return true;
169 if (a->key != b->key) return false;
170 return (a->firstdid > b->firstdid);
171 }
172 };
173
174 static string
encode_valuestats(Xapian::doccount freq,const string & lbound,const string & ubound)175 encode_valuestats(Xapian::doccount freq,
176 const string & lbound, const string & ubound)
177 {
178 string value;
179 pack_uint(value, freq);
180 pack_string(value, lbound);
181 // We don't store or count empty values, so neither of the bounds
182 // can be empty. So we can safely store an empty upper bound when
183 // the bounds are equal.
184 if (lbound != ubound) value += ubound;
185 return value;
186 }
187
188 static void
merge_postlists(Xapian::Compactor * compactor,GlassTable * out,vector<Xapian::docid>::const_iterator offset,vector<GlassTable * >::const_iterator b,vector<GlassTable * >::const_iterator e)189 merge_postlists(Xapian::Compactor * compactor,
190 GlassTable * out, vector<Xapian::docid>::const_iterator offset,
191 vector<GlassTable*>::const_iterator b,
192 vector<GlassTable*>::const_iterator e)
193 {
194 priority_queue<PostlistCursor *, vector<PostlistCursor *>, PostlistCursorGt> pq;
195 for ( ; b != e; ++b, ++offset) {
196 GlassTable *in = *b;
197 if (in->empty()) {
198 // Skip empty tables.
199 continue;
200 }
201
202 pq.push(new PostlistCursor(in, *offset));
203 }
204
205 string last_key;
206 {
207 // Merge user metadata.
208 vector<string> tags;
209 while (!pq.empty()) {
210 PostlistCursor * cur = pq.top();
211 const string& key = cur->key;
212 if (!is_user_metadata_key(key)) break;
213
214 if (key != last_key) {
215 if (!tags.empty()) {
216 if (tags.size() > 1 && compactor) {
217 Assert(!last_key.empty());
218 // FIXME: It would be better to merge all duplicates
219 // for a key in one call, but currently we don't in
220 // multipass mode.
221 const string & resolved_tag =
222 compactor->resolve_duplicate_metadata(last_key,
223 tags.size(),
224 &tags[0]);
225 if (!resolved_tag.empty())
226 out->add(last_key, resolved_tag);
227 } else {
228 Assert(!last_key.empty());
229 out->add(last_key, tags[0]);
230 }
231 tags.resize(0);
232 }
233 last_key = key;
234 }
235 tags.push_back(cur->tag);
236
237 pq.pop();
238 if (cur->next()) {
239 pq.push(cur);
240 } else {
241 delete cur;
242 }
243 }
244 if (!tags.empty()) {
245 if (tags.size() > 1 && compactor) {
246 Assert(!last_key.empty());
247 const string & resolved_tag =
248 compactor->resolve_duplicate_metadata(last_key,
249 tags.size(),
250 &tags[0]);
251 if (!resolved_tag.empty())
252 out->add(last_key, resolved_tag);
253 } else {
254 Assert(!last_key.empty());
255 out->add(last_key, tags[0]);
256 }
257 }
258 }
259
260 {
261 // Merge valuestats.
262 Xapian::doccount freq = 0;
263 string lbound, ubound;
264
265 while (!pq.empty()) {
266 PostlistCursor * cur = pq.top();
267 const string& key = cur->key;
268 if (!is_valuestats_key(key)) break;
269 if (key != last_key) {
270 // For the first valuestats key, last_key will be the previous
271 // key we wrote, which we don't want to overwrite. This is the
272 // only time that freq will be 0, so check that.
273 if (freq) {
274 out->add(last_key, encode_valuestats(freq, lbound, ubound));
275 freq = 0;
276 }
277 last_key = key;
278 }
279
280 const string & tag = cur->tag;
281
282 const char * pos = tag.data();
283 const char * end = pos + tag.size();
284
285 Xapian::doccount f;
286 string l, u;
287 if (!unpack_uint(&pos, end, &f)) {
288 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
289 throw Xapian::RangeError("Frequency statistic in value table is too large");
290 }
291 if (!unpack_string(&pos, end, l)) {
292 if (*pos == 0) throw Xapian::DatabaseCorruptError("Incomplete stats item in value table");
293 throw Xapian::RangeError("Lower bound in value table is too large");
294 }
295 size_t len = end - pos;
296 if (len == 0) {
297 u = l;
298 } else {
299 u.assign(pos, len);
300 }
301 if (freq == 0) {
302 freq = f;
303 lbound = l;
304 ubound = u;
305 } else {
306 freq += f;
307 if (l < lbound) lbound = l;
308 if (u > ubound) ubound = u;
309 }
310
311 pq.pop();
312 if (cur->next()) {
313 pq.push(cur);
314 } else {
315 delete cur;
316 }
317 }
318
319 if (freq) {
320 out->add(last_key, encode_valuestats(freq, lbound, ubound));
321 }
322 }
323
324 // Merge valuestream chunks.
325 while (!pq.empty()) {
326 PostlistCursor * cur = pq.top();
327 const string & key = cur->key;
328 if (!is_valuechunk_key(key)) break;
329 Assert(!is_user_metadata_key(key));
330 out->add(key, cur->tag);
331 pq.pop();
332 if (cur->next()) {
333 pq.push(cur);
334 } else {
335 delete cur;
336 }
337 }
338
339 Xapian::termcount tf = 0, cf = 0; // Initialise to avoid warnings.
340 vector<pair<Xapian::docid, string>> tags;
341 while (true) {
342 PostlistCursor * cur = NULL;
343 if (!pq.empty()) {
344 cur = pq.top();
345 pq.pop();
346 }
347 Assert(cur == NULL || !is_user_metadata_key(cur->key));
348 if (cur == NULL || cur->key != last_key) {
349 if (!tags.empty()) {
350 string first_tag;
351 pack_uint(first_tag, tf);
352 pack_uint(first_tag, cf);
353 pack_uint(first_tag, tags[0].first - 1);
354 string tag = tags[0].second;
355 tag[0] = (tags.size() == 1) ? '1' : '0';
356 first_tag += tag;
357 out->add(last_key, first_tag);
358
359 string term;
360 if (!is_doclenchunk_key(last_key)) {
361 const char * p = last_key.data();
362 const char * end = p + last_key.size();
363 if (!unpack_string_preserving_sort(&p, end, term) || p != end)
364 throw Xapian::DatabaseCorruptError("Bad postlist chunk key");
365 }
366
367 auto i = tags.begin();
368 while (++i != tags.end()) {
369 tag = i->second;
370 tag[0] = (i + 1 == tags.end()) ? '1' : '0';
371 out->add(pack_glass_postlist_key(term, i->first), tag);
372 }
373 }
374 tags.clear();
375 if (cur == NULL) break;
376 tf = cf = 0;
377 last_key = cur->key;
378 }
379 tf += cur->tf;
380 cf += cur->cf;
381 tags.push_back(make_pair(cur->firstdid, cur->tag));
382 if (cur->next()) {
383 pq.push(cur);
384 } else {
385 delete cur;
386 }
387 }
388 }
389
390 struct MergeCursor : public GlassCursor {
MergeCursorGlassCompact::MergeCursor391 explicit MergeCursor(GlassTable *in) : GlassCursor(in) {
392 find_entry(string());
393 next();
394 }
395 };
396
397 struct CursorGt {
398 /// Return true if and only if a's key is strictly greater than b's key.
operator ()GlassCompact::CursorGt399 bool operator()(const GlassCursor *a, const GlassCursor *b) const {
400 if (b->after_end()) return false;
401 if (a->after_end()) return true;
402 return (a->current_key > b->current_key);
403 }
404 };
405
406 static void
merge_spellings(GlassTable * out,vector<GlassTable * >::const_iterator b,vector<GlassTable * >::const_iterator e)407 merge_spellings(GlassTable * out,
408 vector<GlassTable*>::const_iterator b,
409 vector<GlassTable*>::const_iterator e)
410 {
411 priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
412 for ( ; b != e; ++b) {
413 GlassTable *in = *b;
414 if (!in->empty()) {
415 pq.push(new MergeCursor(in));
416 }
417 }
418
419 while (!pq.empty()) {
420 MergeCursor * cur = pq.top();
421 pq.pop();
422
423 string key = cur->current_key;
424 if (pq.empty() || pq.top()->current_key > key) {
425 // No need to merge the tags, just copy the (possibly compressed)
426 // tag value.
427 bool compressed = cur->read_tag(true);
428 out->add(key, cur->current_tag, compressed);
429 if (cur->next()) {
430 pq.push(cur);
431 } else {
432 delete cur;
433 }
434 continue;
435 }
436
437 // Merge tag values with the same key:
438 string tag;
439 if (key[0] != 'W') {
440 // We just want the union of words, so copy over the first instance
441 // and skip any identical ones.
442 priority_queue<PrefixCompressedStringItor *,
443 vector<PrefixCompressedStringItor *>,
444 PrefixCompressedStringItorGt> pqtag;
445 // Stick all the MergeCursor pointers in a vector because their
446 // current_tag members must remain valid while we're merging their
447 // tags, but we need to call next() on them all afterwards.
448 vector<MergeCursor *> vec;
449 vec.reserve(pq.size());
450
451 while (true) {
452 cur->read_tag();
453 pqtag.push(new PrefixCompressedStringItor(cur->current_tag));
454 vec.push_back(cur);
455 if (pq.empty() || pq.top()->current_key != key) break;
456 cur = pq.top();
457 pq.pop();
458 }
459
460 PrefixCompressedStringWriter wr(tag);
461 string lastword;
462 while (!pqtag.empty()) {
463 PrefixCompressedStringItor * it = pqtag.top();
464 pqtag.pop();
465 string word = **it;
466 if (word != lastword) {
467 lastword = word;
468 wr.append(lastword);
469 }
470 ++*it;
471 if (!it->at_end()) {
472 pqtag.push(it);
473 } else {
474 delete it;
475 }
476 }
477
478 vector<MergeCursor *>::const_iterator i;
479 for (i = vec.begin(); i != vec.end(); ++i) {
480 cur = *i;
481 if (cur->next()) {
482 pq.push(cur);
483 } else {
484 delete cur;
485 }
486 }
487 } else {
488 // We want to sum the frequencies from tags for the same key.
489 Xapian::termcount tot_freq = 0;
490 while (true) {
491 cur->read_tag();
492 Xapian::termcount freq;
493 const char * p = cur->current_tag.data();
494 const char * end = p + cur->current_tag.size();
495 if (!unpack_uint_last(&p, end, &freq) || freq == 0) {
496 throw Xapian::DatabaseCorruptError("Bad spelling word freq");
497 }
498 tot_freq += freq;
499 if (cur->next()) {
500 pq.push(cur);
501 } else {
502 delete cur;
503 }
504 if (pq.empty() || pq.top()->current_key != key) break;
505 cur = pq.top();
506 pq.pop();
507 }
508 tag.resize(0);
509 pack_uint_last(tag, tot_freq);
510 }
511 out->add(key, tag);
512 }
513 }
514
515 static void
merge_synonyms(GlassTable * out,vector<GlassTable * >::const_iterator b,vector<GlassTable * >::const_iterator e)516 merge_synonyms(GlassTable * out,
517 vector<GlassTable*>::const_iterator b,
518 vector<GlassTable*>::const_iterator e)
519 {
520 priority_queue<MergeCursor *, vector<MergeCursor *>, CursorGt> pq;
521 for ( ; b != e; ++b) {
522 GlassTable *in = *b;
523 if (!in->empty()) {
524 pq.push(new MergeCursor(in));
525 }
526 }
527
528 while (!pq.empty()) {
529 MergeCursor * cur = pq.top();
530 pq.pop();
531
532 string key = cur->current_key;
533 if (pq.empty() || pq.top()->current_key > key) {
534 // No need to merge the tags, just copy the (possibly compressed)
535 // tag value.
536 bool compressed = cur->read_tag(true);
537 out->add(key, cur->current_tag, compressed);
538 if (cur->next()) {
539 pq.push(cur);
540 } else {
541 delete cur;
542 }
543 continue;
544 }
545
546 // Merge tag values with the same key:
547 string tag;
548
549 // We just want the union of words, so copy over the first instance
550 // and skip any identical ones.
551 priority_queue<ByteLengthPrefixedStringItor *,
552 vector<ByteLengthPrefixedStringItor *>,
553 ByteLengthPrefixedStringItorGt> pqtag;
554 vector<MergeCursor *> vec;
555
556 while (true) {
557 cur->read_tag();
558 pqtag.push(new ByteLengthPrefixedStringItor(cur->current_tag));
559 vec.push_back(cur);
560 if (pq.empty() || pq.top()->current_key != key) break;
561 cur = pq.top();
562 pq.pop();
563 }
564
565 string lastword;
566 while (!pqtag.empty()) {
567 ByteLengthPrefixedStringItor * it = pqtag.top();
568 pqtag.pop();
569 if (**it != lastword) {
570 lastword = **it;
571 tag += uint8_t(lastword.size() ^ MAGIC_XOR_VALUE);
572 tag += lastword;
573 }
574 ++*it;
575 if (!it->at_end()) {
576 pqtag.push(it);
577 } else {
578 delete it;
579 }
580 }
581
582 vector<MergeCursor *>::const_iterator i;
583 for (i = vec.begin(); i != vec.end(); ++i) {
584 cur = *i;
585 if (cur->next()) {
586 pq.push(cur);
587 } else {
588 delete cur;
589 }
590 }
591
592 out->add(key, tag);
593 }
594 }
595
596 static void
multimerge_postlists(Xapian::Compactor * compactor,GlassTable * out,const char * tmpdir,vector<GlassTable * > tmp,vector<Xapian::docid> off)597 multimerge_postlists(Xapian::Compactor * compactor,
598 GlassTable * out, const char * tmpdir,
599 vector<GlassTable *> tmp,
600 vector<Xapian::docid> off)
601 {
602 unsigned int c = 0;
603 while (tmp.size() > 3) {
604 vector<GlassTable *> tmpout;
605 tmpout.reserve(tmp.size() / 2);
606 vector<Xapian::docid> newoff;
607 newoff.resize(tmp.size() / 2);
608 for (unsigned int i = 0, j; i < tmp.size(); i = j) {
609 j = i + 2;
610 if (j == tmp.size() - 1) ++j;
611
612 string dest = tmpdir;
613 char buf[64];
614 sprintf(buf, "/tmp%u_%u.", c, i / 2);
615 dest += buf;
616
617 GlassTable * tmptab = new GlassTable("postlist", dest, false);
618
619 // Use maximum blocksize for temporary tables. And don't compress
620 // entries in temporary tables, even if the final table would do
621 // so. Any already compressed entries will get copied in
622 // compressed form.
623 RootInfo root_info;
624 root_info.init(65536, 0);
625 const int flags = Xapian::DB_DANGEROUS|Xapian::DB_NO_SYNC;
626 tmptab->create_and_open(flags, root_info);
627
628 merge_postlists(compactor, tmptab, off.begin() + i,
629 tmp.begin() + i, tmp.begin() + j);
630 if (c > 0) {
631 for (unsigned int k = i; k < j; ++k) {
632 unlink(tmp[k]->get_path().c_str());
633 delete tmp[k];
634 tmp[k] = NULL;
635 }
636 }
637 tmpout.push_back(tmptab);
638 tmptab->flush_db();
639 tmptab->commit(1, &root_info);
640 AssertRel(root_info.get_blocksize(),==,65536);
641 }
642 swap(tmp, tmpout);
643 swap(off, newoff);
644 ++c;
645 }
646 merge_postlists(compactor, out, off.begin(), tmp.begin(), tmp.end());
647 if (c > 0) {
648 for (size_t k = 0; k < tmp.size(); ++k) {
649 unlink(tmp[k]->get_path().c_str());
650 delete tmp[k];
651 tmp[k] = NULL;
652 }
653 }
654 }
655
656 class PositionCursor : private GlassCursor {
657 Xapian::docid offset;
658
659 public:
660 string key;
661 Xapian::docid firstdid;
662
PositionCursor(GlassTable * in,Xapian::docid offset_)663 PositionCursor(GlassTable *in, Xapian::docid offset_)
664 : GlassCursor(in), offset(offset_), firstdid(0) {
665 find_entry(string());
666 next();
667 }
668
next()669 bool next() {
670 if (!GlassCursor::next()) return false;
671 read_tag();
672 const char * d = current_key.data();
673 const char * e = d + current_key.size();
674 string term;
675 Xapian::docid did;
676 if (!unpack_string_preserving_sort(&d, e, term) ||
677 !unpack_uint_preserving_sort(&d, e, &did) ||
678 d != e) {
679 throw Xapian::DatabaseCorruptError("Bad position key");
680 }
681
682 key.resize(0);
683 pack_string_preserving_sort(key, term);
684 pack_uint_preserving_sort(key, did + offset);
685 return true;
686 }
687
get_tag() const688 const string & get_tag() const {
689 return current_tag;
690 }
691 };
692
693 class PositionCursorGt {
694 public:
695 /** Return true if and only if a's key is strictly greater than b's key.
696 */
operator ()(const PositionCursor * a,const PositionCursor * b) const697 bool operator()(const PositionCursor *a, const PositionCursor *b) const {
698 return a->key > b->key;
699 }
700 };
701
702 static void
merge_positions(GlassTable * out,const vector<GlassTable * > & inputs,const vector<Xapian::docid> & offset)703 merge_positions(GlassTable *out, const vector<GlassTable*> & inputs,
704 const vector<Xapian::docid> & offset)
705 {
706 priority_queue<PositionCursor *, vector<PositionCursor *>, PositionCursorGt> pq;
707 for (size_t i = 0; i < inputs.size(); ++i) {
708 GlassTable *in = inputs[i];
709 if (in->empty()) {
710 // Skip empty tables.
711 continue;
712 }
713
714 pq.push(new PositionCursor(in, offset[i]));
715 }
716
717 while (!pq.empty()) {
718 PositionCursor * cur = pq.top();
719 pq.pop();
720 out->add(cur->key, cur->get_tag());
721 if (cur->next()) {
722 pq.push(cur);
723 } else {
724 delete cur;
725 }
726 }
727 }
728
729 static void
merge_docid_keyed(GlassTable * out,const vector<GlassTable * > & inputs,const vector<Xapian::docid> & offset)730 merge_docid_keyed(GlassTable *out, const vector<GlassTable*> & inputs,
731 const vector<Xapian::docid> & offset)
732 {
733 for (size_t i = 0; i < inputs.size(); ++i) {
734 Xapian::docid off = offset[i];
735
736 GlassTable * in = inputs[i];
737 if (in->empty()) continue;
738
739 GlassCursor cur(in);
740 cur.find_entry(string());
741
742 string key;
743 while (cur.next()) {
744 // Adjust the key if this isn't the first database.
745 if (off) {
746 Xapian::docid did;
747 const char * d = cur.current_key.data();
748 const char * e = d + cur.current_key.size();
749 if (!unpack_uint_preserving_sort(&d, e, &did)) {
750 string msg = "Bad key in ";
751 msg += inputs[i]->get_path();
752 throw Xapian::DatabaseCorruptError(msg);
753 }
754 did += off;
755 key.resize(0);
756 pack_uint_preserving_sort(key, did);
757 if (d != e) {
758 // Copy over anything extra in the key (e.g. the zero byte
759 // at the end of "used value slots" in the termlist table).
760 key.append(d, e - d);
761 }
762 } else {
763 key = cur.current_key;
764 }
765 bool compressed = cur.read_tag(true);
766 out->add(key, cur.current_tag, compressed);
767 }
768 }
769 }
770
771 }
772
773 using namespace GlassCompact;
774
775 void
compact(Xapian::Compactor * compactor,const char * destdir,int fd,const vector<Xapian::Database::Internal * > & sources,const vector<Xapian::docid> & offset,size_t block_size,Xapian::Compactor::compaction_level compaction,unsigned flags,Xapian::docid last_docid)776 GlassDatabase::compact(Xapian::Compactor * compactor,
777 const char * destdir,
778 int fd,
779 const vector<Xapian::Database::Internal*> & sources,
780 const vector<Xapian::docid> & offset,
781 size_t block_size,
782 Xapian::Compactor::compaction_level compaction,
783 unsigned flags,
784 Xapian::docid last_docid)
785 {
786 struct table_list {
787 // The "base name" of the table.
788 char name[9];
789 // The type.
790 Glass::table_type type;
791 // Create tables after position lazily.
792 bool lazy;
793 };
794
795 static const table_list tables[] = {
796 // name type lazy
797 { "postlist", Glass::POSTLIST, false },
798 { "docdata", Glass::DOCDATA, true },
799 { "termlist", Glass::TERMLIST, false },
800 { "position", Glass::POSITION, true },
801 { "spelling", Glass::SPELLING, true },
802 { "synonym", Glass::SYNONYM, true }
803 };
804 const table_list * tables_end = tables +
805 (sizeof(tables) / sizeof(tables[0]));
806
807 const int FLAGS = Xapian::DB_DANGEROUS;
808
809 bool single_file = (flags & Xapian::DBCOMPACT_SINGLE_FILE);
810 bool multipass = (flags & Xapian::DBCOMPACT_MULTIPASS);
811 if (single_file) {
812 // FIXME: Support this combination - we need to put temporary files
813 // somewhere.
814 multipass = false;
815 }
816
817 for (size_t i = 0; i != sources.size(); ++i) {
818 auto db = static_cast<const GlassDatabase*>(sources[i]);
819 if (db->has_uncommitted_changes()) {
820 const char * m =
821 "Can't compact from a WritableDatabase with uncommitted "
822 "changes - either call commit() first, or create a new "
823 "Database object from the filename on disk";
824 throw Xapian::InvalidOperationError(m);
825 }
826 }
827
828 if (block_size < 2048 || block_size > 65536 ||
829 (block_size & (block_size - 1)) != 0) {
830 block_size = GLASS_DEFAULT_BLOCKSIZE;
831 }
832
833 FlintLock lock(destdir ? destdir : "");
834 if (!single_file) {
835 string explanation;
836 FlintLock::reason why = lock.lock(true, false, explanation);
837 if (why != FlintLock::SUCCESS) {
838 lock.throw_databaselockerror(why, destdir, explanation);
839 }
840 }
841
842 AutoPtr<GlassVersion> version_file_out;
843 if (single_file) {
844 if (destdir) {
845 fd = open(destdir, O_RDWR|O_CREAT|O_TRUNC|O_BINARY|O_CLOEXEC, 0666);
846 if (fd < 0) {
847 throw Xapian::DatabaseCreateError("open() failed", errno);
848 }
849 }
850 version_file_out.reset(new GlassVersion(fd));
851 } else {
852 fd = -1;
853 version_file_out.reset(new GlassVersion(destdir));
854 }
855
856 version_file_out->create(block_size);
857 for (size_t i = 0; i != sources.size(); ++i) {
858 GlassDatabase * db = static_cast<GlassDatabase*>(sources[i]);
859 version_file_out->merge_stats(db->version_file);
860 }
861
862 string fl_serialised;
863 if (single_file) {
864 GlassFreeList fl;
865 fl.set_first_unused_block(1); // FIXME: Assumption?
866 fl.pack(fl_serialised);
867 }
868
869 vector<GlassTable *> tabs;
870 tabs.reserve(tables_end - tables);
871 off_t prev_size = block_size;
872 for (const table_list * t = tables; t < tables_end; ++t) {
873 // The postlist table requires an N-way merge, adjusting the
874 // headers of various blocks. The spelling and synonym tables also
875 // need special handling. The other tables have keys sorted in
876 // docid order, so we can merge them by simply copying all the keys
877 // from each source table in turn.
878 if (compactor)
879 compactor->set_status(t->name, string());
880
881 string dest;
882 if (!single_file) {
883 dest = destdir;
884 dest += '/';
885 dest += t->name;
886 dest += '.';
887 }
888
889 bool output_will_exist = !t->lazy;
890
891 // Sometimes stat can fail for benign reasons (e.g. >= 2GB file
892 // on certain systems).
893 bool bad_stat = false;
894
895 // We can't currently report input sizes if there's a single file DB
896 // amongst the inputs.
897 bool single_file_in = false;
898
899 off_t in_size = 0;
900
901 vector<GlassTable*> inputs;
902 inputs.reserve(sources.size());
903 size_t inputs_present = 0;
904 for (auto src : sources) {
905 GlassDatabase * db = static_cast<GlassDatabase*>(src);
906 GlassTable * table;
907 switch (t->type) {
908 case Glass::POSTLIST:
909 table = &(db->postlist_table);
910 break;
911 case Glass::DOCDATA:
912 table = &(db->docdata_table);
913 break;
914 case Glass::TERMLIST:
915 table = &(db->termlist_table);
916 break;
917 case Glass::POSITION:
918 table = &(db->position_table);
919 break;
920 case Glass::SPELLING:
921 table = &(db->spelling_table);
922 break;
923 case Glass::SYNONYM:
924 table = &(db->synonym_table);
925 break;
926 default:
927 Assert(false);
928 return;
929 }
930
931 if (db->single_file()) {
932 if (t->lazy && table->empty()) {
933 // Essentially doesn't exist.
934 } else {
935 // FIXME: Find actual size somehow?
936 // in_size += table->size() / 1024;
937 single_file_in = true;
938 output_will_exist = true;
939 ++inputs_present;
940 }
941 } else {
942 off_t db_size = file_size(table->get_path());
943 if (errno == 0) {
944 in_size += db_size / 1024;
945 output_will_exist = true;
946 ++inputs_present;
947 } else if (errno != ENOENT) {
948 // We get ENOENT for an optional table.
949 bad_stat = true;
950 output_will_exist = true;
951 ++inputs_present;
952 }
953 }
954 inputs.push_back(table);
955 }
956
957 // If any inputs lack a termlist table, suppress it in the output.
958 if (t->type == Glass::TERMLIST && inputs_present != sources.size()) {
959 if (inputs_present != 0) {
960 if (compactor) {
961 string m = str(inputs_present);
962 m += " of ";
963 m += str(sources.size());
964 m += " inputs present, so suppressing output";
965 compactor->set_status(t->name, m);
966 }
967 continue;
968 }
969 output_will_exist = false;
970 }
971
972 if (!output_will_exist) {
973 if (compactor)
974 compactor->set_status(t->name, "doesn't exist");
975 continue;
976 }
977
978 GlassTable * out;
979 if (single_file) {
980 out = new GlassTable(t->name, fd, version_file_out->get_offset(),
981 false, false);
982 } else {
983 out = new GlassTable(t->name, dest, false, t->lazy);
984 }
985 tabs.push_back(out);
986 RootInfo * root_info = version_file_out->root_to_set(t->type);
987 if (single_file) {
988 root_info->set_free_list(fl_serialised);
989 out->open(FLAGS, version_file_out->get_root(t->type), version_file_out->get_revision());
990 } else {
991 out->create_and_open(FLAGS, *root_info);
992 }
993
994 out->set_full_compaction(compaction != compactor->STANDARD);
995 if (compaction == compactor->FULLER) out->set_max_item_size(1);
996
997 switch (t->type) {
998 case Glass::POSTLIST: {
999 if (multipass && inputs.size() > 3) {
1000 multimerge_postlists(compactor, out, destdir,
1001 inputs, offset);
1002 } else {
1003 merge_postlists(compactor, out, offset.begin(),
1004 inputs.begin(), inputs.end());
1005 }
1006 break;
1007 }
1008 case Glass::SPELLING:
1009 merge_spellings(out, inputs.begin(), inputs.end());
1010 break;
1011 case Glass::SYNONYM:
1012 merge_synonyms(out, inputs.begin(), inputs.end());
1013 break;
1014 case Glass::POSITION:
1015 merge_positions(out, inputs, offset);
1016 break;
1017 default:
1018 // DocData, Termlist
1019 merge_docid_keyed(out, inputs, offset);
1020 break;
1021 }
1022
1023 // Commit as revision 1.
1024 out->flush_db();
1025 out->commit(1, root_info);
1026 out->sync();
1027 if (single_file) fl_serialised = root_info->get_free_list();
1028
1029 off_t out_size = 0;
1030 if (!bad_stat && !single_file_in) {
1031 off_t db_size;
1032 if (single_file) {
1033 db_size = file_size(fd);
1034 } else {
1035 db_size = file_size(dest + GLASS_TABLE_EXTENSION);
1036 }
1037 if (errno == 0) {
1038 if (single_file) {
1039 off_t old_prev_size = max(prev_size, off_t(block_size));
1040 prev_size = db_size;
1041 db_size -= old_prev_size;
1042 }
1043 out_size = db_size / 1024;
1044 } else {
1045 bad_stat = (errno != ENOENT);
1046 }
1047 }
1048 if (bad_stat) {
1049 if (compactor)
1050 compactor->set_status(t->name, "Done (couldn't stat all the DB files)");
1051 } else if (single_file_in) {
1052 if (compactor)
1053 compactor->set_status(t->name, "Done (table sizes unknown for single file DB input)");
1054 } else {
1055 string status;
1056 if (out_size == in_size) {
1057 status = "Size unchanged (";
1058 } else {
1059 off_t delta;
1060 if (out_size < in_size) {
1061 delta = in_size - out_size;
1062 status = "Reduced by ";
1063 } else {
1064 delta = out_size - in_size;
1065 status = "INCREASED by ";
1066 }
1067 if (in_size) {
1068 status += str(100 * delta / in_size);
1069 status += "% ";
1070 }
1071 status += str(delta);
1072 status += "K (";
1073 status += str(in_size);
1074 status += "K -> ";
1075 }
1076 status += str(out_size);
1077 status += "K)";
1078 if (compactor)
1079 compactor->set_status(t->name, status);
1080 }
1081 }
1082
1083 // If compacting to a single file output and all the tables are empty, pad
1084 // the output so that it isn't mistaken for a stub database when we try to
1085 // open it. For this it needs to be a multiple of 2KB in size.
1086 if (single_file && prev_size < off_t(block_size)) {
1087 #ifdef HAVE_FTRUNCATE
1088 if (ftruncate(fd, block_size) < 0) {
1089 throw Xapian::DatabaseError("Failed to set size of output database", errno);
1090 }
1091 #else
1092 const off_t off = block_size - 1;
1093 if (lseek(fd, off, SEEK_SET) != off || write(fd, "", 1) != 1) {
1094 throw Xapian::DatabaseError("Failed to set size of output database", errno);
1095 }
1096 #endif
1097 }
1098
1099 if (single_file) {
1100 if (lseek(fd, version_file_out->get_offset(), SEEK_SET) < 0) {
1101 throw Xapian::DatabaseError("lseek() failed", errno);
1102 }
1103 }
1104 version_file_out->set_last_docid(last_docid);
1105 string tmpfile = version_file_out->write(1, FLAGS);
1106 for (unsigned j = 0; j != tabs.size(); ++j) {
1107 tabs[j]->sync();
1108 }
1109 // Commit with revision 1.
1110 version_file_out->sync(tmpfile, 1, FLAGS);
1111 for (unsigned j = 0; j != tabs.size(); ++j) {
1112 delete tabs[j];
1113 }
1114
1115 if (!single_file) lock.release();
1116 }
1117