1 /* chert_database.cc: chert database
2  *
3  * Copyright 1999,2000,2001 BrightStation PLC
4  * Copyright 2001 Hein Ragas
5  * Copyright 2002 Ananova Ltd
6  * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016 Olly Betts
7  * Copyright 2006,2008 Lemur Consulting Ltd
8  * Copyright 2009,2010 Richard Boulton
9  * Copyright 2009 Kan-Ru Chen
10  * Copyright 2011 Dan Colish
11  *
12  * This program is free software; you can redistribute it and/or
13  * modify it under the terms of the GNU General Public License as
14  * published by the Free Software Foundation; either version 2 of the
15  * License, or (at your option) any later version.
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  * GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License
23  * along with this program; if not, write to the Free Software
24  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
25  * USA
26  */
27 
28 #include <config.h>
29 
30 #include "chert_database.h"
31 
32 #include "xapian/constants.h"
33 #include "xapian/error.h"
34 #include "xapian/valueiterator.h"
35 
36 #include "backends/contiguousalldocspostlist.h"
37 #include "chert_alldocsmodifiedpostlist.h"
38 #include "chert_alldocspostlist.h"
39 #include "chert_alltermslist.h"
40 #include "chert_replicate_internal.h"
41 #include "chert_document.h"
42 #include "../flint_lock.h"
43 #include "chert_metadata.h"
44 #include "chert_modifiedpostlist.h"
45 #include "chert_positionlist.h"
46 #include "chert_postlist.h"
47 #include "chert_record.h"
48 #include "chert_spellingwordslist.h"
49 #include "chert_termlist.h"
50 #include "chert_valuelist.h"
51 #include "chert_values.h"
52 #include "debuglog.h"
53 #include "fd.h"
54 #include "io_utils.h"
55 #include "pack.h"
56 #include "posixy_wrapper.h"
57 #include "net/remoteconnection.h"
58 #include "replicate_utils.h"
59 #include "api/replication.h"
60 #include "replicationprotocol.h"
61 #include "net/length.h"
62 #include "str.h"
63 #include "stringutils.h"
64 #include "backends/valuestats.h"
65 
66 #include "safesysstat.h"
67 #include <sys/types.h>
68 
69 #include <algorithm>
70 #include "autoptr.h"
71 #include <cerrno>
72 #include <cstdlib>
73 #include <string>
74 
75 using namespace std;
76 using namespace Xapian;
77 using Xapian::Internal::intrusive_ptr;
78 
// The maximum safe term length is determined by the postlist.  There we
// store the term using pack_string_preserving_sort() which takes the
// length of the string plus an extra byte (assuming the string doesn't
// contain any zero bytes), followed by the docid encoded with
// C_pack_uint_preserving_sort() which takes up to 5 bytes.
//
// The Btree manager's key length limit is 252 bytes so the maximum safe term
// length is 252 - 1 - 5 = 246 bytes.  We use 245 rather than 246 for
// consistency with flint.
//
// If the term contains zero bytes, the limit is lower (by one for each zero
// byte in the term).
#define MAX_SAFE_TERM_LENGTH 245
92 
/** Maximum number of times to try opening the tables to get them at a
 *  consistent revision.
 *
 *  This is mostly just to avoid any chance of an infinite loop - normally
 *  we'll either get them on the first or second try.
 */
