1 /** @file
2  * @brief Compact a database, or merge and compact several.
3  */
4 /* Copyright (C) 2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2015,2016 Olly Betts
5  * Copyright (C) 2008 Lemur Consulting Ltd
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License as
9  * published by the Free Software Foundation; either version 2 of the
10  * License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
20  * USA
21  */
22 
23 #include <config.h>
24 
25 #include <xapian/compactor.h>
26 
27 #include <algorithm>
28 #include <fstream>
29 #include <vector>
30 
31 #include <cerrno>
32 #include <cstring>
33 #include <ctime>
34 #include "safesysstat.h"
35 #include <sys/types.h>
36 
37 #include "safeunistd.h"
38 #include "safefcntl.h"
39 
40 #include "backends/backends.h"
41 #include "backends/database.h"
42 #include "debuglog.h"
43 #include "leafpostlist.h"
44 #include "noreturn.h"
45 #include "omassert.h"
46 #include "filetests.h"
47 #include "fileutils.h"
48 #include "io_utils.h"
49 #include "stringutils.h"
50 #include "str.h"
51 
52 #ifdef XAPIAN_HAS_GLASS_BACKEND
53 #include "backends/glass/glass_database.h"
54 #include "backends/glass/glass_version.h"
55 #endif
56 #ifdef XAPIAN_HAS_CHERT_BACKEND
57 #include "backends/chert/chert_database.h"
58 #include "backends/chert/chert_version.h"
59 #endif
60 
61 #include <xapian/constants.h>
62 #include <xapian/database.h>
63 #include <xapian/error.h>
64 
65 using namespace std;
66 
67 class CmpByFirstUsed {
68     const vector<pair<Xapian::docid, Xapian::docid>>& used_ranges;
69 
70   public:
71     explicit
CmpByFirstUsed(const vector<pair<Xapian::docid,Xapian::docid>> & ur)72     CmpByFirstUsed(const vector<pair<Xapian::docid, Xapian::docid>>& ur)
73 	: used_ranges(ur) { }
74 
operator ()(size_t a,size_t b) const75     bool operator()(size_t a, size_t b) const {
76 	return used_ranges[a].first < used_ranges[b].first;
77     }
78 };
79 
80 namespace Xapian {
81 
82 class Compactor::Internal : public Xapian::Internal::intrusive_base {
83     friend class Compactor;
84 
85     string destdir_compat;
86     size_t block_size;
87     unsigned flags;
88 
89     vector<string> srcdirs_compat;
90 
91   public:
Internal()92     Internal() : block_size(8192), flags(FULL) { }
93 };
94 
Compactor()95 Compactor::Compactor() : internal(new Compactor::Internal()) { }
96 
~Compactor()97 Compactor::~Compactor() { }
98 
99 void
set_block_size(size_t block_size)100 Compactor::set_block_size(size_t block_size)
101 {
102     internal->block_size = block_size;
103 }
104 
105 void
set_flags_(unsigned flags,unsigned mask)106 Compactor::set_flags_(unsigned flags, unsigned mask)
107 {
108     internal->flags = (internal->flags & mask) | flags;
109 }
110 
111 void
set_destdir(const string & destdir)112 Compactor::set_destdir(const string & destdir)
113 {
114     internal->destdir_compat = destdir;
115 }
116 
117 void
add_source(const string & srcdir)118 Compactor::add_source(const string & srcdir)
119 {
120     internal->srcdirs_compat.push_back(srcdir);
121 }
122 
123 void
compact()124 Compactor::compact()
125 {
126     Xapian::Database src;
127     for (auto srcdir : internal->srcdirs_compat) {
128 	src.add_database(Xapian::Database(srcdir));
129     }
130     src.compact(internal->destdir_compat, internal->flags,
131 		internal->block_size, *this);
132 }
133 
134 void
set_status(const string & table,const string & status)135 Compactor::set_status(const string & table, const string & status)
136 {
137     (void)table;
138     (void)status;
139 }
140 
141 string
resolve_duplicate_metadata(const string & key,size_t num_tags,const std::string tags[])142 Compactor::resolve_duplicate_metadata(const string & key,
143 				      size_t num_tags, const std::string tags[])
144 {
145     (void)key;
146     (void)num_tags;
147     return tags[0];
148 }
149 
150 }
151 
152 XAPIAN_NORETURN(
153     static void
154     backend_mismatch(const Xapian::Database & db, int backend1,
155 		     const string &dbpath2, int backend2)
156 );
157 static void
backend_mismatch(const Xapian::Database & db,int backend1,const string & dbpath2,int backend2)158 backend_mismatch(const Xapian::Database & db, int backend1,
159 		 const string &dbpath2, int backend2)
160 {
161     string dbpath1;
162     db.internal[0]->get_backend_info(&dbpath1);
163     string msg = "All databases must be the same type ('";
164     msg += dbpath1;
165     msg += "' is ";
166     msg += backend_name(backend1);
167     msg += ", but '";
168     msg += dbpath2;
169     msg += "' is ";
170     msg += backend_name(backend2);
171     msg += ')';
172     throw Xapian::InvalidArgumentError(msg);
173 }
174 
175 namespace Xapian {
176 
177 void
compact_(const string * output_ptr,int fd,unsigned flags,int block_size,Xapian::Compactor * compactor) const178 Database::compact_(const string * output_ptr, int fd, unsigned flags,
179 		   int block_size,
180 		   Xapian::Compactor * compactor) const
181 {
182     LOGCALL_VOID(API, "Database::compact_", output_ptr | fd | flags | block_size | compactor);
183 
184     bool renumber = !(flags & DBCOMPACT_NO_RENUMBER);
185 
186     enum { STUB_NO, STUB_FILE, STUB_DIR } compact_to_stub = STUB_NO;
187     string destdir;
188     if (output_ptr) {
189 	// We need a modifiable destdir in this function.
190 	destdir = *output_ptr;
191 	if (!(flags & DBCOMPACT_SINGLE_FILE)) {
192 	    if (file_exists(destdir)) {
193 		// Stub file.
194 		compact_to_stub = STUB_FILE;
195 	    } else if (file_exists(destdir + "/XAPIANDB")) {
196 		// Stub directory.
197 		compact_to_stub = STUB_DIR;
198 	    }
199 	}
200     } else {
201 	// Single file is implied when writing to a file descriptor.
202 	flags |= DBCOMPACT_SINGLE_FILE;
203     }
204 
205     int backend = BACKEND_UNKNOWN;
206     for (const auto& it : internal) {
207 	string srcdir;
208 	int type = it->get_backend_info(&srcdir);
209 	// Check destdir isn't the same as any source directory, unless it
210 	// is a stub database or we're compacting to an fd.
211 	if (!compact_to_stub && !destdir.empty() && srcdir == destdir)
212 	    throw Xapian::InvalidArgumentError("destination may not be the same as any source database, unless it is a stub database");
213 	switch (type) {
214 	    case BACKEND_CHERT:
215 	    case BACKEND_GLASS:
216 		if (backend != type && backend != BACKEND_UNKNOWN) {
217 		    backend_mismatch(*this, backend, srcdir, type);
218 		}
219 		backend = type;
220 		break;
221 	    default:
222 		throw Xapian::DatabaseError("Only chert and glass databases can be compacted");
223 	}
224     }
225 
226     Xapian::docid tot_off = 0;
227     Xapian::docid last_docid = 0;
228 
229     vector<Xapian::docid> offset;
230     vector<pair<Xapian::docid, Xapian::docid> > used_ranges;
231     vector<Xapian::Database::Internal *> internals;
232     offset.reserve(internal.size());
233     used_ranges.reserve(internal.size());
234     internals.reserve(internal.size());
235 
236     for (const auto& i : internal) {
237 	Xapian::Database::Internal * db = i.get();
238 	internals.push_back(db);
239 
240 	Xapian::docid first = 0, last = 0;
241 
242 	// "Empty" databases might have spelling or synonym data so can't
243 	// just be completely ignored.
244 	Xapian::doccount num_docs = db->get_doccount();
245 	if (num_docs != 0) {
246 	    db->get_used_docid_range(first, last);
247 
248 	    if (renumber && first) {
249 		// Prune any unused docids off the start of this source
250 		// database.
251 		//
252 		// tot_off could wrap here, but it's unsigned, so that's
253 		// OK.
254 		tot_off -= (first - 1);
255 	    }
256 
257 #ifdef XAPIAN_ASSERTIONS
258 	    LeafPostList * pl = db->open_post_list(string());
259 	    pl->next();
260 	    // This test should never fail, since db->get_doccount() is
261 	    // non-zero!
262 	    Assert(!pl->at_end());
263 	    AssertEq(pl->get_docid(), first);
264 	    AssertRel(last,>=,first);
265 	    pl->skip_to(last);
266 	    Assert(!pl->at_end());
267 	    AssertEq(pl->get_docid(), last);
268 	    pl->next();
269 	    Assert(pl->at_end());
270 	    delete pl;
271 #endif
272 	}
273 
274 	offset.push_back(tot_off);
275 	if (renumber)
276 	    tot_off += last;
277 	else if (last_docid < db->get_lastdocid())
278 	    last_docid = db->get_lastdocid();
279 	used_ranges.push_back(make_pair(first, last));
280     }
281 
282     if (renumber)
283 	last_docid = tot_off;
284 
285     if (!renumber && internal.size() > 1) {
286 	// We want to process the sources in ascending order of first
287 	// docid.  So we create a vector "order" with ascending integers
288 	// and then sort so the indirected order is right.  Then we reorder
289 	// the vectors into that order and check the ranges are disjoint.
290 	vector<size_t> order;
291 	order.reserve(internal.size());
292 	for (size_t i = 0; i < internal.size(); ++i)
293 	    order.push_back(i);
294 
295 	sort(order.begin(), order.end(), CmpByFirstUsed(used_ranges));
296 
297 	// Reorder the vectors to be in ascending of first docid, and
298 	// set all the offsets to 0.
299 	vector<Xapian::Database::Internal *> internals_;
300 	internals_.reserve(internal.size());
301 	vector<pair<Xapian::docid, Xapian::docid>> used_ranges_;
302 	used_ranges_.reserve(internal.size());
303 
304 	Xapian::docid last_start = 0, last_end = 0;
305 	for (size_t j = 0; j != order.size(); ++j) {
306 	    size_t n = order[j];
307 
308 	    internals_.push_back(internals[n]);
309 	    used_ranges_.push_back(used_ranges[n]);
310 
311 	    const pair<Xapian::docid, Xapian::docid> p = used_ranges[n];
312 	    // Skip empty databases.
313 	    if (p.first == 0 && p.second == 0)
314 		continue;
315 	    // Check for overlap with the previous database's range.
316 	    if (p.first <= last_end) {
317 		string tmp;
318 		string msg = "when merging databases, --no-renumber is only currently supported if the databases have disjoint ranges of used document ids: ";
319 		internals_[j - 1]->get_backend_info(&tmp);
320 		msg += tmp;
321 		msg += " has range ";
322 		msg += str(last_start);
323 		msg += '-';
324 		msg += str(last_end);
325 		msg += ", ";
326 		internals_[j]->get_backend_info(&tmp);
327 		msg += tmp;
328 		msg += " has range ";
329 		msg += str(p.first);
330 		msg += '-';
331 		msg += str(p.second);
332 		throw Xapian::InvalidOperationError(msg);
333 	    }
334 	    last_start = p.first;
335 	    last_end = p.second;
336 	}
337 
338 	swap(internals, internals_);
339 	swap(used_ranges, used_ranges_);
340     }
341 
342     string stub_file;
343     if (compact_to_stub) {
344 	stub_file = destdir;
345 	if (compact_to_stub == STUB_DIR) {
346 	    stub_file += "/XAPIANDB";
347 	    destdir += '/';
348 	} else {
349 	    destdir += '_';
350 	}
351 	size_t sfx = destdir.size();
352 	time_t now = time(NULL);
353 	while (true) {
354 	    destdir.resize(sfx);
355 	    destdir += str(now++);
356 	    if (mkdir(destdir.c_str(), 0755) == 0)
357 		break;
358 	    if (errno != EEXIST) {
359 		string msg = destdir;
360 		msg += ": mkdir failed";
361 		throw Xapian::DatabaseError(msg, errno);
362 	    }
363 	}
364     } else if (!(flags & Xapian::DBCOMPACT_SINGLE_FILE)) {
365 	// If the destination database directory doesn't exist, create it.
366 	if (mkdir(destdir.c_str(), 0755) < 0) {
367 	    // Check why mkdir failed.  It's ok if the directory already
368 	    // exists, but we also get EEXIST if there's an existing file with
369 	    // that name.
370 	    int mkdir_errno = errno;
371 	    if (mkdir_errno != EEXIST || !dir_exists(destdir)) {
372 		string msg = destdir;
373 		msg += ": cannot create directory";
374 		throw Xapian::DatabaseError(msg, mkdir_errno);
375 	    }
376 	}
377     }
378 
379 #if defined XAPIAN_HAS_CHERT_BACKEND || defined XAPIAN_HAS_GLASS_BACKEND
380     Xapian::Compactor::compaction_level compaction =
381 	static_cast<Xapian::Compactor::compaction_level>(flags & (Xapian::Compactor::STANDARD|Xapian::Compactor::FULL|Xapian::Compactor::FULLER));
382 #else
383     (void)compactor;
384     (void)block_size;
385 #endif
386 
387     if (backend == BACKEND_CHERT) {
388 #ifdef XAPIAN_HAS_CHERT_BACKEND
389 	ChertDatabase::compact(compactor, destdir.c_str(), internals, offset,
390 			       block_size, compaction, flags, last_docid);
391 
392 	// Create the version file ("iamchert").
393 	//
394 	// This file contains a UUID, and we want the copy to have a fresh
395 	// UUID since its revision counter is reset to 1.
396 	ChertVersion(destdir).create();
397 #else
398 	(void)last_docid;
399 	throw Xapian::FeatureUnavailableError("Chert backend disabled at build time");
400 #endif
401     } else if (backend == BACKEND_GLASS) {
402 #ifdef XAPIAN_HAS_GLASS_BACKEND
403 	if (output_ptr) {
404 	    GlassDatabase::compact(compactor, destdir.c_str(), 0,
405 				   internals, offset,
406 				   block_size, compaction, flags, last_docid);
407 	} else {
408 	    GlassDatabase::compact(compactor, NULL, fd,
409 				   internals, offset,
410 				   block_size, compaction, flags, last_docid);
411 	}
412 #else
413 	(void)fd;
414 	(void)last_docid;
415 	throw Xapian::FeatureUnavailableError("Glass backend disabled at build time");
416 #endif
417     }
418 
419     if (compact_to_stub) {
420 	string new_stub_file = destdir;
421 	new_stub_file += "/new_stub.tmp";
422 	{
423 	    ofstream new_stub(new_stub_file.c_str());
424 	    size_t slash = destdir.find_last_of(DIR_SEPS);
425 	    new_stub << "auto " << destdir.substr(slash + 1) << '\n';
426 	}
427 	if (!io_tmp_rename(new_stub_file, stub_file)) {
428 	    string msg = "Cannot rename '";
429 	    msg += new_stub_file;
430 	    msg += "' to '";
431 	    msg += stub_file;
432 	    msg += '\'';
433 	    throw Xapian::DatabaseError(msg, errno);
434 	}
435     }
436 }
437 
438 }
439