/** @file
 * @brief Compact a database, or merge and compact several.
 */
/* Copyright (C) 2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2015,2016 Olly Betts
 * Copyright (C) 2008 Lemur Consulting Ltd
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
 * USA
 */
22
23 #include <config.h>
24
25 #include <xapian/compactor.h>
26
27 #include <algorithm>
28 #include <fstream>
29 #include <vector>
30
31 #include <cerrno>
32 #include <cstring>
33 #include <ctime>
34 #include "safesysstat.h"
35 #include <sys/types.h>
36
37 #include "safeunistd.h"
38 #include "safefcntl.h"
39
40 #include "backends/backends.h"
41 #include "backends/database.h"
42 #include "debuglog.h"
43 #include "leafpostlist.h"
44 #include "noreturn.h"
45 #include "omassert.h"
46 #include "filetests.h"
47 #include "fileutils.h"
48 #include "io_utils.h"
49 #include "stringutils.h"
50 #include "str.h"
51
52 #ifdef XAPIAN_HAS_GLASS_BACKEND
53 #include "backends/glass/glass_database.h"
54 #include "backends/glass/glass_version.h"
55 #endif
56 #ifdef XAPIAN_HAS_CHERT_BACKEND
57 #include "backends/chert/chert_database.h"
58 #include "backends/chert/chert_version.h"
59 #endif
60
61 #include <xapian/constants.h>
62 #include <xapian/database.h>
63 #include <xapian/error.h>
64
65 using namespace std;
66
67 class CmpByFirstUsed {
68 const vector<pair<Xapian::docid, Xapian::docid>>& used_ranges;
69
70 public:
71 explicit
CmpByFirstUsed(const vector<pair<Xapian::docid,Xapian::docid>> & ur)72 CmpByFirstUsed(const vector<pair<Xapian::docid, Xapian::docid>>& ur)
73 : used_ranges(ur) { }
74
operator ()(size_t a,size_t b) const75 bool operator()(size_t a, size_t b) const {
76 return used_ranges[a].first < used_ranges[b].first;
77 }
78 };
79
80 namespace Xapian {
81
82 class Compactor::Internal : public Xapian::Internal::intrusive_base {
83 friend class Compactor;
84
85 string destdir_compat;
86 size_t block_size;
87 unsigned flags;
88
89 vector<string> srcdirs_compat;
90
91 public:
Internal()92 Internal() : block_size(8192), flags(FULL) { }
93 };
94
Compactor()95 Compactor::Compactor() : internal(new Compactor::Internal()) { }
96
~Compactor()97 Compactor::~Compactor() { }
98
99 void
set_block_size(size_t block_size)100 Compactor::set_block_size(size_t block_size)
101 {
102 internal->block_size = block_size;
103 }
104
105 void
set_flags_(unsigned flags,unsigned mask)106 Compactor::set_flags_(unsigned flags, unsigned mask)
107 {
108 internal->flags = (internal->flags & mask) | flags;
109 }
110
111 void
set_destdir(const string & destdir)112 Compactor::set_destdir(const string & destdir)
113 {
114 internal->destdir_compat = destdir;
115 }
116
117 void
add_source(const string & srcdir)118 Compactor::add_source(const string & srcdir)
119 {
120 internal->srcdirs_compat.push_back(srcdir);
121 }
122
123 void
compact()124 Compactor::compact()
125 {
126 Xapian::Database src;
127 for (auto srcdir : internal->srcdirs_compat) {
128 src.add_database(Xapian::Database(srcdir));
129 }
130 src.compact(internal->destdir_compat, internal->flags,
131 internal->block_size, *this);
132 }
133
134 void
set_status(const string & table,const string & status)135 Compactor::set_status(const string & table, const string & status)
136 {
137 (void)table;
138 (void)status;
139 }
140
141 string
resolve_duplicate_metadata(const string & key,size_t num_tags,const std::string tags[])142 Compactor::resolve_duplicate_metadata(const string & key,
143 size_t num_tags, const std::string tags[])
144 {
145 (void)key;
146 (void)num_tags;
147 return tags[0];
148 }
149
150 }
151
152 XAPIAN_NORETURN(
153 static void
154 backend_mismatch(const Xapian::Database & db, int backend1,
155 const string &dbpath2, int backend2)
156 );
157 static void
backend_mismatch(const Xapian::Database & db,int backend1,const string & dbpath2,int backend2)158 backend_mismatch(const Xapian::Database & db, int backend1,
159 const string &dbpath2, int backend2)
160 {
161 string dbpath1;
162 db.internal[0]->get_backend_info(&dbpath1);
163 string msg = "All databases must be the same type ('";
164 msg += dbpath1;
165 msg += "' is ";
166 msg += backend_name(backend1);
167 msg += ", but '";
168 msg += dbpath2;
169 msg += "' is ";
170 msg += backend_name(backend2);
171 msg += ')';
172 throw Xapian::InvalidArgumentError(msg);
173 }
174
175 namespace Xapian {
176
177 void
compact_(const string * output_ptr,int fd,unsigned flags,int block_size,Xapian::Compactor * compactor) const178 Database::compact_(const string * output_ptr, int fd, unsigned flags,
179 int block_size,
180 Xapian::Compactor * compactor) const
181 {
182 LOGCALL_VOID(API, "Database::compact_", output_ptr | fd | flags | block_size | compactor);
183
184 bool renumber = !(flags & DBCOMPACT_NO_RENUMBER);
185
186 enum { STUB_NO, STUB_FILE, STUB_DIR } compact_to_stub = STUB_NO;
187 string destdir;
188 if (output_ptr) {
189 // We need a modifiable destdir in this function.
190 destdir = *output_ptr;
191 if (!(flags & DBCOMPACT_SINGLE_FILE)) {
192 if (file_exists(destdir)) {
193 // Stub file.
194 compact_to_stub = STUB_FILE;
195 } else if (file_exists(destdir + "/XAPIANDB")) {
196 // Stub directory.
197 compact_to_stub = STUB_DIR;
198 }
199 }
200 } else {
201 // Single file is implied when writing to a file descriptor.
202 flags |= DBCOMPACT_SINGLE_FILE;
203 }
204
205 int backend = BACKEND_UNKNOWN;
206 for (const auto& it : internal) {
207 string srcdir;
208 int type = it->get_backend_info(&srcdir);
209 // Check destdir isn't the same as any source directory, unless it
210 // is a stub database or we're compacting to an fd.
211 if (!compact_to_stub && !destdir.empty() && srcdir == destdir)
212 throw Xapian::InvalidArgumentError("destination may not be the same as any source database, unless it is a stub database");
213 switch (type) {
214 case BACKEND_CHERT:
215 case BACKEND_GLASS:
216 if (backend != type && backend != BACKEND_UNKNOWN) {
217 backend_mismatch(*this, backend, srcdir, type);
218 }
219 backend = type;
220 break;
221 default:
222 throw Xapian::DatabaseError("Only chert and glass databases can be compacted");
223 }
224 }
225
226 Xapian::docid tot_off = 0;
227 Xapian::docid last_docid = 0;
228
229 vector<Xapian::docid> offset;
230 vector<pair<Xapian::docid, Xapian::docid> > used_ranges;
231 vector<Xapian::Database::Internal *> internals;
232 offset.reserve(internal.size());
233 used_ranges.reserve(internal.size());
234 internals.reserve(internal.size());
235
236 for (const auto& i : internal) {
237 Xapian::Database::Internal * db = i.get();
238 internals.push_back(db);
239
240 Xapian::docid first = 0, last = 0;
241
242 // "Empty" databases might have spelling or synonym data so can't
243 // just be completely ignored.
244 Xapian::doccount num_docs = db->get_doccount();
245 if (num_docs != 0) {
246 db->get_used_docid_range(first, last);
247
248 if (renumber && first) {
249 // Prune any unused docids off the start of this source
250 // database.
251 //
252 // tot_off could wrap here, but it's unsigned, so that's
253 // OK.
254 tot_off -= (first - 1);
255 }
256
257 #ifdef XAPIAN_ASSERTIONS
258 LeafPostList * pl = db->open_post_list(string());
259 pl->next();
260 // This test should never fail, since db->get_doccount() is
261 // non-zero!
262 Assert(!pl->at_end());
263 AssertEq(pl->get_docid(), first);
264 AssertRel(last,>=,first);
265 pl->skip_to(last);
266 Assert(!pl->at_end());
267 AssertEq(pl->get_docid(), last);
268 pl->next();
269 Assert(pl->at_end());
270 delete pl;
271 #endif
272 }
273
274 offset.push_back(tot_off);
275 if (renumber)
276 tot_off += last;
277 else if (last_docid < db->get_lastdocid())
278 last_docid = db->get_lastdocid();
279 used_ranges.push_back(make_pair(first, last));
280 }
281
282 if (renumber)
283 last_docid = tot_off;
284
285 if (!renumber && internal.size() > 1) {
286 // We want to process the sources in ascending order of first
287 // docid. So we create a vector "order" with ascending integers
288 // and then sort so the indirected order is right. Then we reorder
289 // the vectors into that order and check the ranges are disjoint.
290 vector<size_t> order;
291 order.reserve(internal.size());
292 for (size_t i = 0; i < internal.size(); ++i)
293 order.push_back(i);
294
295 sort(order.begin(), order.end(), CmpByFirstUsed(used_ranges));
296
297 // Reorder the vectors to be in ascending of first docid, and
298 // set all the offsets to 0.
299 vector<Xapian::Database::Internal *> internals_;
300 internals_.reserve(internal.size());
301 vector<pair<Xapian::docid, Xapian::docid>> used_ranges_;
302 used_ranges_.reserve(internal.size());
303
304 Xapian::docid last_start = 0, last_end = 0;
305 for (size_t j = 0; j != order.size(); ++j) {
306 size_t n = order[j];
307
308 internals_.push_back(internals[n]);
309 used_ranges_.push_back(used_ranges[n]);
310
311 const pair<Xapian::docid, Xapian::docid> p = used_ranges[n];
312 // Skip empty databases.
313 if (p.first == 0 && p.second == 0)
314 continue;
315 // Check for overlap with the previous database's range.
316 if (p.first <= last_end) {
317 string tmp;
318 string msg = "when merging databases, --no-renumber is only currently supported if the databases have disjoint ranges of used document ids: ";
319 internals_[j - 1]->get_backend_info(&tmp);
320 msg += tmp;
321 msg += " has range ";
322 msg += str(last_start);
323 msg += '-';
324 msg += str(last_end);
325 msg += ", ";
326 internals_[j]->get_backend_info(&tmp);
327 msg += tmp;
328 msg += " has range ";
329 msg += str(p.first);
330 msg += '-';
331 msg += str(p.second);
332 throw Xapian::InvalidOperationError(msg);
333 }
334 last_start = p.first;
335 last_end = p.second;
336 }
337
338 swap(internals, internals_);
339 swap(used_ranges, used_ranges_);
340 }
341
342 string stub_file;
343 if (compact_to_stub) {
344 stub_file = destdir;
345 if (compact_to_stub == STUB_DIR) {
346 stub_file += "/XAPIANDB";
347 destdir += '/';
348 } else {
349 destdir += '_';
350 }
351 size_t sfx = destdir.size();
352 time_t now = time(NULL);
353 while (true) {
354 destdir.resize(sfx);
355 destdir += str(now++);
356 if (mkdir(destdir.c_str(), 0755) == 0)
357 break;
358 if (errno != EEXIST) {
359 string msg = destdir;
360 msg += ": mkdir failed";
361 throw Xapian::DatabaseError(msg, errno);
362 }
363 }
364 } else if (!(flags & Xapian::DBCOMPACT_SINGLE_FILE)) {
365 // If the destination database directory doesn't exist, create it.
366 if (mkdir(destdir.c_str(), 0755) < 0) {
367 // Check why mkdir failed. It's ok if the directory already
368 // exists, but we also get EEXIST if there's an existing file with
369 // that name.
370 int mkdir_errno = errno;
371 if (mkdir_errno != EEXIST || !dir_exists(destdir)) {
372 string msg = destdir;
373 msg += ": cannot create directory";
374 throw Xapian::DatabaseError(msg, mkdir_errno);
375 }
376 }
377 }
378
379 #if defined XAPIAN_HAS_CHERT_BACKEND || defined XAPIAN_HAS_GLASS_BACKEND
380 Xapian::Compactor::compaction_level compaction =
381 static_cast<Xapian::Compactor::compaction_level>(flags & (Xapian::Compactor::STANDARD|Xapian::Compactor::FULL|Xapian::Compactor::FULLER));
382 #else
383 (void)compactor;
384 (void)block_size;
385 #endif
386
387 if (backend == BACKEND_CHERT) {
388 #ifdef XAPIAN_HAS_CHERT_BACKEND
389 ChertDatabase::compact(compactor, destdir.c_str(), internals, offset,
390 block_size, compaction, flags, last_docid);
391
392 // Create the version file ("iamchert").
393 //
394 // This file contains a UUID, and we want the copy to have a fresh
395 // UUID since its revision counter is reset to 1.
396 ChertVersion(destdir).create();
397 #else
398 (void)last_docid;
399 throw Xapian::FeatureUnavailableError("Chert backend disabled at build time");
400 #endif
401 } else if (backend == BACKEND_GLASS) {
402 #ifdef XAPIAN_HAS_GLASS_BACKEND
403 if (output_ptr) {
404 GlassDatabase::compact(compactor, destdir.c_str(), 0,
405 internals, offset,
406 block_size, compaction, flags, last_docid);
407 } else {
408 GlassDatabase::compact(compactor, NULL, fd,
409 internals, offset,
410 block_size, compaction, flags, last_docid);
411 }
412 #else
413 (void)fd;
414 (void)last_docid;
415 throw Xapian::FeatureUnavailableError("Glass backend disabled at build time");
416 #endif
417 }
418
419 if (compact_to_stub) {
420 string new_stub_file = destdir;
421 new_stub_file += "/new_stub.tmp";
422 {
423 ofstream new_stub(new_stub_file.c_str());
424 size_t slash = destdir.find_last_of(DIR_SEPS);
425 new_stub << "auto " << destdir.substr(slash + 1) << '\n';
426 }
427 if (!io_tmp_rename(new_stub_file, stub_file)) {
428 string msg = "Cannot rename '";
429 msg += new_stub_file;
430 msg += "' to '";
431 msg += stub_file;
432 msg += '\'';
433 throw Xapian::DatabaseError(msg, errno);
434 }
435 }
436 }
437
438 }
439