const int MAX_OPEN_RETRIES = 100;
100 
101 /* This finds the tables, opens them at consistent revisions, manages
102  * determining the current and next revision numbers, and stores handles
103  * to the tables.
104  */
ChertDatabase(const string & chert_dir,int flags,unsigned int block_size)105 ChertDatabase::ChertDatabase(const string &chert_dir, int flags,
106 			     unsigned int block_size)
107 	: db_dir(chert_dir),
108 	  readonly(flags == Xapian::DB_READONLY_),
109 	  version_file(db_dir),
110 	  postlist_table(db_dir, readonly),
111 	  position_table(db_dir, readonly),
112 	  termlist_table(db_dir, readonly),
113 	  value_manager(&postlist_table, &termlist_table),
114 	  synonym_table(db_dir, readonly),
115 	  spelling_table(db_dir, readonly),
116 	  record_table(db_dir, readonly),
117 	  lock(db_dir),
118 	  max_changesets(0)
119 {
120     LOGCALL_CTOR(DB, "ChertDatabase", chert_dir | flags | block_size);
121 
122     if (readonly) {
123 	open_tables_consistent();
124 	return;
125     }
126 
127     int action = flags & Xapian::DB_ACTION_MASK_;
128     if (action != Xapian::DB_OPEN && !database_exists()) {
129 
130 	// Create the directory for the database, if it doesn't exist
131 	// already.
132 	bool fail = false;
133 	struct stat statbuf;
134 	if (stat(db_dir.c_str(), &statbuf) == 0) {
135 	    if (!S_ISDIR(statbuf.st_mode)) fail = true;
136 	} else if (errno != ENOENT || mkdir(db_dir.c_str(), 0755) == -1) {
137 	    fail = true;
138 	}
139 	if (fail) {
140 	    throw Xapian::DatabaseCreateError("Cannot create directory '" +
141 					      db_dir + "'", errno);
142 	}
143 	get_database_write_lock(flags, true);
144 
145 	create_and_open_tables(block_size);
146 	return;
147     }
148 
149     if (action == Xapian::DB_CREATE) {
150 	throw Xapian::DatabaseCreateError("Can't create new database at '" +
151 					  db_dir + "': a database already exists and I was told "
152 					  "not to overwrite it");
153     }
154 
155     get_database_write_lock(flags, false);
156     // if we're overwriting, pretend the db doesn't exist
157     if (action == Xapian::DB_CREATE_OR_OVERWRITE) {
158 	create_and_open_tables(block_size);
159 	return;
160     }
161 
162     // Get latest consistent version
163     open_tables_consistent();
164 
165     // Check that there are no more recent versions of tables.  If there
166     // are, perform recovery by writing a new revision number to all
167     // tables.
168     if (record_table.get_open_revision_number() !=
169 	postlist_table.get_latest_revision_number()) {
170 	chert_revision_number_t new_revision = get_next_revision_number();
171 
172 	set_revision_number(new_revision);
173     }
174 }
175 
~ChertDatabase()176 ChertDatabase::~ChertDatabase()
177 {
178     LOGCALL_DTOR(DB, "ChertDatabase");
179 }
180 
181 bool
database_exists()182 ChertDatabase::database_exists() {
183     LOGCALL(DB, bool, "ChertDatabase::database_exists", NO_ARGS);
184     RETURN(record_table.exists() && postlist_table.exists());
185 }
186 
187 void
create_and_open_tables(unsigned int block_size)188 ChertDatabase::create_and_open_tables(unsigned int block_size)
189 {
190     LOGCALL_VOID(DB, "ChertDatabase::create_and_open_tables", NO_ARGS);
191     // The caller is expected to create the database directory if it doesn't
192     // already exist.
193 
194     // Create postlist_table first, and record_table last.  Existence of
195     // record_table is considered to imply existence of the database.
196     version_file.create();
197     postlist_table.create_and_open(block_size);
198     position_table.create_and_open(block_size);
199     termlist_table.create_and_open(block_size);
200     synonym_table.create_and_open(block_size);
201     spelling_table.create_and_open(block_size);
202     record_table.create_and_open(block_size);
203 
204     Assert(database_exists());
205 
206     // Check consistency
207     chert_revision_number_t revision = record_table.get_open_revision_number();
208     if (revision != postlist_table.get_open_revision_number()) {
209 	throw Xapian::DatabaseCreateError("Newly created tables are not in consistent state");
210     }
211 
212     stats.zero();
213 }
214 
215 bool
open_tables_consistent()216 ChertDatabase::open_tables_consistent()
217 {
218     LOGCALL(DB, bool, "ChertDatabase::open_tables_consistent", NO_ARGS);
219     // Open record_table first, since it's the last to be written to,
220     // and hence if a revision is available in it, it should be available
221     // in all the other tables (unless they've moved on already).
222     //
223     // If we find that a table can't open the desired revision, we
224     // go back and open record_table again, until record_table has
225     // the same revision as the last time we opened it.
226 
227     chert_revision_number_t cur_rev = record_table.get_open_revision_number();
228 
229     // Check the version file unless we're reopening.
230     if (cur_rev == 0) version_file.read_and_check();
231 
232     record_table.open();
233     chert_revision_number_t revision = record_table.get_open_revision_number();
234 
235     if (cur_rev && cur_rev == revision) {
236 	// We're reopening a database and the revision hasn't changed so we
237 	// don't need to do anything.
238 	RETURN(false);
239     }
240 
241     // Set the block_size for optional tables as they may not currently exist.
242     unsigned int block_size = record_table.get_block_size();
243     position_table.set_block_size(block_size);
244     termlist_table.set_block_size(block_size);
245     synonym_table.set_block_size(block_size);
246     spelling_table.set_block_size(block_size);
247 
248     value_manager.reset();
249 
250     bool fully_opened = false;
251     int tries_left = MAX_OPEN_RETRIES;
252     while (!fully_opened && (tries_left--) > 0) {
253 	if (spelling_table.open(revision) &&
254 	    synonym_table.open(revision) &&
255 	    termlist_table.open(revision) &&
256 	    position_table.open(revision) &&
257 	    postlist_table.open(revision)) {
258 	    // Everything now open at the same revision.
259 	    fully_opened = true;
260 	} else {
261 	    // Couldn't open consistent revision: two cases possible:
262 	    // i)   An update has completed and a second one has begun since
263 	    //      record was opened.  This leaves a consistent revision
264 	    //      available, but not the one we were trying to open.
265 	    // ii)  Tables have become corrupt / have no consistent revision
266 	    //      available.  In this case, updates must have ceased.
267 	    //
268 	    // So, we reopen the record table, and check its revision number,
269 	    // if it's changed we try the opening again, otherwise we give up.
270 	    //
271 	    record_table.open();
272 	    chert_revision_number_t newrevision =
273 		    record_table.get_open_revision_number();
274 	    if (revision == newrevision) {
275 		// Revision number hasn't changed - therefore a second index
276 		// sweep hasn't begun and the system must have failed.  Database
277 		// is inconsistent.
278 		throw Xapian::DatabaseCorruptError("Cannot open tables at consistent revisions");
279 	    }
280 	    revision = newrevision;
281 	}
282     }
283 
284     if (!fully_opened) {
285 	throw Xapian::DatabaseModifiedError("Cannot open tables at stable revision - changing too fast");
286     }
287 
288     stats.read(postlist_table);
289     return true;
290 }
291 
292 void
open_tables(chert_revision_number_t revision)293 ChertDatabase::open_tables(chert_revision_number_t revision)
294 {
295     LOGCALL_VOID(DB, "ChertDatabase::open_tables", revision);
296     version_file.read_and_check();
297     record_table.open(revision);
298 
299     // Set the block_size for optional tables as they may not currently exist.
300     unsigned int block_size = record_table.get_block_size();
301     position_table.set_block_size(block_size);
302     termlist_table.set_block_size(block_size);
303     synonym_table.set_block_size(block_size);
304     spelling_table.set_block_size(block_size);
305 
306     value_manager.reset();
307 
308     spelling_table.open(revision);
309     synonym_table.open(revision);
310     termlist_table.open(revision);
311     position_table.open(revision);
312     postlist_table.open(revision);
313 }
314 
315 chert_revision_number_t
get_revision_number() const316 ChertDatabase::get_revision_number() const
317 {
318     LOGCALL(DB, chert_revision_number_t, "ChertDatabase::get_revision_number", NO_ARGS);
319     // We could use any table here, theoretically.
320     RETURN(postlist_table.get_open_revision_number());
321 }
322 
323 chert_revision_number_t
get_next_revision_number() const324 ChertDatabase::get_next_revision_number() const
325 {
326     LOGCALL(DB, chert_revision_number_t, "ChertDatabase::get_next_revision_number", NO_ARGS);
327     /* We _must_ use postlist_table here, since it is always the first
328      * to be written, and hence will have the greatest available revision
329      * number.
330      */
331     chert_revision_number_t new_revision =
332 	    postlist_table.get_latest_revision_number();
333     ++new_revision;
334     RETURN(new_revision);
335 }
336 
337 void
get_changeset_revisions(const string & path,chert_revision_number_t * startrev,chert_revision_number_t * endrev) const338 ChertDatabase::get_changeset_revisions(const string & path,
339 				       chert_revision_number_t * startrev,
340 				       chert_revision_number_t * endrev) const
341 {
342     FD changes_fd(posixy_open(path.c_str(), O_RDONLY | O_CLOEXEC));
343     if (changes_fd < 0) {
344 	string message = string("Couldn't open changeset ")
345 		+ path + " to read";
346 	throw Xapian::DatabaseError(message, errno);
347     }
348 
349     char buf[REASONABLE_CHANGESET_SIZE];
350     const char *start = buf;
351     const char *end = buf + io_read(changes_fd, buf, REASONABLE_CHANGESET_SIZE);
352     if (size_t(end - start) < CONST_STRLEN(CHANGES_MAGIC_STRING))
353 	throw Xapian::DatabaseError("Changeset too short at " + path);
354     if (memcmp(start, CHANGES_MAGIC_STRING,
355 	       CONST_STRLEN(CHANGES_MAGIC_STRING)) != 0) {
356 	string message = string("Changeset at ")
357 		+ path + " does not contain valid magic string";
358 	throw Xapian::DatabaseError(message);
359     }
360     start += CONST_STRLEN(CHANGES_MAGIC_STRING);
361 
362     unsigned int changes_version;
363     if (!unpack_uint(&start, end, &changes_version))
364 	throw Xapian::DatabaseError("Couldn't read a valid version number for "
365 				    "changeset at " + path);
366     if (changes_version != CHANGES_VERSION)
367 	throw Xapian::DatabaseError("Don't support version of changeset at "
368 				    + path);
369 
370     if (!unpack_uint(&start, end, startrev))
371 	throw Xapian::DatabaseError("Couldn't read a valid start revision from "
372 				    "changeset at " + path);
373 
374     if (!unpack_uint(&start, end, endrev))
375 	throw Xapian::DatabaseError("Couldn't read a valid end revision for "
376 				    "changeset at " + path);
377 }
378 
379 void
set_revision_number(chert_revision_number_t new_revision)380 ChertDatabase::set_revision_number(chert_revision_number_t new_revision)
381 {
382     LOGCALL_VOID(DB, "ChertDatabase::set_revision_number", new_revision);
383 
384     value_manager.merge_changes();
385 
386     postlist_table.flush_db();
387     position_table.flush_db();
388     termlist_table.flush_db();
389     synonym_table.flush_db();
390     spelling_table.flush_db();
391     record_table.flush_db();
392 
393     int changes_fd = -1;
394     string changes_name;
395 
396     const char *p = getenv("XAPIAN_MAX_CHANGESETS");
397     if (p) {
398 	max_changesets = atoi(p);
399     } else {
400 	max_changesets = 0;
401     }
402 
403     if (max_changesets > 0) {
404 	chert_revision_number_t old_revision = get_revision_number();
405 	if (old_revision) {
406 	    // Don't generate a changeset for the first revision.
407 	    changes_fd = create_changeset_file(db_dir,
408 					       "/changes" + str(old_revision),
409 					       changes_name);
410 	}
411     }
412 
413     try {
414 	FD closefd(changes_fd);
415 	if (changes_fd >= 0) {
416 	    string buf;
417 	    chert_revision_number_t old_revision = get_revision_number();
418 	    buf += CHANGES_MAGIC_STRING;
419 	    pack_uint(buf, CHANGES_VERSION);
420 	    pack_uint(buf, old_revision);
421 	    pack_uint(buf, new_revision);
422 
423 #ifndef DANGEROUS
424 	    buf += '\x00'; // Changes can be applied to a live database.
425 #else
426 	    buf += '\x01';
427 #endif
428 
429 	    io_write(changes_fd, buf.data(), buf.size());
430 
431 	    // Write the changes to the blocks in the tables.  Do the postlist
432 	    // table last, so that ends up cached the most, if the cache
433 	    // available is limited.  Do the position table just before that
434 	    // as having that cached will also improve search performance.
435 	    termlist_table.write_changed_blocks(changes_fd);
436 	    synonym_table.write_changed_blocks(changes_fd);
437 	    spelling_table.write_changed_blocks(changes_fd);
438 	    record_table.write_changed_blocks(changes_fd);
439 	    position_table.write_changed_blocks(changes_fd);
440 	    postlist_table.write_changed_blocks(changes_fd);
441 	}
442 
443 	postlist_table.commit(new_revision, changes_fd);
444 	position_table.commit(new_revision, changes_fd);
445 	termlist_table.commit(new_revision, changes_fd);
446 	synonym_table.commit(new_revision, changes_fd);
447 	spelling_table.commit(new_revision, changes_fd);
448 
449 	string changes_tail; // Data to be appended to the changes file
450 	if (changes_fd >= 0) {
451 	    changes_tail += '\0';
452 	    pack_uint(changes_tail, new_revision);
453 	}
454 	record_table.commit(new_revision, changes_fd, &changes_tail);
455     } catch (...) {
456 	// Remove the changeset, if there was one.
457 	if (changes_fd >= 0) {
458 	    (void)io_unlink(changes_name);
459 	}
460 
461 	throw;
462     }
463 
464     if (changes_fd >= 0 && max_changesets < new_revision) {
465 	// While change sets less than N - max_changesets exist, delete them
466 	// 1 must be subtracted so we don't delete the changeset we just wrote
467 	// when max_changesets = 1
468 	unsigned rev = new_revision - max_changesets - 1;
469 	while (io_unlink(db_dir + "/changes" + str(rev--))) { }
470     }
471 }
472 
473 void
request_document(Xapian::docid did) const474 ChertDatabase::request_document(Xapian::docid did) const
475 {
476     record_table.readahead_for_record(did);
477 }
478 
479 void
readahead_for_query(const Xapian::Query & query)480 ChertDatabase::readahead_for_query(const Xapian::Query &query)
481 {
482     Xapian::TermIterator t;
483     for (t = query.get_unique_terms_begin(); t != Xapian::TermIterator(); ++t) {
484 	const string & term = *t;
485 	if (!postlist_table.readahead_key(ChertPostListTable::make_key(term)))
486 	    break;
487     }
488 }
489 
490 bool
reopen()491 ChertDatabase::reopen()
492 {
493     LOGCALL(DB, bool, "ChertDatabase::reopen", NO_ARGS);
494     if (!readonly) RETURN(false);
495     RETURN(open_tables_consistent());
496 }
497 
498 void
close()499 ChertDatabase::close()
500 {
501     LOGCALL_VOID(DB, "ChertDatabase::close", NO_ARGS);
502     postlist_table.close(true);
503     position_table.close(true);
504     termlist_table.close(true);
505     synonym_table.close(true);
506     spelling_table.close(true);
507     record_table.close(true);
508     lock.release();
509 }
510 
511 void
get_database_write_lock(int flags,bool creating)512 ChertDatabase::get_database_write_lock(int flags, bool creating)
513 {
514     LOGCALL_VOID(DB, "ChertDatabase::get_database_write_lock", flags|creating);
515     string explanation;
516     bool retry = flags & Xapian::DB_RETRY_LOCK;
517     FlintLock::reason why = lock.lock(true, retry, explanation);
518     if (why != FlintLock::SUCCESS) {
519 	if (why == FlintLock::UNKNOWN && !creating && !database_exists()) {
520 	    string msg("No chert database found at path '");
521 	    msg += db_dir;
522 	    msg += '\'';
523 	    throw Xapian::DatabaseNotFoundError(msg);
524 	}
525 	lock.throw_databaselockerror(why, db_dir, explanation);
526     }
527 }
528 
529 void
send_whole_database(RemoteConnection & conn,double end_time)530 ChertDatabase::send_whole_database(RemoteConnection & conn, double end_time)
531 {
532     LOGCALL_VOID(DB, "ChertDatabase::send_whole_database", conn | end_time);
533 #ifdef XAPIAN_HAS_REMOTE_BACKEND
534     // Send the current revision number in the header.
535     string buf;
536     string uuid = get_uuid();
537     buf += encode_length(uuid.size());
538     buf += uuid;
539     pack_uint(buf, get_revision_number());
540     conn.send_message(REPL_REPLY_DB_HEADER, buf, end_time);
541 
542     // Send all the tables.  The tables which we want to be cached best after
543     // the copy finished are sent last.
544     static const char filenames[] =
545 	"\x0b""termlist.DB""\x0e""termlist.baseA\x0e""termlist.baseB"
546 	"\x0a""synonym.DB""\x0d""synonym.baseA\x0d""synonym.baseB"
547 	"\x0b""spelling.DB""\x0e""spelling.baseA\x0e""spelling.baseB"
548 	"\x09""record.DB""\x0c""record.baseA\x0c""record.baseB"
549 	"\x0b""position.DB""\x0e""position.baseA\x0e""position.baseB"
550 	"\x0b""postlist.DB""\x0e""postlist.baseA\x0e""postlist.baseB"
551 	"\x08""iamchert";
552     string filepath = db_dir;
553     filepath += '/';
554     for (const char * p = filenames; *p; p += *p + 1) {
555 	string leaf(p + 1, size_t(static_cast<unsigned char>(*p)));
556 	filepath.replace(db_dir.size() + 1, string::npos, leaf);
557 	FD fd(posixy_open(filepath.c_str(), O_RDONLY | O_CLOEXEC));
558 	if (fd >= 0) {
559 	    conn.send_message(REPL_REPLY_DB_FILENAME, leaf, end_time);
560 	    conn.send_file(REPL_REPLY_DB_FILEDATA, fd, end_time);
561 	}
562     }
563 #else
564     (void)conn;
565     (void)end_time;
566 #endif
567 }
568 
569 void
write_changesets_to_fd(int fd,const string & revision,bool need_whole_db,ReplicationInfo * info)570 ChertDatabase::write_changesets_to_fd(int fd,
571 				      const string & revision,
572 				      bool need_whole_db,
573 				      ReplicationInfo * info)
574 {
575     LOGCALL_VOID(DB, "ChertDatabase::write_changesets_to_fd", fd | revision | need_whole_db | info);
576 #ifdef XAPIAN_HAS_REMOTE_BACKEND
577     int whole_db_copies_left = MAX_DB_COPIES_PER_CONVERSATION;
578     chert_revision_number_t start_rev_num = 0;
579     string start_uuid = get_uuid();
580 
581     chert_revision_number_t needed_rev_num = 0;
582 
583     const char * rev_ptr = revision.data();
584     const char * rev_end = rev_ptr + revision.size();
585     if (!unpack_uint(&rev_ptr, rev_end, &start_rev_num)) {
586 	need_whole_db = true;
587     }
588 
589     RemoteConnection conn(-1, fd, string());
590 
591     // While the starting revision number is less than the latest revision
592     // number, look for a changeset, and write it.
593     //
594     // FIXME - perhaps we should make hardlinks for all the changesets we're
595     // likely to need, first, and then start sending them, so that there's no
596     // risk of them disappearing while we're sending earlier ones.
597     while (true) {
598 	if (need_whole_db) {
599 	    // Decrease the counter of copies left to be sent, and fail
600 	    // if we've already copied the database enough.  This ensures that
601 	    // synchronisation attempts always terminate eventually.
602 	    if (whole_db_copies_left == 0) {
603 		conn.send_message(REPL_REPLY_FAIL,
604 				  "Database changing too fast",
605 				  0.0);
606 		return;
607 	    }
608 	    whole_db_copies_left--;
609 
610 	    // Send the whole database across.
611 	    start_rev_num = get_revision_number();
612 	    start_uuid = get_uuid();
613 
614 	    send_whole_database(conn, 0.0);
615 	    if (info != NULL)
616 		++(info->fullcopy_count);
617 
618 	    need_whole_db = false;
619 
620 	    reopen();
621 	    if (start_uuid == get_uuid()) {
622 		// Send the latest revision number after sending the tables.
623 		// The update must proceed to that revision number before the
624 		// copy is safe to make live.
625 
626 		string buf;
627 		needed_rev_num = get_revision_number();
628 		pack_uint(buf, needed_rev_num);
629 		conn.send_message(REPL_REPLY_DB_FOOTER, buf, 0.0);
630 		if (info != NULL && start_rev_num == needed_rev_num)
631 		    info->changed = true;
632 	    } else {
633 		// Database has been replaced since we did the copy.  Send a
634 		// higher revision number than the revision we've just copied,
635 		// so that the client doesn't make the copy we've just done
636 		// live, and then mark that we need to do a copy again.
637 		// The client will never actually get the required revision,
638 		// because the next message is going to be the start of a new
639 		// database transfer.
640 
641 		string buf;
642 		pack_uint(buf, start_rev_num + 1);
643 		conn.send_message(REPL_REPLY_DB_FOOTER, buf, 0.0);
644 		need_whole_db = true;
645 	    }
646 	} else {
647 	    // Check if we've sent all the updates.
648 	    if (start_rev_num >= get_revision_number()) {
649 		reopen();
650 		if (start_uuid != get_uuid()) {
651 		    need_whole_db = true;
652 		    continue;
653 		}
654 		if (start_rev_num >= get_revision_number()) {
655 		    break;
656 		}
657 	    }
658 
659 	    // Look for the changeset for revision start_rev_num.
660 	    string changes_name = db_dir + "/changes" + str(start_rev_num);
661 	    FD fd_changes(posixy_open(changes_name.c_str(), O_RDONLY | O_CLOEXEC));
662 	    if (fd_changes >= 0) {
663 		// Send it, and also update start_rev_num to the new value
664 		// specified in the changeset.
665 		chert_revision_number_t changeset_start_rev_num;
666 		chert_revision_number_t changeset_end_rev_num;
667 		get_changeset_revisions(changes_name,
668 					&changeset_start_rev_num,
669 					&changeset_end_rev_num);
670 		if (changeset_start_rev_num != start_rev_num) {
671 		    throw Xapian::DatabaseError("Changeset start revision does not match changeset filename");
672 		}
673 		if (changeset_start_rev_num >= changeset_end_rev_num) {
674 		    throw Xapian::DatabaseError("Changeset start revision is not less than end revision");
675 		}
676 
677 		conn.send_file(REPL_REPLY_CHANGESET, fd_changes, 0.0);
678 		start_rev_num = changeset_end_rev_num;
679 		if (info != NULL) {
680 		    ++(info->changeset_count);
681 		    if (start_rev_num >= needed_rev_num)
682 			info->changed = true;
683 		}
684 	    } else {
685 		// The changeset doesn't exist: leave the revision number as it
686 		// is, and mark for doing a full database copy.
687 		need_whole_db = true;
688 	    }
689 	}
690     }
691     conn.send_message(REPL_REPLY_END_OF_CHANGES, string(), 0.0);
692 #else
693     (void)fd;
694     (void)revision;
695     (void)need_whole_db;
696     (void)info;
697 #endif
698 }
699 
700 void
modifications_failed(chert_revision_number_t old_revision,chert_revision_number_t new_revision,const std::string & msg)701 ChertDatabase::modifications_failed(chert_revision_number_t old_revision,
702 				    chert_revision_number_t new_revision,
703 				    const std::string & msg)
704 {
705     // Modifications failed.  Wipe all the modifications from memory.
706     try {
707 	// Discard any buffered changes and reinitialised cached values
708 	// from the table.
709 	cancel();
710 
711 	// Reopen tables with old revision number.
712 	open_tables(old_revision);
713 
714 	// Increase revision numbers to new revision number plus one,
715 	// writing increased numbers to all tables.
716 	++new_revision;
717 	set_revision_number(new_revision);
718     } catch (const Xapian::Error &e) {
719 	// We can't get the database into a consistent state, so close
720 	// it to avoid the risk of database corruption.
721 	ChertDatabase::close();
722 	throw Xapian::DatabaseError("Modifications failed (" + msg +
723 				    "), and cannot set consistent table "
724 				    "revision numbers: " + e.get_msg());
725     }
726 }
727 
728 void
apply()729 ChertDatabase::apply()
730 {
731     LOGCALL_VOID(DB, "ChertDatabase::apply", NO_ARGS);
732     if (!postlist_table.is_modified() &&
733 	!position_table.is_modified() &&
734 	!termlist_table.is_modified() &&
735 	!value_manager.is_modified() &&
736 	!synonym_table.is_modified() &&
737 	!spelling_table.is_modified() &&
738 	!record_table.is_modified()) {
739 	return;
740     }
741 
742     chert_revision_number_t old_revision = get_revision_number();
743     chert_revision_number_t new_revision = get_next_revision_number();
744 
745     try {
746 	set_revision_number(new_revision);
747     } catch (const Xapian::Error &e) {
748 	modifications_failed(old_revision, new_revision, e.get_description());
749 	throw;
750     } catch (...) {
751 	modifications_failed(old_revision, new_revision, "Unknown error");
752 	throw;
753     }
754 }
755 
756 void
cancel()757 ChertDatabase::cancel()
758 {
759     LOGCALL_VOID(DB, "ChertDatabase::cancel", NO_ARGS);
760     postlist_table.cancel();
761     position_table.cancel();
762     termlist_table.cancel();
763     value_manager.cancel();
764     synonym_table.cancel();
765     spelling_table.cancel();
766     record_table.cancel();
767 }
768 
769 Xapian::doccount
get_doccount() const770 ChertDatabase::get_doccount() const
771 {
772     LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_doccount", NO_ARGS);
773     RETURN(record_table.get_doccount());
774 }
775 
776 Xapian::docid
get_lastdocid() const777 ChertDatabase::get_lastdocid() const
778 {
779     LOGCALL(DB, Xapian::docid, "ChertDatabase::get_lastdocid", NO_ARGS);
780     RETURN(stats.get_last_docid());
781 }
782 
783 Xapian::totallength
get_total_length() const784 ChertDatabase::get_total_length() const
785 {
786     LOGCALL(DB, Xapian::totallength, "ChertDatabase::get_total_length", NO_ARGS);
787     RETURN(stats.get_total_doclen());
788 }
789 
790 Xapian::termcount
get_doclength(Xapian::docid did) const791 ChertDatabase::get_doclength(Xapian::docid did) const
792 {
793     LOGCALL(DB, Xapian::termcount, "ChertDatabase::get_doclength", did);
794     Assert(did != 0);
795     intrusive_ptr<const ChertDatabase> ptrtothis(this);
796     RETURN(postlist_table.get_doclength(did, ptrtothis));
797 }
798 
799 Xapian::termcount
get_unique_terms(Xapian::docid did) const800 ChertDatabase::get_unique_terms(Xapian::docid did) const
801 {
802     LOGCALL(DB, Xapian::termcount, "ChertDatabase::get_unique_terms", did);
803     Assert(did != 0);
804     intrusive_ptr<const ChertDatabase> ptrtothis(this);
805     ChertTermList termlist(ptrtothis, did);
806     // Note that the "approximate" size should be exact in this case.
807     //
808     // get_unique_terms() really ought to only count terms with wdf > 0, but
809     // that's expensive to calculate on demand, so for now let's just ensure
810     // unique_terms <= doclen.
811     RETURN(min(termlist.get_approx_size(),
812 	       postlist_table.get_doclength(did, ptrtothis)));
813 }
814 
815 void
get_freqs(const string & term,Xapian::doccount * termfreq_ptr,Xapian::termcount * collfreq_ptr) const816 ChertDatabase::get_freqs(const string & term,
817 			 Xapian::doccount * termfreq_ptr,
818 			 Xapian::termcount * collfreq_ptr) const
819 {
820     LOGCALL_VOID(DB, "ChertDatabase::get_freqs", term | termfreq_ptr | collfreq_ptr);
821     Assert(!term.empty());
822     postlist_table.get_freqs(term, termfreq_ptr, collfreq_ptr);
823 }
824 
825 Xapian::doccount
get_value_freq(Xapian::valueno slot) const826 ChertDatabase::get_value_freq(Xapian::valueno slot) const
827 {
828     LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_value_freq", slot);
829     RETURN(value_manager.get_value_freq(slot));
830 }
831 
832 std::string
get_value_lower_bound(Xapian::valueno slot) const833 ChertDatabase::get_value_lower_bound(Xapian::valueno slot) const
834 {
835     LOGCALL(DB, std::string, "ChertDatabase::get_value_lower_bound", slot);
836     RETURN(value_manager.get_value_lower_bound(slot));
837 }
838 
839 std::string
get_value_upper_bound(Xapian::valueno slot) const840 ChertDatabase::get_value_upper_bound(Xapian::valueno slot) const
841 {
842     LOGCALL(DB, std::string, "ChertDatabase::get_value_upper_bound", slot);
843     RETURN(value_manager.get_value_upper_bound(slot));
844 }
845 
846 Xapian::termcount
get_doclength_lower_bound() const847 ChertDatabase::get_doclength_lower_bound() const
848 {
849     return stats.get_doclength_lower_bound();
850 }
851 
852 Xapian::termcount
get_doclength_upper_bound() const853 ChertDatabase::get_doclength_upper_bound() const
854 {
855     return stats.get_doclength_upper_bound();
856 }
857 
858 Xapian::termcount
get_wdf_upper_bound(const string & term) const859 ChertDatabase::get_wdf_upper_bound(const string & term) const
860 {
861     Xapian::termcount cf;
862     get_freqs(term, NULL, &cf);
863     return min(cf, stats.get_wdf_upper_bound());
864 }
865 
866 bool
term_exists(const string & term) const867 ChertDatabase::term_exists(const string & term) const
868 {
869     LOGCALL(DB, bool, "ChertDatabase::term_exists", term);
870     Assert(!term.empty());
871     RETURN(postlist_table.term_exists(term));
872 }
873 
874 bool
has_positions() const875 ChertDatabase::has_positions() const
876 {
877     return !position_table.empty();
878 }
879 
880 LeafPostList *
open_post_list(const string & term) const881 ChertDatabase::open_post_list(const string& term) const
882 {
883     LOGCALL(DB, LeafPostList *, "ChertDatabase::open_post_list", term);
884     intrusive_ptr<const ChertDatabase> ptrtothis(this);
885 
886     if (term.empty()) {
887 	Xapian::doccount doccount = get_doccount();
888 	if (stats.get_last_docid() == doccount) {
889 	    RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
890 	}
891 	RETURN(new ChertAllDocsPostList(ptrtothis, doccount));
892     }
893 
894     RETURN(new ChertPostList(ptrtothis, term, true));
895 }
896 
897 ValueList *
open_value_list(Xapian::valueno slot) const898 ChertDatabase::open_value_list(Xapian::valueno slot) const
899 {
900     LOGCALL(DB, ValueList *, "ChertDatabase::open_value_list", slot);
901     intrusive_ptr<const ChertDatabase> ptrtothis(this);
902     RETURN(new ChertValueList(slot, ptrtothis));
903 }
904 
905 TermList *
open_term_list(Xapian::docid did) const906 ChertDatabase::open_term_list(Xapian::docid did) const
907 {
908     LOGCALL(DB, TermList *, "ChertDatabase::open_term_list", did);
909     Assert(did != 0);
910     if (!termlist_table.is_open())
911 	throw_termlist_table_close_exception();
912     intrusive_ptr<const ChertDatabase> ptrtothis(this);
913     RETURN(new ChertTermList(ptrtothis, did));
914 }
915 
916 Xapian::Document::Internal *
open_document(Xapian::docid did,bool lazy) const917 ChertDatabase::open_document(Xapian::docid did, bool lazy) const
918 {
919     LOGCALL(DB, Xapian::Document::Internal *, "ChertDatabase::open_document", did | lazy);
920     Assert(did != 0);
921     if (!lazy) {
922 	// This will throw DocNotFoundError if the document doesn't exist.
923 	(void)get_doclength(did);
924     }
925 
926     intrusive_ptr<const Database::Internal> ptrtothis(this);
927     RETURN(new ChertDocument(ptrtothis, did, &value_manager, &record_table));
928 }
929 
930 PositionList *
open_position_list(Xapian::docid did,const string & term) const931 ChertDatabase::open_position_list(Xapian::docid did, const string & term) const
932 {
933     Assert(did != 0);
934 
935     AutoPtr<ChertPositionList> poslist(new ChertPositionList);
936     if (!poslist->read_data(&position_table, did, term)) {
937 	// As of 1.1.0, we don't check if the did and term exist - we just
938 	// return an empty positionlist.  If the user really needs to know,
939 	// they can check for themselves.
940     }
941 
942     return poslist.release();
943 }
944 
945 TermList *
open_allterms(const string & prefix) const946 ChertDatabase::open_allterms(const string & prefix) const
947 {
948     LOGCALL(DB, TermList *, "ChertDatabase::open_allterms", NO_ARGS);
949     RETURN(new ChertAllTermsList(intrusive_ptr<const ChertDatabase>(this),
950 				 prefix));
951 }
952 
953 TermList *
open_spelling_termlist(const string & word) const954 ChertDatabase::open_spelling_termlist(const string & word) const
955 {
956     return spelling_table.open_termlist(word);
957 }
958 
959 TermList *
open_spelling_wordlist() const960 ChertDatabase::open_spelling_wordlist() const
961 {
962     ChertCursor * cursor = spelling_table.cursor_get();
963     if (!cursor) return NULL;
964     return new ChertSpellingWordsList(intrusive_ptr<const ChertDatabase>(this),
965 				      cursor);
966 }
967 
968 Xapian::doccount
get_spelling_frequency(const string & word) const969 ChertDatabase::get_spelling_frequency(const string & word) const
970 {
971     return spelling_table.get_word_frequency(word);
972 }
973 
974 TermList *
open_synonym_termlist(const string & term) const975 ChertDatabase::open_synonym_termlist(const string & term) const
976 {
977     return synonym_table.open_termlist(term);
978 }
979 
980 TermList *
open_synonym_keylist(const string & prefix) const981 ChertDatabase::open_synonym_keylist(const string & prefix) const
982 {
983     ChertCursor * cursor = synonym_table.cursor_get();
984     if (!cursor) return NULL;
985     return new ChertSynonymTermList(intrusive_ptr<const ChertDatabase>(this),
986 				    cursor, prefix);
987 }
988 
989 string
get_metadata(const string & key) const990 ChertDatabase::get_metadata(const string & key) const
991 {
992     LOGCALL(DB, string, "ChertDatabase::get_metadata", key);
993     string btree_key("\x00\xc0", 2);
994     btree_key += key;
995     string tag;
996     (void)postlist_table.get_exact_entry(btree_key, tag);
997     RETURN(tag);
998 }
999 
1000 TermList *
open_metadata_keylist(const std::string & prefix) const1001 ChertDatabase::open_metadata_keylist(const std::string &prefix) const
1002 {
1003     LOGCALL(DB, TermList *, "ChertDatabase::open_metadata_keylist", NO_ARGS);
1004     ChertCursor * cursor = postlist_table.cursor_get();
1005     if (!cursor) RETURN(NULL);
1006     RETURN(new ChertMetadataTermList(intrusive_ptr<const ChertDatabase>(this),
1007 				     cursor, prefix));
1008 }
1009 
1010 string
get_revision_info() const1011 ChertDatabase::get_revision_info() const
1012 {
1013     LOGCALL(DB, string, "ChertDatabase::get_revision_info", NO_ARGS);
1014     string buf;
1015     pack_uint(buf, get_revision_number());
1016     RETURN(buf);
1017 }
1018 
1019 string
get_uuid() const1020 ChertDatabase::get_uuid() const
1021 {
1022     LOGCALL(DB, string, "ChertDatabase::get_uuid", NO_ARGS);
1023     RETURN(version_file.get_uuid_string());
1024 }
1025 
1026 void
throw_termlist_table_close_exception() const1027 ChertDatabase::throw_termlist_table_close_exception() const
1028 {
1029     // Either the database has been closed, or else there's no termlist table.
1030     // Check if the postlist table is open to determine which is the case.
1031     if (!postlist_table.is_open())
1032 	ChertTable::throw_database_closed();
1033     throw Xapian::FeatureUnavailableError("Database has no termlist");
1034 }
1035 
1036 void
get_used_docid_range(Xapian::docid & first,Xapian::docid & last) const1037 ChertDatabase::get_used_docid_range(Xapian::docid & first,
1038 				    Xapian::docid & last) const
1039 {
1040     last = stats.get_last_docid();
1041     if (last == record_table.get_doccount()) {
1042 	// Contiguous range starting at 1.
1043 	first = 1;
1044 	return;
1045     }
1046     postlist_table.get_used_docid_range(first, last);
1047 }
1048 
1049 bool
locked() const1050 ChertDatabase::locked() const
1051 {
1052     return lock.test();
1053 }
1054 
1055 bool
has_uncommitted_changes() const1056 ChertDatabase::has_uncommitted_changes() const
1057 {
1058     return false;
1059 }
1060 
1061 ///////////////////////////////////////////////////////////////////////////
1062 
ChertWritableDatabase(const string & dir,int action,int block_size)1063 ChertWritableDatabase::ChertWritableDatabase(const string &dir, int action,
1064 					       int block_size)
1065 	: ChertDatabase(dir, action, block_size),
1066 	  freq_deltas(),
1067 	  doclens(),
1068 	  mod_plists(),
1069 	  change_count(0),
1070 	  flush_threshold(0),
1071 	  modify_shortcut_document(NULL),
1072 	  modify_shortcut_docid(0)
1073 {
1074     LOGCALL_CTOR(DB, "ChertWritableDatabase", dir | action | block_size);
1075 
1076     const char *p = getenv("XAPIAN_FLUSH_THRESHOLD");
1077     if (p)
1078 	flush_threshold = atoi(p);
1079     if (flush_threshold == 0)
1080 	flush_threshold = 10000;
1081 }
1082 
~ChertWritableDatabase()1083 ChertWritableDatabase::~ChertWritableDatabase()
1084 {
1085     LOGCALL_DTOR(DB, "ChertWritableDatabase");
1086     dtor_called();
1087 }
1088 
1089 void
commit()1090 ChertWritableDatabase::commit()
1091 {
1092     if (transaction_active())
1093 	throw Xapian::InvalidOperationError("Can't commit during a transaction");
1094     if (change_count) flush_postlist_changes();
1095     apply();
1096 }
1097 
1098 void
check_flush_threshold()1099 ChertWritableDatabase::check_flush_threshold()
1100 {
1101     // FIXME: this should be done by checking memory usage, not the number of
1102     // changes.
1103     // We could also look at:
1104     // * mod_plists.size()
1105     // * doclens.size()
1106     // * freq_deltas.size()
1107     //
1108     // cout << "+++ mod_plists.size() " << mod_plists.size() <<
1109     //     ", doclens.size() " << doclens.size() <<
1110     //	   ", freq_deltas.size() " << freq_deltas.size() << endl;
1111     if (++change_count >= flush_threshold) {
1112 	flush_postlist_changes();
1113 	if (!transaction_active()) apply();
1114     }
1115 }
1116 
1117 void
flush_postlist_changes() const1118 ChertWritableDatabase::flush_postlist_changes() const
1119 {
1120     postlist_table.merge_changes(mod_plists, doclens, freq_deltas);
1121     stats.write(postlist_table);
1122 
1123     freq_deltas.clear();
1124     doclens.clear();
1125     mod_plists.clear();
1126     change_count = 0;
1127 }
1128 
1129 void
close()1130 ChertWritableDatabase::close()
1131 {
1132     LOGCALL_VOID(DB, "ChertWritableDatabase::close", NO_ARGS);
1133     if (!transaction_active()) {
1134 	commit();
1135 	// FIXME: if commit() throws, should we still close?
1136     }
1137     ChertDatabase::close();
1138 }
1139 
1140 void
apply()1141 ChertWritableDatabase::apply()
1142 {
1143     value_manager.set_value_stats(value_stats);
1144     ChertDatabase::apply();
1145 }
1146 
1147 void
add_freq_delta(const string & tname,Xapian::termcount_diff tf_delta,Xapian::termcount_diff cf_delta)1148 ChertWritableDatabase::add_freq_delta(const string & tname,
1149 				      Xapian::termcount_diff tf_delta,
1150 				      Xapian::termcount_diff cf_delta)
1151 {
1152     map<string, pair<termcount_diff, termcount_diff> >::iterator i;
1153     i = freq_deltas.find(tname);
1154     if (i == freq_deltas.end()) {
1155 	freq_deltas.insert(make_pair(tname, make_pair(tf_delta, cf_delta)));
1156     } else {
1157 	i->second.first += tf_delta;
1158 	i->second.second += cf_delta;
1159     }
1160 }
1161 
1162 void
insert_mod_plist(Xapian::docid did,const string & tname,Xapian::termcount wdf)1163 ChertWritableDatabase::insert_mod_plist(Xapian::docid did,
1164 					const string & tname,
1165 					Xapian::termcount wdf)
1166 {
1167     // Find or make the appropriate entry in mod_plists.
1168     map<string, map<docid, pair<char, termcount> > >::iterator j;
1169     j = mod_plists.find(tname);
1170     if (j == mod_plists.end()) {
1171 	map<docid, pair<char, termcount> > m;
1172 	j = mod_plists.insert(make_pair(tname, m)).first;
1173     }
1174     j->second[did] = make_pair('A', wdf);
1175 }
1176 
1177 void
update_mod_plist(Xapian::docid did,const string & tname,char type,Xapian::termcount wdf)1178 ChertWritableDatabase::update_mod_plist(Xapian::docid did,
1179 					const string & tname,
1180 					char type,
1181 					Xapian::termcount wdf)
1182 {
1183     // Find or make the appropriate entry in mod_plists.
1184     map<string, map<docid, pair<char, termcount> > >::iterator j;
1185     j = mod_plists.find(tname);
1186     if (j == mod_plists.end()) {
1187 	map<docid, pair<char, termcount> > m;
1188 	j = mod_plists.insert(make_pair(tname, m)).first;
1189     }
1190 
1191     map<docid, pair<char, termcount> >::iterator k;
1192     k = j->second.find(did);
1193     if (k == j->second.end()) {
1194 	j->second.insert(make_pair(did, make_pair(type, wdf)));
1195     } else {
1196 	if (type == 'A') {
1197 	    // Adding an entry which has already been deleted.
1198 	    Assert(k->second.first == 'D');
1199 	    type = 'M';
1200 	}
1201 	k->second = make_pair(type, wdf);
1202     }
1203 }
1204 
1205 Xapian::docid
add_document(const Xapian::Document & document)1206 ChertWritableDatabase::add_document(const Xapian::Document & document)
1207 {
1208     LOGCALL(DB, Xapian::docid, "ChertWritableDatabase::add_document", document);
1209     // Make sure the docid counter doesn't overflow.
1210     if (stats.get_last_docid() == CHERT_MAX_DOCID)
1211 	throw Xapian::DatabaseError("Run out of docids - you'll have to use copydatabase to eliminate any gaps before you can add more documents");
1212     // Use the next unused document ID.
1213     RETURN(add_document_(stats.get_next_docid(), document));
1214 }
1215 
1216 Xapian::docid
add_document_(Xapian::docid did,const Xapian::Document & document)1217 ChertWritableDatabase::add_document_(Xapian::docid did,
1218 				     const Xapian::Document & document)
1219 {
1220     LOGCALL(DB, Xapian::docid, "ChertWritableDatabase::add_document_", did | document);
1221     Assert(did != 0);
1222     try {
1223 	// Add the record using that document ID.
1224 	record_table.replace_record(document.get_data(), did);
1225 
1226 	// Set the values.
1227 	value_manager.add_document(did, document, value_stats);
1228 
1229 	chert_doclen_t new_doclen = 0;
1230 	{
1231 	    Xapian::TermIterator term = document.termlist_begin();
1232 	    for ( ; term != document.termlist_end(); ++term) {
1233 		termcount wdf = term.get_wdf();
1234 		// Calculate the new document length
1235 		new_doclen += wdf;
1236 		stats.check_wdf(wdf);
1237 
1238 		string tname = *term;
1239 		if (tname.size() > MAX_SAFE_TERM_LENGTH)
1240 		    throw Xapian::InvalidArgumentError("Term too long (> " STRINGIZE(MAX_SAFE_TERM_LENGTH) "): " + tname);
1241 		add_freq_delta(tname, 1, wdf);
1242 		insert_mod_plist(did, tname, wdf);
1243 
1244 		PositionIterator pos = term.positionlist_begin();
1245 		if (pos != term.positionlist_end()) {
1246 		    position_table.set_positionlist(
1247 			did, tname,
1248 			pos, term.positionlist_end(), false);
1249 		}
1250 	    }
1251 	}
1252 	LOGLINE(DB, "Calculated doclen for new document " << did << " as " << new_doclen);
1253 
1254 	// Set the termlist.
1255 	if (termlist_table.is_open())
1256 	    termlist_table.set_termlist(did, document, new_doclen);
1257 
1258 	// Set the new document length
1259 	Assert(doclens.find(did) == doclens.end() || doclens[did] == static_cast<Xapian::termcount>(-1));
1260 	doclens[did] = new_doclen;
1261 	stats.add_document(new_doclen);
1262     } catch (...) {
1263 	// If an error occurs while adding a document, or doing any other
1264 	// transaction, the modifications so far must be cleared before
1265 	// returning control to the user - otherwise partial modifications will
1266 	// persist in memory, and eventually get written to disk.
1267 	cancel();
1268 	throw;
1269     }
1270 
1271     check_flush_threshold();
1272 
1273     RETURN(did);
1274 }
1275 
1276 void
delete_document(Xapian::docid did)1277 ChertWritableDatabase::delete_document(Xapian::docid did)
1278 {
1279     LOGCALL_VOID(DB, "ChertWritableDatabase::delete_document", did);
1280     Assert(did != 0);
1281 
1282     if (!termlist_table.is_open())
1283 	throw_termlist_table_close_exception();
1284 
1285     if (rare(modify_shortcut_docid == did)) {
1286 	// The modify_shortcut document can't be used for a modification
1287 	// shortcut now, because it's been deleted!
1288 	modify_shortcut_document = NULL;
1289 	modify_shortcut_docid = 0;
1290     }
1291 
1292     // Remove the record.  If this fails, just propagate the exception since
1293     // the state should still be consistent (most likely it's
1294     // DocNotFoundError).
1295     record_table.delete_record(did);
1296 
1297     try {
1298 	// Remove the values.
1299 	value_manager.delete_document(did, value_stats);
1300 
1301 	// OK, now add entries to remove the postings in the underlying record.
1302 	intrusive_ptr<const ChertWritableDatabase> ptrtothis(this);
1303 	ChertTermList termlist(ptrtothis, did);
1304 
1305 	stats.delete_document(termlist.get_doclength());
1306 
1307 	termlist.next();
1308 	while (!termlist.at_end()) {
1309 	    string tname = termlist.get_termname();
1310 	    position_table.delete_positionlist(did, tname);
1311 	    termcount wdf = termlist.get_wdf();
1312 
1313 	    add_freq_delta(tname, -1, -wdf);
1314 	    update_mod_plist(did, tname, 'D', 0u);
1315 
1316 	    termlist.next();
1317 	}
1318 
1319 	// Remove the termlist.
1320 	if (termlist_table.is_open())
1321 	    termlist_table.delete_termlist(did);
1322 
1323 	// Mark this document as removed.
1324 	doclens[did] = static_cast<Xapian::termcount>(-1);
1325     } catch (...) {
1326 	// If an error occurs while deleting a document, or doing any other
1327 	// transaction, the modifications so far must be cleared before
1328 	// returning control to the user - otherwise partial modifications will
1329 	// persist in memory, and eventually get written to disk.
1330 	cancel();
1331 	throw;
1332     }
1333 
1334     check_flush_threshold();
1335 }
1336 
/** Replace document @a did with @a document, adding it if it doesn't exist.
 *
 *  - If @a did is above the last docid, the last docid is raised and the
 *    document is simply added.
 *  - If there's no termlist table, only an unused docid can be written to;
 *    otherwise throw_termlist_table_close_exception() is called.
 *  - Otherwise the stored termlist is merged against the new document's
 *    terms, buffering only the differences.  If @a document is the object
 *    most recently returned by open_document() for this docid, parts of the
 *    document which are flagged as unmodified are skipped.
 *
 *  A Xapian::DocNotFoundError raised along the way results in the document
 *  being added instead; any other exception causes cancel() to be called
 *  before it is rethrown.
 *
 *  @param did	     Document id (asserted to be non-zero).
 *  @param document  The replacement document.
 */
void
ChertWritableDatabase::replace_document(Xapian::docid did,
					const Xapian::Document & document)
{
    LOGCALL_VOID(DB, "ChertWritableDatabase::replace_document", did | document);
    Assert(did != 0);

    try {
	if (did > stats.get_last_docid()) {
	    stats.set_last_docid(did);
	    // If this docid is above the highwatermark, then we can't be
	    // replacing an existing document.
	    (void)add_document_(did, document);
	    return;
	}

	if (!termlist_table.is_open()) {
	    // We can replace an *unused* docid <= last_docid too.
	    intrusive_ptr<const ChertDatabase> ptrtothis(this);
	    if (!postlist_table.document_exists(did, ptrtothis)) {
		(void)add_document_(did, document);
		return;
	    }
	    throw_termlist_table_close_exception();
	}

	// Check for a document read from this database being replaced - ie, a
	// modification operation.
	bool modifying = false;
	if (modify_shortcut_docid &&
	    document.internal->get_docid() == modify_shortcut_docid) {
	    if (document.internal.get() == modify_shortcut_document) {
		// We have a docid, it matches, and the pointer matches, so we
		// can skip modification of any data which hasn't been modified
		// in the document.
		if (!document.internal->modified()) {
		    // If the document is unchanged, we've nothing to do.
		    return;
		}
		modifying = true;
		LOGLINE(DB, "Detected potential document modification shortcut.");
	    } else {
		// The modify_shortcut document can't be used for a
		// modification shortcut now, because it's about to be
		// modified.
		modify_shortcut_document = NULL;
		modify_shortcut_docid = 0;
	    }
	}

	if (!modifying || document.internal->terms_modified()) {
	    bool pos_modified = !modifying ||
				document.internal->term_positions_modified();
	    intrusive_ptr<const ChertWritableDatabase> ptrtothis(this);
	    ChertTermList termlist(ptrtothis, did);
	    Xapian::TermIterator term = document.termlist_begin();
	    chert_doclen_t old_doclen = termlist.get_doclength();
	    // Take the old document out of the stats; the new one is put
	    // back in by stats.add_document() below.
	    stats.delete_document(old_doclen);
	    chert_doclen_t new_doclen = old_doclen;

	    string old_tname, new_tname;

	    // Walk the stored termlist and the new document's terms in
	    // parallel, comparing term names to classify each term as
	    // deleted, added, or present in both.
	    // NOTE(review): this assumes both iterate in the same order as
	    // string::compare() - confirm.
	    termlist.next();
	    while (!termlist.at_end() || term != document.termlist_end()) {
		// cmp < 0: only in the old termlist; cmp > 0: only in the new
		// document; cmp == 0: in both.
		int cmp;
		if (termlist.at_end()) {
		    cmp = 1;
		    new_tname = *term;
		} else {
		    old_tname = termlist.get_termname();
		    if (term != document.termlist_end()) {
			new_tname = *term;
			cmp = old_tname.compare(new_tname);
		    } else {
			cmp = -1;
		    }
		}

		if (cmp < 0) {
		    // Term old_tname has been deleted.
		    termcount old_wdf = termlist.get_wdf();
		    new_doclen -= old_wdf;
		    add_freq_delta(old_tname, -1, -old_wdf);
		    if (pos_modified)
			position_table.delete_positionlist(did, old_tname);
		    update_mod_plist(did, old_tname, 'D', 0u);
		    termlist.next();
		} else if (cmp > 0) {
		    // Term new_tname has been added.
		    termcount new_wdf = term.get_wdf();
		    new_doclen += new_wdf;
		    stats.check_wdf(new_wdf);
		    if (new_tname.size() > MAX_SAFE_TERM_LENGTH)
			throw Xapian::InvalidArgumentError("Term too long (> " STRINGIZE(MAX_SAFE_TERM_LENGTH) "): " + new_tname);
		    add_freq_delta(new_tname, 1, new_wdf);
		    update_mod_plist(did, new_tname, 'A', new_wdf);
		    if (pos_modified) {
			PositionIterator pos = term.positionlist_begin();
			if (pos != term.positionlist_end()) {
			    position_table.set_positionlist(
				did, new_tname,
				pos, term.positionlist_end(), false);
			}
		    }
		    ++term;
		} else if (cmp == 0) {
		    // Term already exists: look for wdf and positionlist changes.
		    termcount old_wdf = termlist.get_wdf();
		    termcount new_wdf = term.get_wdf();

		    // Check the stats even if wdf hasn't changed, because if
		    // this is the only document, the stats will have been
		    // zeroed.
		    stats.check_wdf(new_wdf);

		    if (old_wdf != new_wdf) {
			new_doclen += new_wdf - old_wdf;
			add_freq_delta(new_tname, 0, new_wdf - old_wdf);
			update_mod_plist(did, new_tname, 'M', new_wdf);
		    }

		    if (pos_modified) {
			PositionIterator pos = term.positionlist_begin();
			if (pos != term.positionlist_end()) {
			    position_table.set_positionlist(did, new_tname, pos,
							    term.positionlist_end(),
							    true);
			} else {
			    position_table.delete_positionlist(did, new_tname);
			}
		    }

		    ++term;
		    termlist.next();
		}
	    }
	    LOGLINE(DB, "Calculated doclen for replacement document " << did << " as " << new_doclen);

	    // Set the termlist.
	    if (termlist_table.is_open())
		termlist_table.set_termlist(did, document, new_doclen);

	    // Set the new document length
	    if (new_doclen != old_doclen)
		doclens[did] = new_doclen;
	    stats.add_document(new_doclen);
	}

	if (!modifying || document.internal->data_modified()) {
	    // Replace the record
	    record_table.replace_record(document.get_data(), did);
	}

	if (!modifying || document.internal->values_modified()) {
	    // Replace the values.
	    value_manager.replace_document(did, document, value_stats);
	}
    } catch (const Xapian::DocNotFoundError &) {
	// There's no existing document to replace, so add this one under the
	// requested docid instead.
	(void)add_document_(did, document);
	return;
    } catch (...) {
	// If an error occurs while replacing a document, or doing any other
	// transaction, the modifications so far must be cleared before
	// returning control to the user - otherwise partial modifications will
	// persist in memory, and eventually get written to disk.
	cancel();
	throw;
    }

    check_flush_threshold();
}
1508 
1509 Xapian::Document::Internal *
open_document(Xapian::docid did,bool lazy) const1510 ChertWritableDatabase::open_document(Xapian::docid did, bool lazy) const
1511 {
1512     LOGCALL(DB, Xapian::Document::Internal *, "ChertWritableDatabase::open_document", did | lazy);
1513     modify_shortcut_document = ChertDatabase::open_document(did, lazy);
1514     // Store the docid only after open_document() successfully returns, so an
1515     // attempt to open a missing document doesn't overwrite this.
1516     modify_shortcut_docid = did;
1517     RETURN(modify_shortcut_document);
1518 }
1519 
1520 Xapian::termcount
get_doclength(Xapian::docid did) const1521 ChertWritableDatabase::get_doclength(Xapian::docid did) const
1522 {
1523     LOGCALL(DB, Xapian::termcount, "ChertWritableDatabase::get_doclength", did);
1524     map<docid, termcount>::const_iterator i = doclens.find(did);
1525     if (i != doclens.end()) {
1526 	Xapian::termcount doclen = i->second;
1527 	if (doclen == static_cast<Xapian::termcount>(-1)) {
1528 	    throw Xapian::DocNotFoundError("Document " + str(did) + " not found");
1529 	}
1530 	RETURN(doclen);
1531     }
1532     RETURN(ChertDatabase::get_doclength(did));
1533 }
1534 
1535 Xapian::termcount
get_unique_terms(Xapian::docid did) const1536 ChertWritableDatabase::get_unique_terms(Xapian::docid did) const
1537 {
1538     LOGCALL(DB, Xapian::termcount, "ChertWritableDatabase::get_unique_terms", did);
1539     Assert(did != 0);
1540     // Note that the "approximate" size should be exact in this case.
1541     //
1542     // get_unique_terms() really ought to only count terms with wdf > 0, but
1543     // that's expensive to calculate on demand, so for now let's just ensure
1544     // unique_terms <= doclen.
1545     map<docid, termcount>::const_iterator i = doclens.find(did);
1546     if (i != doclens.end()) {
1547 	Xapian::termcount doclen = i->second;
1548 	if (doclen == static_cast<Xapian::termcount>(-1)) {
1549 	    throw Xapian::DocNotFoundError("Document " + str(did) + " not found");
1550 	}
1551 	intrusive_ptr<const ChertDatabase> ptrtothis(this);
1552 	ChertTermList termlist(ptrtothis, did);
1553 	RETURN(min(doclen, termlist.get_approx_size()));
1554     }
1555     RETURN(ChertDatabase::get_unique_terms(did));
1556 }
1557 
1558 void
get_freqs(const string & term,Xapian::doccount * termfreq_ptr,Xapian::termcount * collfreq_ptr) const1559 ChertWritableDatabase::get_freqs(const string & term,
1560 				 Xapian::doccount * termfreq_ptr,
1561 				 Xapian::termcount * collfreq_ptr) const
1562 {
1563     LOGCALL_VOID(DB, "ChertWritableDatabase::get_freqs", term | termfreq_ptr | collfreq_ptr);
1564     Assert(!term.empty());
1565     ChertDatabase::get_freqs(term, termfreq_ptr, collfreq_ptr);
1566     map<string, pair<termcount_diff, termcount_diff> >::const_iterator i;
1567     i = freq_deltas.find(term);
1568     if (i != freq_deltas.end()) {
1569 	if (termfreq_ptr)
1570 	    *termfreq_ptr += i->second.first;
1571 	if (collfreq_ptr)
1572 	    *collfreq_ptr += i->second.second;
1573     }
1574 }
1575 
1576 Xapian::doccount
get_value_freq(Xapian::valueno slot) const1577 ChertWritableDatabase::get_value_freq(Xapian::valueno slot) const
1578 {
1579     LOGCALL(DB, Xapian::doccount, "ChertWritableDatabase::get_value_freq", slot);
1580     map<Xapian::valueno, ValueStats>::const_iterator i;
1581     i = value_stats.find(slot);
1582     if (i != value_stats.end()) RETURN(i->second.freq);
1583     RETURN(ChertDatabase::get_value_freq(slot));
1584 }
1585 
1586 std::string
get_value_lower_bound(Xapian::valueno slot) const1587 ChertWritableDatabase::get_value_lower_bound(Xapian::valueno slot) const
1588 {
1589     LOGCALL(DB, std::string, "ChertWritableDatabase::get_value_lower_bound", slot);
1590     map<Xapian::valueno, ValueStats>::const_iterator i;
1591     i = value_stats.find(slot);
1592     if (i != value_stats.end()) RETURN(i->second.lower_bound);
1593     RETURN(ChertDatabase::get_value_lower_bound(slot));
1594 }
1595 
1596 std::string
get_value_upper_bound(Xapian::valueno slot) const1597 ChertWritableDatabase::get_value_upper_bound(Xapian::valueno slot) const
1598 {
1599     LOGCALL(DB, std::string, "ChertWritableDatabase::get_value_upper_bound", slot);
1600     map<Xapian::valueno, ValueStats>::const_iterator i;
1601     i = value_stats.find(slot);
1602     if (i != value_stats.end()) RETURN(i->second.upper_bound);
1603     RETURN(ChertDatabase::get_value_upper_bound(slot));
1604 }
1605 
1606 bool
term_exists(const string & tname) const1607 ChertWritableDatabase::term_exists(const string & tname) const
1608 {
1609     LOGCALL(DB, bool, "ChertWritableDatabase::term_exists", tname);
1610     Xapian::doccount tf;
1611     get_freqs(tname, &tf, NULL);
1612     RETURN(tf != 0);
1613 }
1614 
1615 LeafPostList *
open_post_list(const string & tname) const1616 ChertWritableDatabase::open_post_list(const string& tname) const
1617 {
1618     LOGCALL(DB, LeafPostList *, "ChertWritableDatabase::open_post_list", tname);
1619     intrusive_ptr<const ChertWritableDatabase> ptrtothis(this);
1620 
1621     if (tname.empty()) {
1622 	Xapian::doccount doccount = get_doccount();
1623 	if (stats.get_last_docid() == doccount) {
1624 	    RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
1625 	}
1626 	if (doclens.empty()) {
1627 	    RETURN(new ChertAllDocsPostList(ptrtothis, doccount));
1628 	}
1629 	RETURN(new ChertAllDocsModifiedPostList(ptrtothis, doccount, doclens));
1630     }
1631 
1632     map<string, map<docid, pair<char, termcount> > >::const_iterator j;
1633     j = mod_plists.find(tname);
1634     if (j != mod_plists.end()) {
1635 	// We've got buffered changes to this term's postlist, so we need to
1636 	// use a ChertModifiedPostList.
1637 	RETURN(new ChertModifiedPostList(ptrtothis, tname, j->second));
1638     }
1639 
1640     RETURN(new ChertPostList(ptrtothis, tname, true));
1641 }
1642 
1643 ValueList *
open_value_list(Xapian::valueno slot) const1644 ChertWritableDatabase::open_value_list(Xapian::valueno slot) const
1645 {
1646     LOGCALL(DB, ValueList *, "ChertWritableDatabase::open_value_list", slot);
1647     // If there are changes, we don't have code to iterate the modified value
1648     // list so we need to flush (but don't commit - there may be a transaction
1649     // in progress).
1650     if (change_count) value_manager.merge_changes();
1651     RETURN(ChertDatabase::open_value_list(slot));
1652 }
1653 
1654 TermList *
open_allterms(const string & prefix) const1655 ChertWritableDatabase::open_allterms(const string & prefix) const
1656 {
1657     LOGCALL(DB, TermList *, "ChertWritableDatabase::open_allterms", NO_ARGS);
1658     // If there are changes, terms may have been added or removed, and so we
1659     // need to flush (but don't commit - there may be a transaction in
1660     // progress).
1661     if (change_count) flush_postlist_changes();
1662     RETURN(ChertDatabase::open_allterms(prefix));
1663 }
1664 
1665 void
cancel()1666 ChertWritableDatabase::cancel()
1667 {
1668     ChertDatabase::cancel();
1669     stats.read(postlist_table);
1670     freq_deltas.clear();
1671     doclens.clear();
1672     mod_plists.clear();
1673     value_stats.clear();
1674     change_count = 0;
1675 }
1676 
1677 void
add_spelling(const string & word,Xapian::termcount freqinc) const1678 ChertWritableDatabase::add_spelling(const string & word,
1679 				    Xapian::termcount freqinc) const
1680 {
1681     spelling_table.add_word(word, freqinc);
1682 }
1683 
1684 void
remove_spelling(const string & word,Xapian::termcount freqdec) const1685 ChertWritableDatabase::remove_spelling(const string & word,
1686 				       Xapian::termcount freqdec) const
1687 {
1688     spelling_table.remove_word(word, freqdec);
1689 }
1690 
1691 TermList *
open_spelling_wordlist() const1692 ChertWritableDatabase::open_spelling_wordlist() const
1693 {
1694     spelling_table.merge_changes();
1695     return ChertDatabase::open_spelling_wordlist();
1696 }
1697 
1698 TermList *
open_synonym_keylist(const string & prefix) const1699 ChertWritableDatabase::open_synonym_keylist(const string & prefix) const
1700 {
1701     synonym_table.merge_changes();
1702     return ChertDatabase::open_synonym_keylist(prefix);
1703 }
1704 
1705 void
add_synonym(const string & term,const string & synonym) const1706 ChertWritableDatabase::add_synonym(const string & term,
1707 				   const string & synonym) const
1708 {
1709     synonym_table.add_synonym(term, synonym);
1710 }
1711 
1712 void
remove_synonym(const string & term,const string & synonym) const1713 ChertWritableDatabase::remove_synonym(const string & term,
1714 				      const string & synonym) const
1715 {
1716     synonym_table.remove_synonym(term, synonym);
1717 }
1718 
1719 void
clear_synonyms(const string & term) const1720 ChertWritableDatabase::clear_synonyms(const string & term) const
1721 {
1722     synonym_table.clear_synonyms(term);
1723 }
1724 
1725 void
set_metadata(const string & key,const string & value)1726 ChertWritableDatabase::set_metadata(const string & key, const string & value)
1727 {
1728     LOGCALL_VOID(DB, "ChertWritableDatabase::set_metadata", key | value);
1729     string btree_key("\x00\xc0", 2);
1730     btree_key += key;
1731     if (value.empty()) {
1732 	postlist_table.del(btree_key);
1733     } else {
1734 	postlist_table.add(btree_key, value);
1735     }
1736 }
1737 
1738 void
invalidate_doc_object(Xapian::Document::Internal * obj) const1739 ChertWritableDatabase::invalidate_doc_object(Xapian::Document::Internal * obj) const
1740 {
1741     if (obj == modify_shortcut_document) {
1742 	modify_shortcut_document = NULL;
1743 	modify_shortcut_docid = 0;
1744     }
1745 }
1746 
1747 bool
has_uncommitted_changes() const1748 ChertWritableDatabase::has_uncommitted_changes() const
1749 {
1750     return change_count > 0 ||
1751 	   postlist_table.is_modified() ||
1752 	   position_table.is_modified() ||
1753 	   termlist_table.is_modified() ||
1754 	   value_manager.is_modified() ||
1755 	   synonym_table.is_modified() ||
1756 	   spelling_table.is_modified() ||
1757 	   record_table.is_modified();
1758 }
1759