1 /* chert_database.cc: chert database
2  *
3  * Copyright 1999,2000,2001 BrightStation PLC
4  * Copyright 2001 Hein Ragas
5  * Copyright 2002 Ananova Ltd
6  * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2015 Olly Betts
7  * Copyright 2006,2008 Lemur Consulting Ltd
8  * Copyright 2009,2010 Richard Boulton
9  * Copyright 2009 Kan-Ru Chen
10  * Copyright 2011 Dan Colish
11  *
12  * This program is free software; you can redistribute it and/or
13  * modify it under the terms of the GNU General Public License as
14  * published by the Free Software Foundation; either version 2 of the
15  * License, or (at your option) any later version.
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  * GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License
23  * along with this program; if not, write to the Free Software
24  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
25  * USA
26  */
27 
28 #include <config.h>
29 
30 #include "chert_database.h"
31 
32 #include <xapian/error.h>
33 #include <xapian/valueiterator.h>
34 
35 #include "contiguousalldocspostlist.h"
36 #include "chert_alldocsmodifiedpostlist.h"
37 #include "chert_alldocspostlist.h"
38 #include "chert_alltermslist.h"
39 #include "chert_replicate_internal.h"
40 #include "chert_document.h"
41 #include "../flint_lock.h"
42 #include "chert_metadata.h"
43 #include "chert_modifiedpostlist.h"
44 #include "chert_positionlist.h"
45 #include "chert_postlist.h"
46 #include "chert_record.h"
47 #include "chert_spellingwordslist.h"
48 #include "chert_termlist.h"
49 #include "chert_valuelist.h"
50 #include "chert_values.h"
51 #include "debuglog.h"
52 #include "io_utils.h"
53 #include "net/length.h"
54 #include "pack.h"
55 #include "remoteconnection.h"
56 #include "replicate_utils.h"
57 #include "replication.h"
58 #include "replicationprotocol.h"
59 #include "serialise.h"
60 #include "str.h"
61 #include "stringutils.h"
62 #include "utils.h"
63 #include "valuestats.h"
64 
65 #ifdef __WIN32__
66 # include "msvc_posix_wrapper.h"
67 #endif
68 
69 #include "safeerrno.h"
70 #include "safesysstat.h"
71 #include <sys/types.h>
72 
73 #include <algorithm>
74 #include "autoptr.h"
75 #include <cstdlib>
76 #include <string>
77 
78 using namespace std;
79 using namespace Xapian;
80 
// Maximum safe term length.
//
// The limit is imposed by the postlist table.  Its keys hold the term encoded
// by pack_string_preserving_sort(), which takes the length of the string plus
// one extra byte (assuming the string contains no zero bytes), followed by the
// docid encoded by pack_uint_preserving_sort(), which takes up to 5 bytes.
//
// The Btree manager's key length limit is 252 bytes, so the maximum safe term
// length is 252 - 1 - 5 = 246 bytes.  We use 245 rather than 246 for
// consistency with flint.
//
// Each zero byte in the term lowers the limit by one.
#define MAX_SAFE_TERM_LENGTH 245

/** Maximum number of attempts at opening the tables at a consistent
 *  revision.
 *
 *  This mostly exists to rule out any chance of an infinite loop - normally
 *  we'll get them on the first or second try.
 */
const int MAX_OPEN_RETRIES = 100;
102 
103 /* This finds the tables, opens them at consistent revisions, manages
104  * determining the current and next revision numbers, and stores handles
105  * to the tables.
106  */
ChertDatabase(const string & chert_dir,int action,unsigned int block_size)107 ChertDatabase::ChertDatabase(const string &chert_dir, int action,
108 			     unsigned int block_size)
109 	: db_dir(chert_dir),
110 	  readonly(action == XAPIAN_DB_READONLY),
111 	  version_file(db_dir),
112 	  postlist_table(db_dir, readonly),
113 	  position_table(db_dir, readonly),
114 	  termlist_table(db_dir, readonly),
115 	  value_manager(&postlist_table, &termlist_table),
116 	  synonym_table(db_dir, readonly),
117 	  spelling_table(db_dir, readonly),
118 	  record_table(db_dir, readonly),
119 	  lock(db_dir),
120 	  max_changesets(0)
121 {
122     LOGCALL_CTOR(DB, "ChertDatabase", chert_dir | action | block_size);
123 
124     if (action == XAPIAN_DB_READONLY) {
125 	open_tables_consistent();
126 	return;
127     }
128 
129     if (action != Xapian::DB_OPEN && !database_exists()) {
130 
131 	// Create the directory for the database, if it doesn't exist
132 	// already.
133 	bool fail = false;
134 	struct stat statbuf;
135 	if (stat(db_dir, &statbuf) == 0) {
136 	    if (!S_ISDIR(statbuf.st_mode)) fail = true;
137 	} else if (errno != ENOENT || mkdir(db_dir, 0755) == -1) {
138 	    fail = true;
139 	}
140 	if (fail) {
141 	    throw Xapian::DatabaseCreateError("Cannot create directory `" +
142 					      db_dir + "'", errno);
143 	}
144 	get_database_write_lock(true);
145 
146 	create_and_open_tables(block_size);
147 	return;
148     }
149 
150     if (action == Xapian::DB_CREATE) {
151 	throw Xapian::DatabaseCreateError("Can't create new database at `" +
152 					  db_dir + "': a database already exists and I was told "
153 					  "not to overwrite it");
154     }
155 
156     get_database_write_lock(false);
157     // if we're overwriting, pretend the db doesn't exist
158     if (action == Xapian::DB_CREATE_OR_OVERWRITE) {
159 	create_and_open_tables(block_size);
160 	return;
161     }
162 
163     // Get latest consistent version
164     open_tables_consistent();
165 
166     // Check that there are no more recent versions of tables.  If there
167     // are, perform recovery by writing a new revision number to all
168     // tables.
169     if (record_table.get_open_revision_number() !=
170 	postlist_table.get_latest_revision_number()) {
171 	chert_revision_number_t new_revision = get_next_revision_number();
172 
173 	set_revision_number(new_revision);
174     }
175 }
176 
~ChertDatabase()177 ChertDatabase::~ChertDatabase()
178 {
179     LOGCALL_DTOR(DB, "~ChertDatabase");
180 }
181 
182 bool
database_exists()183 ChertDatabase::database_exists() {
184     LOGCALL(DB, bool, "ChertDatabase::database_exists", NO_ARGS);
185     RETURN(record_table.exists() && postlist_table.exists());
186 }
187 
188 void
create_and_open_tables(unsigned int block_size)189 ChertDatabase::create_and_open_tables(unsigned int block_size)
190 {
191     LOGCALL_VOID(DB, "ChertDatabase::create_and_open_tables", NO_ARGS);
192     // The caller is expected to create the database directory if it doesn't
193     // already exist.
194 
195     // Create postlist_table first, and record_table last.  Existence of
196     // record_table is considered to imply existence of the database.
197     version_file.create();
198     postlist_table.create_and_open(block_size);
199     position_table.create_and_open(block_size);
200     termlist_table.create_and_open(block_size);
201     synonym_table.create_and_open(block_size);
202     spelling_table.create_and_open(block_size);
203     record_table.create_and_open(block_size);
204 
205     Assert(database_exists());
206 
207     // Check consistency
208     chert_revision_number_t revision = record_table.get_open_revision_number();
209     if (revision != postlist_table.get_open_revision_number()) {
210 	throw Xapian::DatabaseCreateError("Newly created tables are not in consistent state");
211     }
212 
213     stats.zero();
214 }
215 
216 void
open_tables_consistent()217 ChertDatabase::open_tables_consistent()
218 {
219     LOGCALL_VOID(DB, "ChertDatabase::open_tables_consistent", NO_ARGS);
220     // Open record_table first, since it's the last to be written to,
221     // and hence if a revision is available in it, it should be available
222     // in all the other tables (unless they've moved on already).
223     //
224     // If we find that a table can't open the desired revision, we
225     // go back and open record_table again, until record_table has
226     // the same revision as the last time we opened it.
227 
228     chert_revision_number_t cur_rev = record_table.get_open_revision_number();
229 
230     // Check the version file unless we're reopening.
231     if (cur_rev == 0) version_file.read_and_check();
232 
233     record_table.open();
234     chert_revision_number_t revision = record_table.get_open_revision_number();
235 
236     if (cur_rev && cur_rev == revision) {
237 	// We're reopening a database and the revision hasn't changed so we
238 	// don't need to do anything.
239 	return;
240     }
241 
242     // Set the block_size for optional tables as they may not currently exist.
243     unsigned int block_size = record_table.get_block_size();
244     position_table.set_block_size(block_size);
245     termlist_table.set_block_size(block_size);
246     synonym_table.set_block_size(block_size);
247     spelling_table.set_block_size(block_size);
248 
249     value_manager.reset();
250 
251     bool fully_opened = false;
252     int tries_left = MAX_OPEN_RETRIES;
253     while (!fully_opened && (tries_left--) > 0) {
254 	if (spelling_table.open(revision) &&
255 	    synonym_table.open(revision) &&
256 	    termlist_table.open(revision) &&
257 	    position_table.open(revision) &&
258 	    postlist_table.open(revision)) {
259 	    // Everything now open at the same revision.
260 	    fully_opened = true;
261 	} else {
262 	    // Couldn't open consistent revision: two cases possible:
263 	    // i)   An update has completed and a second one has begun since
264 	    //      record was opened.  This leaves a consistent revision
265 	    //      available, but not the one we were trying to open.
266 	    // ii)  Tables have become corrupt / have no consistent revision
267 	    //      available.  In this case, updates must have ceased.
268 	    //
269 	    // So, we reopen the record table, and check its revision number,
270 	    // if it's changed we try the opening again, otherwise we give up.
271 	    //
272 	    record_table.open();
273 	    chert_revision_number_t newrevision =
274 		    record_table.get_open_revision_number();
275 	    if (revision == newrevision) {
276 		// Revision number hasn't changed - therefore a second index
277 		// sweep hasn't begun and the system must have failed.  Database
278 		// is inconsistent.
279 		throw Xapian::DatabaseCorruptError("Cannot open tables at consistent revisions");
280 	    }
281 	    revision = newrevision;
282 	}
283     }
284 
285     if (!fully_opened) {
286 	throw Xapian::DatabaseModifiedError("Cannot open tables at stable revision - changing too fast");
287     }
288 
289     stats.read(postlist_table);
290 }
291 
292 void
open_tables(chert_revision_number_t revision)293 ChertDatabase::open_tables(chert_revision_number_t revision)
294 {
295     LOGCALL_VOID(DB, "ChertDatabase::open_tables", revision);
296     version_file.read_and_check();
297     record_table.open(revision);
298 
299     // Set the block_size for optional tables as they may not currently exist.
300     unsigned int block_size = record_table.get_block_size();
301     position_table.set_block_size(block_size);
302     termlist_table.set_block_size(block_size);
303     synonym_table.set_block_size(block_size);
304     spelling_table.set_block_size(block_size);
305 
306     value_manager.reset();
307 
308     spelling_table.open(revision);
309     synonym_table.open(revision);
310     termlist_table.open(revision);
311     position_table.open(revision);
312     postlist_table.open(revision);
313 }
314 
315 chert_revision_number_t
get_revision_number() const316 ChertDatabase::get_revision_number() const
317 {
318     LOGCALL(DB, chert_revision_number_t, "ChertDatabase::get_revision_number", NO_ARGS);
319     // We could use any table here, theoretically.
320     RETURN(postlist_table.get_open_revision_number());
321 }
322 
323 chert_revision_number_t
get_next_revision_number() const324 ChertDatabase::get_next_revision_number() const
325 {
326     LOGCALL(DB, chert_revision_number_t, "ChertDatabase::get_next_revision_number", NO_ARGS);
327     /* We _must_ use postlist_table here, since it is always the first
328      * to be written, and hence will have the greatest available revision
329      * number.
330      */
331     chert_revision_number_t new_revision =
332 	    postlist_table.get_latest_revision_number();
333     ++new_revision;
334     RETURN(new_revision);
335 }
336 
337 void
get_changeset_revisions(const string & path,chert_revision_number_t * startrev,chert_revision_number_t * endrev) const338 ChertDatabase::get_changeset_revisions(const string & path,
339 				       chert_revision_number_t * startrev,
340 				       chert_revision_number_t * endrev) const
341 {
342     int changes_fd = -1;
343 #ifdef __WIN32__
344     changes_fd = msvc_posix_open(path.c_str(), O_RDONLY | O_BINARY);
345 #else
346     changes_fd = open(path.c_str(), O_RDONLY | O_BINARY);
347 #endif
348     fdcloser closer(changes_fd);
349 
350     if (changes_fd < 0) {
351 	string message = string("Couldn't open changeset ")
352 		+ path + " to read";
353 	throw Xapian::DatabaseError(message, errno);
354     }
355 
356     char buf[REASONABLE_CHANGESET_SIZE];
357     const char *start = buf;
358     const char *end = buf + io_read(changes_fd, buf,
359 				    REASONABLE_CHANGESET_SIZE, 0);
360     if (size_t(end - start) < CONST_STRLEN(CHANGES_MAGIC_STRING))
361 	throw Xapian::DatabaseError("Changeset too short at " + path);
362     if (memcmp(start, CHANGES_MAGIC_STRING,
363 	       CONST_STRLEN(CHANGES_MAGIC_STRING)) != 0) {
364 	string message = string("Changeset at ")
365 		+ path + " does not contain valid magic string";
366 	throw Xapian::DatabaseError(message);
367     }
368     start += CONST_STRLEN(CHANGES_MAGIC_STRING);
369 
370     unsigned int changes_version;
371     if (!unpack_uint(&start, end, &changes_version))
372 	throw Xapian::DatabaseError("Couldn't read a valid version number for "
373 				    "changeset at " + path);
374     if (changes_version != CHANGES_VERSION)
375 	throw Xapian::DatabaseError("Don't support version of changeset at "
376 				    + path);
377 
378     if (!unpack_uint(&start, end, startrev))
379 	throw Xapian::DatabaseError("Couldn't read a valid start revision from "
380 				    "changeset at " + path);
381 
382     if (!unpack_uint(&start, end, endrev))
383 	throw Xapian::DatabaseError("Couldn't read a valid end revision for "
384 				    "changeset at " + path);
385 }
386 
387 void
set_revision_number(chert_revision_number_t new_revision)388 ChertDatabase::set_revision_number(chert_revision_number_t new_revision)
389 {
390     LOGCALL_VOID(DB, "ChertDatabase::set_revision_number", new_revision);
391 
392     value_manager.merge_changes();
393 
394     postlist_table.flush_db();
395     position_table.flush_db();
396     termlist_table.flush_db();
397     synonym_table.flush_db();
398     spelling_table.flush_db();
399     record_table.flush_db();
400 
401     int changes_fd = -1;
402     string changes_name;
403 
404     const char *p = getenv("XAPIAN_MAX_CHANGESETS");
405     if (p) {
406 	max_changesets = atoi(p);
407     } else {
408 	max_changesets = 0;
409     }
410 
411     if (max_changesets > 0) {
412 	chert_revision_number_t old_revision = get_revision_number();
413 	if (old_revision) {
414 	    // Don't generate a changeset for the first revision.
415 	    changes_fd = create_changeset_file(db_dir,
416 					       "/changes" + str(old_revision),
417 					       changes_name);
418 	}
419     }
420 
421     try {
422 	fdcloser closefd(changes_fd);
423 	if (changes_fd >= 0) {
424 	    string buf;
425 	    chert_revision_number_t old_revision = get_revision_number();
426 	    buf += CHANGES_MAGIC_STRING;
427 	    pack_uint(buf, CHANGES_VERSION);
428 	    pack_uint(buf, old_revision);
429 	    pack_uint(buf, new_revision);
430 
431 #ifndef DANGEROUS
432 	    buf += '\x00'; // Changes can be applied to a live database.
433 #else
434 	    buf += '\x01';
435 #endif
436 
437 	    io_write(changes_fd, buf.data(), buf.size());
438 
439 	    // Write the changes to the blocks in the tables.  Do the postlist
440 	    // table last, so that ends up cached the most, if the cache
441 	    // available is limited.  Do the position table just before that
442 	    // as having that cached will also improve search performance.
443 	    termlist_table.write_changed_blocks(changes_fd);
444 	    synonym_table.write_changed_blocks(changes_fd);
445 	    spelling_table.write_changed_blocks(changes_fd);
446 	    record_table.write_changed_blocks(changes_fd);
447 	    position_table.write_changed_blocks(changes_fd);
448 	    postlist_table.write_changed_blocks(changes_fd);
449 	}
450 
451 	postlist_table.commit(new_revision, changes_fd);
452 	position_table.commit(new_revision, changes_fd);
453 	termlist_table.commit(new_revision, changes_fd);
454 	synonym_table.commit(new_revision, changes_fd);
455 	spelling_table.commit(new_revision, changes_fd);
456 
457 	string changes_tail; // Data to be appended to the changes file
458 	if (changes_fd >= 0) {
459 	    changes_tail += '\0';
460 	    pack_uint(changes_tail, new_revision);
461 	}
462 	record_table.commit(new_revision, changes_fd, &changes_tail);
463 
464     } catch (...) {
465 	// Remove the changeset, if there was one.
466 	if (changes_fd >= 0) {
467 	    (void)io_unlink(changes_name);
468 	}
469 
470 	throw;
471     }
472 
473     if (changes_fd >= 0 && max_changesets < new_revision) {
474 	// While change sets less than N - max_changesets exist, delete them
475 	// 1 must be subtracted so we don't delete the changeset we just wrote
476 	// when max_changesets = 1
477 	unsigned rev = new_revision - max_changesets - 1;
478 	while (io_unlink(db_dir + "/changes" + str(rev--))) { }
479     }
480 }
481 
482 void
reopen()483 ChertDatabase::reopen()
484 {
485     LOGCALL_VOID(DB, "ChertDatabase::reopen", NO_ARGS);
486     if (readonly) open_tables_consistent();
487 }
488 
489 void
close()490 ChertDatabase::close()
491 {
492     LOGCALL_VOID(DB, "ChertDatabase::close", NO_ARGS);
493     postlist_table.close(true);
494     position_table.close(true);
495     termlist_table.close(true);
496     synonym_table.close(true);
497     spelling_table.close(true);
498     record_table.close(true);
499     lock.release();
500 }
501 
502 void
get_database_write_lock(bool creating)503 ChertDatabase::get_database_write_lock(bool creating)
504 {
505     LOGCALL_VOID(DB, "ChertDatabase::get_database_write_lock", creating);
506     string explanation;
507     FlintLock::reason why = lock.lock(true, explanation);
508     if (why != FlintLock::SUCCESS) {
509 	if (why == FlintLock::UNKNOWN && !creating && !database_exists()) {
510 	    string msg("No chert database found at path `");
511 	    msg += db_dir;
512 	    msg += '\'';
513 	    throw Xapian::DatabaseOpeningError(msg);
514 	}
515 	lock.throw_databaselockerror(why, db_dir, explanation);
516     }
517 }
518 
519 void
send_whole_database(RemoteConnection & conn,double end_time)520 ChertDatabase::send_whole_database(RemoteConnection & conn, double end_time)
521 {
522     LOGCALL_VOID(DB, "ChertDatabase::send_whole_database", conn | end_time);
523 
524     // Send the current revision number in the header.
525     string buf;
526     string uuid = get_uuid();
527     buf += encode_length(uuid.size());
528     buf += uuid;
529     pack_uint(buf, get_revision_number());
530     conn.send_message(REPL_REPLY_DB_HEADER, buf, end_time);
531 
532     // Send all the tables.  The tables which we want to be cached best after
533     // the copy finished are sent last.
534     static const char filenames[] =
535 	"\x0b""termlist.DB""\x0e""termlist.baseA\x0e""termlist.baseB"
536 	"\x0a""synonym.DB""\x0d""synonym.baseA\x0d""synonym.baseB"
537 	"\x0b""spelling.DB""\x0e""spelling.baseA\x0e""spelling.baseB"
538 	"\x09""record.DB""\x0c""record.baseA\x0c""record.baseB"
539 	"\x0b""position.DB""\x0e""position.baseA\x0e""position.baseB"
540 	"\x0b""postlist.DB""\x0e""postlist.baseA\x0e""postlist.baseB"
541 	"\x08""iamchert";
542     string filepath = db_dir;
543     filepath += '/';
544     for (const char * p = filenames; *p; p += *p + 1) {
545 	string leaf(p + 1, size_t(static_cast<unsigned char>(*p)));
546 	filepath.replace(db_dir.size() + 1, string::npos, leaf);
547 #ifdef __WIN32__
548 	int fd = msvc_posix_open(filepath.c_str(), O_RDONLY | O_BINARY);
549 #else
550 	int fd = open(filepath.c_str(), O_RDONLY | O_BINARY);
551 #endif
552 	if (fd >= 0) {
553 	    fdcloser closefd(fd);
554 	    conn.send_message(REPL_REPLY_DB_FILENAME, leaf, end_time);
555 	    conn.send_file(REPL_REPLY_DB_FILEDATA, fd, end_time);
556 	}
557     }
558 }
559 
560 void
write_changesets_to_fd(int fd,const string & revision,bool need_whole_db,ReplicationInfo * info)561 ChertDatabase::write_changesets_to_fd(int fd,
562 				      const string & revision,
563 				      bool need_whole_db,
564 				      ReplicationInfo * info)
565 {
566     LOGCALL_VOID(DB, "ChertDatabase::write_changesets_to_fd", fd | revision | need_whole_db | info);
567 
568     int whole_db_copies_left = MAX_DB_COPIES_PER_CONVERSATION;
569     chert_revision_number_t start_rev_num = 0;
570     string start_uuid = get_uuid();
571 
572     chert_revision_number_t needed_rev_num = 0;
573 
574     const char * rev_ptr = revision.data();
575     const char * rev_end = rev_ptr + revision.size();
576     if (!unpack_uint(&rev_ptr, rev_end, &start_rev_num)) {
577 	need_whole_db = true;
578     }
579 
580     RemoteConnection conn(-1, fd, string());
581 
582     // While the starting revision number is less than the latest revision
583     // number, look for a changeset, and write it.
584     //
585     // FIXME - perhaps we should make hardlinks for all the changesets we're
586     // likely to need, first, and then start sending them, so that there's no
587     // risk of them disappearing while we're sending earlier ones.
588     while (true) {
589 	if (need_whole_db) {
590 	    // Decrease the counter of copies left to be sent, and fail
591 	    // if we've already copied the database enough.  This ensures that
592 	    // synchronisation attempts always terminate eventually.
593 	    if (whole_db_copies_left == 0) {
594 		conn.send_message(REPL_REPLY_FAIL,
595 				  "Database changing too fast",
596 				  0.0);
597 		return;
598 	    }
599 	    whole_db_copies_left--;
600 
601 	    // Send the whole database across.
602 	    start_rev_num = get_revision_number();
603 	    start_uuid = get_uuid();
604 
605 	    send_whole_database(conn, 0.0);
606 	    if (info != NULL)
607 		++(info->fullcopy_count);
608 
609 	    need_whole_db = false;
610 
611 	    reopen();
612 	    if (start_uuid == get_uuid()) {
613 		// Send the latest revision number after sending the tables.
614 		// The update must proceed to that revision number before the
615 		// copy is safe to make live.
616 
617 		string buf;
618 		needed_rev_num = get_revision_number();
619 		pack_uint(buf, needed_rev_num);
620 		conn.send_message(REPL_REPLY_DB_FOOTER, buf, 0.0);
621 		if (info != NULL && start_rev_num == needed_rev_num)
622 		    info->changed = true;
623 	    } else {
624 		// Database has been replaced since we did the copy.  Send a
625 		// higher revision number than the revision we've just copied,
626 		// so that the client doesn't make the copy we've just done
627 		// live, and then mark that we need to do a copy again.
628 		// The client will never actually get the required revision,
629 		// because the next message is going to be the start of a new
630 		// database transfer.
631 
632 		string buf;
633 		pack_uint(buf, start_rev_num + 1);
634 		conn.send_message(REPL_REPLY_DB_FOOTER, buf, 0.0);
635 		need_whole_db = true;
636 	    }
637 	} else {
638 	    // Check if we've sent all the updates.
639 	    if (start_rev_num >= get_revision_number()) {
640 		reopen();
641 		if (start_uuid != get_uuid()) {
642 		    need_whole_db = true;
643 		    continue;
644 		}
645 		if (start_rev_num >= get_revision_number()) {
646 		    break;
647 		}
648 	    }
649 
650 	    // Look for the changeset for revision start_rev_num.
651 	    string changes_name = db_dir + "/changes" + str(start_rev_num);
652 #ifdef __WIN32__
653 	    int fd_changes = msvc_posix_open(changes_name.c_str(), O_RDONLY | O_BINARY);
654 #else
655 	    int fd_changes = open(changes_name.c_str(), O_RDONLY | O_BINARY);
656 #endif
657 	    if (fd_changes >= 0) {
658 		fdcloser closefd(fd_changes);
659 
660 		// Send it, and also update start_rev_num to the new value
661 		// specified in the changeset.
662 		chert_revision_number_t changeset_start_rev_num;
663 		chert_revision_number_t changeset_end_rev_num;
664 		get_changeset_revisions(changes_name,
665 					&changeset_start_rev_num,
666 					&changeset_end_rev_num);
667 		if (changeset_start_rev_num != start_rev_num) {
668 		    throw Xapian::DatabaseError("Changeset start revision does not match changeset filename");
669 		}
670 		if (changeset_start_rev_num >= changeset_end_rev_num) {
671 		    throw Xapian::DatabaseError("Changeset start revision is not less than end revision");
672 		}
673 
674 		conn.send_file(REPL_REPLY_CHANGESET, fd_changes, 0.0);
675 		start_rev_num = changeset_end_rev_num;
676 		if (info != NULL) {
677 		    ++(info->changeset_count);
678 		    if (start_rev_num >= needed_rev_num)
679 			info->changed = true;
680 		}
681 	    } else {
682 		// The changeset doesn't exist: leave the revision number as it
683 		// is, and mark for doing a full database copy.
684 		need_whole_db = true;
685 	    }
686 	}
687     }
688     conn.send_message(REPL_REPLY_END_OF_CHANGES, string(), 0.0);
689 }
690 
691 void
modifications_failed(chert_revision_number_t old_revision,chert_revision_number_t new_revision,const std::string & msg)692 ChertDatabase::modifications_failed(chert_revision_number_t old_revision,
693 				    chert_revision_number_t new_revision,
694 				    const std::string & msg)
695 {
696     // Modifications failed.  Wipe all the modifications from memory.
697     try {
698 	// Discard any buffered changes and reinitialised cached values
699 	// from the table.
700 	cancel();
701 
702 	// Reopen tables with old revision number.
703 	open_tables(old_revision);
704 
705 	// Increase revision numbers to new revision number plus one,
706 	// writing increased numbers to all tables.
707 	++new_revision;
708 	set_revision_number(new_revision);
709     } catch (const Xapian::Error &e) {
710 	// We can't get the database into a consistent state, so close
711 	// it to avoid the risk of database corruption.
712 	ChertDatabase::close();
713 	throw Xapian::DatabaseError("Modifications failed (" + msg +
714 				    "), and cannot set consistent table "
715 				    "revision numbers: " + e.get_msg());
716     }
717 }
718 
719 void
apply()720 ChertDatabase::apply()
721 {
722     LOGCALL_VOID(DB, "ChertDatabase::apply", NO_ARGS);
723     if (!postlist_table.is_modified() &&
724 	!position_table.is_modified() &&
725 	!termlist_table.is_modified() &&
726 	!value_manager.is_modified() &&
727 	!synonym_table.is_modified() &&
728 	!spelling_table.is_modified() &&
729 	!record_table.is_modified()) {
730 	return;
731     }
732 
733     chert_revision_number_t old_revision = get_revision_number();
734     chert_revision_number_t new_revision = get_next_revision_number();
735 
736     try {
737 	set_revision_number(new_revision);
738     } catch (const Xapian::Error &e) {
739 	modifications_failed(old_revision, new_revision, e.get_description());
740 	throw;
741     } catch (...) {
742 	modifications_failed(old_revision, new_revision, "Unknown error");
743 	throw;
744     }
745 }
746 
747 void
cancel()748 ChertDatabase::cancel()
749 {
750     LOGCALL_VOID(DB, "ChertDatabase::cancel", NO_ARGS);
751     postlist_table.cancel();
752     position_table.cancel();
753     termlist_table.cancel();
754     value_manager.cancel();
755     synonym_table.cancel();
756     spelling_table.cancel();
757     record_table.cancel();
758 }
759 
760 Xapian::doccount
get_doccount() const761 ChertDatabase::get_doccount() const
762 {
763     LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_doccount", NO_ARGS);
764     RETURN(record_table.get_doccount());
765 }
766 
767 Xapian::docid
get_lastdocid() const768 ChertDatabase::get_lastdocid() const
769 {
770     LOGCALL(DB, Xapian::docid, "ChertDatabase::get_lastdocid", NO_ARGS);
771     RETURN(stats.get_last_docid());
772 }
773 
774 totlen_t
get_total_length() const775 ChertDatabase::get_total_length() const
776 {
777     LOGCALL(DB, totlen_t, "ChertDatabase::get_total_length", NO_ARGS);
778     RETURN(stats.get_total_doclen());
779 }
780 
781 Xapian::termcount
get_doclength(Xapian::docid did) const782 ChertDatabase::get_doclength(Xapian::docid did) const
783 {
784     LOGCALL(DB, Xapian::termcount, "ChertDatabase::get_doclength", did);
785     Assert(did != 0);
786     Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
787     RETURN(postlist_table.get_doclength(did, ptrtothis));
788 }
789 
790 Xapian::doccount
get_termfreq(const string & term) const791 ChertDatabase::get_termfreq(const string & term) const
792 {
793     LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_termfreq", term);
794     Assert(!term.empty());
795     RETURN(postlist_table.get_termfreq(term));
796 }
797 
798 Xapian::termcount
get_collection_freq(const string & term) const799 ChertDatabase::get_collection_freq(const string & term) const
800 {
801     LOGCALL(DB, Xapian::termcount, "ChertDatabase::get_collection_freq", term);
802     Assert(!term.empty());
803     RETURN(postlist_table.get_collection_freq(term));
804 }
805 
806 Xapian::doccount
get_value_freq(Xapian::valueno slot) const807 ChertDatabase::get_value_freq(Xapian::valueno slot) const
808 {
809     LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_value_freq", slot);
810     RETURN(value_manager.get_value_freq(slot));
811 }
812 
813 std::string
get_value_lower_bound(Xapian::valueno slot) const814 ChertDatabase::get_value_lower_bound(Xapian::valueno slot) const
815 {
816     LOGCALL(DB, std::string, "ChertDatabase::get_value_lower_bound", slot);
817     RETURN(value_manager.get_value_lower_bound(slot));
818 }
819 
820 std::string
get_value_upper_bound(Xapian::valueno slot) const821 ChertDatabase::get_value_upper_bound(Xapian::valueno slot) const
822 {
823     LOGCALL(DB, std::string, "ChertDatabase::get_value_upper_bound", slot);
824     RETURN(value_manager.get_value_upper_bound(slot));
825 }
826 
827 Xapian::termcount
get_doclength_lower_bound() const828 ChertDatabase::get_doclength_lower_bound() const
829 {
830     return stats.get_doclength_lower_bound();
831 }
832 
833 Xapian::termcount
get_doclength_upper_bound() const834 ChertDatabase::get_doclength_upper_bound() const
835 {
836     return stats.get_doclength_upper_bound();
837 }
838 
839 Xapian::termcount
get_wdf_upper_bound(const string & term) const840 ChertDatabase::get_wdf_upper_bound(const string & term) const
841 {
842     return min(get_collection_freq(term), stats.get_wdf_upper_bound());
843 }
844 
845 bool
term_exists(const string & term) const846 ChertDatabase::term_exists(const string & term) const
847 {
848     LOGCALL(DB, bool, "ChertDatabase::term_exists", term);
849     Assert(!term.empty());
850     RETURN(postlist_table.term_exists(term));
851 }
852 
853 bool
has_positions() const854 ChertDatabase::has_positions() const
855 {
856     return !position_table.empty();
857 }
858 
859 LeafPostList *
open_post_list(const string & term) const860 ChertDatabase::open_post_list(const string& term) const
861 {
862     LOGCALL(DB, LeafPostList *, "ChertDatabase::open_post_list", term);
863     Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
864 
865     if (term.empty()) {
866 	Xapian::doccount doccount = get_doccount();
867 	if (stats.get_last_docid() == doccount) {
868 	    RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
869 	}
870 	RETURN(new ChertAllDocsPostList(ptrtothis, doccount));
871     }
872 
873     RETURN(new ChertPostList(ptrtothis, term, true));
874 }
875 
876 ValueList *
open_value_list(Xapian::valueno slot) const877 ChertDatabase::open_value_list(Xapian::valueno slot) const
878 {
879     LOGCALL(DB, ValueList *, "ChertDatabase::open_value_list", slot);
880     Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
881     RETURN(new ChertValueList(slot, ptrtothis));
882 }
883 
884 TermList *
open_term_list(Xapian::docid did) const885 ChertDatabase::open_term_list(Xapian::docid did) const
886 {
887     LOGCALL(DB, TermList *, "ChertDatabase::open_term_list", did);
888     Assert(did != 0);
889     if (!termlist_table.is_open())
890 	throw_termlist_table_close_exception();
891     Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
892     RETURN(new ChertTermList(ptrtothis, did));
893 }
894 
895 Xapian::Document::Internal *
open_document(Xapian::docid did,bool lazy) const896 ChertDatabase::open_document(Xapian::docid did, bool lazy) const
897 {
898     LOGCALL(DB, Xapian::Document::Internal *, "ChertDatabase::open_document", did | lazy);
899     Assert(did != 0);
900     if (!lazy) {
901 	// This will throw DocNotFoundError if the document doesn't exist.
902 	(void)get_doclength(did);
903     }
904 
905     Xapian::Internal::RefCntPtr<const Database::Internal> ptrtothis(this);
906     RETURN(new ChertDocument(ptrtothis, did, &value_manager, &record_table));
907 }
908 
909 PositionList *
open_position_list(Xapian::docid did,const string & term) const910 ChertDatabase::open_position_list(Xapian::docid did, const string & term) const
911 {
912     Assert(did != 0);
913 
914     AutoPtr<ChertPositionList> poslist(new ChertPositionList);
915     if (!poslist->read_data(&position_table, did, term)) {
916 	// As of 1.1.0, we don't check if the did and term exist - we just
917 	// return an empty positionlist.  If the user really needs to know,
918 	// they can check for themselves.
919     }
920 
921     return poslist.release();
922 }
923 
924 TermList *
open_allterms(const string & prefix) const925 ChertDatabase::open_allterms(const string & prefix) const
926 {
927     LOGCALL(DB, TermList *, "ChertDatabase::open_allterms", NO_ARGS);
928     RETURN(new ChertAllTermsList(Xapian::Internal::RefCntPtr<const ChertDatabase>(this),
929 				 prefix));
930 }
931 
932 TermList *
open_spelling_termlist(const string & word) const933 ChertDatabase::open_spelling_termlist(const string & word) const
934 {
935     return spelling_table.open_termlist(word);
936 }
937 
938 TermList *
open_spelling_wordlist() const939 ChertDatabase::open_spelling_wordlist() const
940 {
941     ChertCursor * cursor = spelling_table.cursor_get();
942     if (!cursor) return NULL;
943     return new ChertSpellingWordsList(Xapian::Internal::RefCntPtr<const ChertDatabase>(this),
944 				      cursor);
945 }
946 
947 Xapian::doccount
get_spelling_frequency(const string & word) const948 ChertDatabase::get_spelling_frequency(const string & word) const
949 {
950     return spelling_table.get_word_frequency(word);
951 }
952 
953 TermList *
open_synonym_termlist(const string & term) const954 ChertDatabase::open_synonym_termlist(const string & term) const
955 {
956     return synonym_table.open_termlist(term);
957 }
958 
959 TermList *
open_synonym_keylist(const string & prefix) const960 ChertDatabase::open_synonym_keylist(const string & prefix) const
961 {
962     ChertCursor * cursor = synonym_table.cursor_get();
963     if (!cursor) return NULL;
964     return new ChertSynonymTermList(Xapian::Internal::RefCntPtr<const ChertDatabase>(this),
965 				    cursor, prefix);
966 }
967 
968 string
get_metadata(const string & key) const969 ChertDatabase::get_metadata(const string & key) const
970 {
971     LOGCALL(DB, string, "ChertDatabase::get_metadata", key);
972     string btree_key("\x00\xc0", 2);
973     btree_key += key;
974     string tag;
975     (void)postlist_table.get_exact_entry(btree_key, tag);
976     RETURN(tag);
977 }
978 
979 TermList *
open_metadata_keylist(const std::string & prefix) const980 ChertDatabase::open_metadata_keylist(const std::string &prefix) const
981 {
982     LOGCALL(DB, TermList *, "ChertDatabase::open_metadata_keylist", NO_ARGS);
983     ChertCursor * cursor = postlist_table.cursor_get();
984     RETURN(new ChertMetadataTermList(Xapian::Internal::RefCntPtr<const ChertDatabase>(this),
985 				     cursor, prefix));
986 }
987 
988 string
get_revision_info() const989 ChertDatabase::get_revision_info() const
990 {
991     LOGCALL(DB, string, "ChertDatabase::get_revision_info", NO_ARGS);
992     string buf;
993     pack_uint(buf, get_revision_number());
994     RETURN(buf);
995 }
996 
997 string
get_uuid() const998 ChertDatabase::get_uuid() const
999 {
1000     LOGCALL(DB, string, "ChertDatabase::get_uuid", NO_ARGS);
1001     RETURN(version_file.get_uuid_string());
1002 }
1003 
1004 void
throw_termlist_table_close_exception() const1005 ChertDatabase::throw_termlist_table_close_exception() const
1006 {
1007     // Either the database has been closed, or else there's no termlist table.
1008     // Check if the postlist table is open to determine which is the case.
1009     if (!postlist_table.is_open())
1010 	ChertTable::throw_database_closed();
1011     throw Xapian::FeatureUnavailableError("Database has no termlist");
1012 }
1013 
1014 ///////////////////////////////////////////////////////////////////////////
1015 
ChertWritableDatabase(const string & dir,int action,int block_size)1016 ChertWritableDatabase::ChertWritableDatabase(const string &dir, int action,
1017 					       int block_size)
1018 	: ChertDatabase(dir, action, block_size),
1019 	  freq_deltas(),
1020 	  doclens(),
1021 	  mod_plists(),
1022 	  change_count(0),
1023 	  flush_threshold(0),
1024 	  modify_shortcut_document(NULL),
1025 	  modify_shortcut_docid(0)
1026 {
1027     LOGCALL_CTOR(DB, "ChertWritableDatabase", dir | action | block_size);
1028 
1029     const char *p = getenv("XAPIAN_FLUSH_THRESHOLD");
1030     if (p)
1031 	flush_threshold = atoi(p);
1032     if (flush_threshold == 0)
1033 	flush_threshold = 10000;
1034 }
1035 
~ChertWritableDatabase()1036 ChertWritableDatabase::~ChertWritableDatabase()
1037 {
1038     LOGCALL_DTOR(DB, "~ChertWritableDatabase");
1039     dtor_called();
1040 }
1041 
1042 void
commit()1043 ChertWritableDatabase::commit()
1044 {
1045     if (transaction_active())
1046 	throw Xapian::InvalidOperationError("Can't commit during a transaction");
1047     if (change_count) flush_postlist_changes();
1048     apply();
1049 }
1050 
1051 void
flush_postlist_changes() const1052 ChertWritableDatabase::flush_postlist_changes() const
1053 {
1054     postlist_table.merge_changes(mod_plists, doclens, freq_deltas);
1055     stats.write(postlist_table);
1056 
1057     freq_deltas.clear();
1058     doclens.clear();
1059     mod_plists.clear();
1060     change_count = 0;
1061 }
1062 
1063 void
close()1064 ChertWritableDatabase::close()
1065 {
1066     LOGCALL_VOID(DB, "ChertWritableDatabase::close", NO_ARGS);
1067     if (!transaction_active()) {
1068 	commit();
1069 	// FIXME: if commit() throws, should we still close?
1070     }
1071     ChertDatabase::close();
1072 }
1073 
1074 void
apply()1075 ChertWritableDatabase::apply()
1076 {
1077     value_manager.set_value_stats(value_stats);
1078     ChertDatabase::apply();
1079 }
1080 
1081 void
add_freq_delta(const string & tname,Xapian::termcount_diff tf_delta,Xapian::termcount_diff cf_delta)1082 ChertWritableDatabase::add_freq_delta(const string & tname,
1083 				      Xapian::termcount_diff tf_delta,
1084 				      Xapian::termcount_diff cf_delta)
1085 {
1086     map<string, pair<termcount_diff, termcount_diff> >::iterator i;
1087     i = freq_deltas.find(tname);
1088     if (i == freq_deltas.end()) {
1089 	freq_deltas.insert(make_pair(tname, make_pair(tf_delta, cf_delta)));
1090     } else {
1091 	i->second.first += tf_delta;
1092 	i->second.second += cf_delta;
1093     }
1094 }
1095 
1096 void
insert_mod_plist(Xapian::docid did,const string & tname,Xapian::termcount wdf)1097 ChertWritableDatabase::insert_mod_plist(Xapian::docid did,
1098 					const string & tname,
1099 					Xapian::termcount wdf)
1100 {
1101     // Find or make the appropriate entry in mod_plists.
1102     map<string, map<docid, pair<char, termcount> > >::iterator j;
1103     j = mod_plists.find(tname);
1104     if (j == mod_plists.end()) {
1105 	map<docid, pair<char, termcount> > m;
1106 	j = mod_plists.insert(make_pair(tname, m)).first;
1107     }
1108     j->second[did] = make_pair('A', wdf);
1109 }
1110 
1111 void
update_mod_plist(Xapian::docid did,const string & tname,char type,Xapian::termcount wdf)1112 ChertWritableDatabase::update_mod_plist(Xapian::docid did,
1113 					const string & tname,
1114 					char type,
1115 					Xapian::termcount wdf)
1116 {
1117     // Find or make the appropriate entry in mod_plists.
1118     map<string, map<docid, pair<char, termcount> > >::iterator j;
1119     j = mod_plists.find(tname);
1120     if (j == mod_plists.end()) {
1121 	map<docid, pair<char, termcount> > m;
1122 	j = mod_plists.insert(make_pair(tname, m)).first;
1123     }
1124 
1125     map<docid, pair<char, termcount> >::iterator k;
1126     k = j->second.find(did);
1127     if (k == j->second.end()) {
1128 	j->second.insert(make_pair(did, make_pair(type, wdf)));
1129     } else {
1130 	if (type == 'A') {
1131 	    // Adding an entry which has already been deleted.
1132 	    Assert(k->second.first == 'D');
1133 	    type = 'M';
1134 	}
1135 	k->second = make_pair(type, wdf);
1136     }
1137 }
1138 
1139 Xapian::docid
add_document(const Xapian::Document & document)1140 ChertWritableDatabase::add_document(const Xapian::Document & document)
1141 {
1142     LOGCALL(DB, Xapian::docid, "ChertWritableDatabase::add_document", document);
1143     // Make sure the docid counter doesn't overflow.
1144     if (stats.get_last_docid() == CHERT_MAX_DOCID)
1145 	throw Xapian::DatabaseError("Run out of docids - you'll have to use copydatabase to eliminate any gaps before you can add more documents");
1146     // Use the next unused document ID.
1147     RETURN(add_document_(stats.get_next_docid(), document));
1148 }
1149 
1150 Xapian::docid
add_document_(Xapian::docid did,const Xapian::Document & document)1151 ChertWritableDatabase::add_document_(Xapian::docid did,
1152 				     const Xapian::Document & document)
1153 {
1154     LOGCALL(DB, Xapian::docid, "ChertWritableDatabase::add_document_", did | document);
1155     Assert(did != 0);
1156     try {
1157 	// Add the record using that document ID.
1158 	record_table.replace_record(document.get_data(), did);
1159 
1160 	// Set the values.
1161 	value_manager.add_document(did, document, value_stats);
1162 
1163 	chert_doclen_t new_doclen = 0;
1164 	{
1165 	    Xapian::TermIterator term = document.termlist_begin();
1166 	    Xapian::TermIterator term_end = document.termlist_end();
1167 	    for ( ; term != term_end; ++term) {
1168 		termcount wdf = term.get_wdf();
1169 		// Calculate the new document length
1170 		new_doclen += wdf;
1171 		stats.check_wdf(wdf);
1172 
1173 		string tname = *term;
1174 		if (tname.size() > MAX_SAFE_TERM_LENGTH)
1175 		    throw Xapian::InvalidArgumentError("Term too long (> " STRINGIZE(MAX_SAFE_TERM_LENGTH) "): " + tname);
1176 		add_freq_delta(tname, 1, wdf);
1177 		insert_mod_plist(did, tname, wdf);
1178 
1179 		PositionIterator pos = term.positionlist_begin();
1180 		if (pos != term.positionlist_end()) {
1181 		    position_table.set_positionlist(
1182 			did, tname,
1183 			pos, term.positionlist_end(), false);
1184 		}
1185 	    }
1186 	}
1187 	LOGLINE(DB, "Calculated doclen for new document " << did << " as " << new_doclen);
1188 
1189 	// Set the termlist.
1190 	if (termlist_table.is_open())
1191 	    termlist_table.set_termlist(did, document, new_doclen);
1192 
1193 	// Set the new document length
1194 	Assert(doclens.find(did) == doclens.end() || doclens[did] == static_cast<Xapian::termcount>(-1));
1195 	doclens[did] = new_doclen;
1196 	stats.add_document(new_doclen);
1197     } catch (...) {
1198 	// If an error occurs while adding a document, or doing any other
1199 	// transaction, the modifications so far must be cleared before
1200 	// returning control to the user - otherwise partial modifications will
1201 	// persist in memory, and eventually get written to disk.
1202 	cancel();
1203 	throw;
1204     }
1205 
1206     // FIXME: this should be done by checking memory usage, not the number of
1207     // changes.
1208     // We could also look at:
1209     // * mod_plists.size()
1210     // * doclens.size()
1211     // * freq_deltas.size()
1212     //
1213     // cout << "+++ mod_plists.size() " << mod_plists.size() <<
1214     //     ", doclens.size() " << doclens.size() <<
1215     //	   ", freq_deltas.size() " << freq_deltas.size() << endl;
1216     if (++change_count >= flush_threshold) {
1217 	flush_postlist_changes();
1218 	if (!transaction_active()) apply();
1219     }
1220 
1221     RETURN(did);
1222 }
1223 
1224 void
delete_document(Xapian::docid did)1225 ChertWritableDatabase::delete_document(Xapian::docid did)
1226 {
1227     LOGCALL_VOID(DB, "ChertWritableDatabase::delete_document", did);
1228     Assert(did != 0);
1229 
1230     if (!termlist_table.is_open())
1231 	throw_termlist_table_close_exception();
1232 
1233     if (rare(modify_shortcut_docid == did)) {
1234 	// The modify_shortcut document can't be used for a modification
1235 	// shortcut now, because it's been deleted!
1236 	modify_shortcut_document = NULL;
1237 	modify_shortcut_docid = 0;
1238     }
1239 
1240     // Remove the record.  If this fails, just propagate the exception since
1241     // the state should still be consistent (most likely it's
1242     // DocNotFoundError).
1243     record_table.delete_record(did);
1244 
1245     try {
1246 	// Remove the values.
1247 	value_manager.delete_document(did, value_stats);
1248 
1249 	// OK, now add entries to remove the postings in the underlying record.
1250 	Xapian::Internal::RefCntPtr<const ChertWritableDatabase> ptrtothis(this);
1251 	ChertTermList termlist(ptrtothis, did);
1252 
1253 	stats.delete_document(termlist.get_doclength());
1254 
1255 	termlist.next();
1256 	while (!termlist.at_end()) {
1257 	    string tname = termlist.get_termname();
1258 	    position_table.delete_positionlist(did, tname);
1259 	    termcount wdf = termlist.get_wdf();
1260 
1261 	    add_freq_delta(tname, -1, -wdf);
1262 	    update_mod_plist(did, tname, 'D', 0u);
1263 
1264 	    termlist.next();
1265 	}
1266 
1267 	// Remove the termlist.
1268 	if (termlist_table.is_open())
1269 	    termlist_table.delete_termlist(did);
1270 
1271 	// Mark this document as removed.
1272 	doclens[did] = static_cast<Xapian::termcount>(-1);
1273     } catch (...) {
1274 	// If an error occurs while deleting a document, or doing any other
1275 	// transaction, the modifications so far must be cleared before
1276 	// returning control to the user - otherwise partial modifications will
1277 	// persist in memory, and eventually get written to disk.
1278 	cancel();
1279 	throw;
1280     }
1281 
1282     if (++change_count >= flush_threshold) {
1283 	flush_postlist_changes();
1284 	if (!transaction_active()) apply();
1285     }
1286 }
1287 
1288 void
replace_document(Xapian::docid did,const Xapian::Document & document)1289 ChertWritableDatabase::replace_document(Xapian::docid did,
1290 					const Xapian::Document & document)
1291 {
1292     LOGCALL_VOID(DB, "ChertWritableDatabase::replace_document", did | document);
1293     Assert(did != 0);
1294 
1295     try {
1296 	if (did > stats.get_last_docid()) {
1297 	    stats.set_last_docid(did);
1298 	    // If this docid is above the highwatermark, then we can't be
1299 	    // replacing an existing document.
1300 	    (void)add_document_(did, document);
1301 	    return;
1302 	}
1303 
1304 	if (!termlist_table.is_open()) {
1305 	    // We can replace an *unused* docid <= last_docid too.
1306 	    Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
1307 	    if (!postlist_table.document_exists(did, ptrtothis)) {
1308 		(void)add_document_(did, document);
1309 		return;
1310 	    }
1311 	    throw_termlist_table_close_exception();
1312 	}
1313 
1314 	// Check for a document read from this database being replaced - ie, a
1315 	// modification operation.
1316 	bool modifying = false;
1317 	if (modify_shortcut_docid &&
1318 	    document.internal->get_docid() == modify_shortcut_docid) {
1319 	    if (document.internal.get() == modify_shortcut_document) {
1320 		// We have a docid, it matches, and the pointer matches, so we
1321 		// can skip modification of any data which hasn't been modified
1322 		// in the document.
1323 		if (!document.internal->modified()) {
1324 		    // If the document is unchanged, we've nothing to do.
1325 		    return;
1326 		}
1327 		modifying = true;
1328 		LOGLINE(DB, "Detected potential document modification shortcut.");
1329 	    } else {
1330 		// The modify_shortcut document can't be used for a
1331 		// modification shortcut now, because it's about to be
1332 		// modified.
1333 		modify_shortcut_document = NULL;
1334 		modify_shortcut_docid = 0;
1335 	    }
1336 	}
1337 
1338 	if (!modifying || document.internal->terms_modified()) {
1339 	    bool pos_modified = !modifying ||
1340 				document.internal->term_positions_modified();
1341 	    Xapian::Internal::RefCntPtr<const ChertWritableDatabase> ptrtothis(this);
1342 	    ChertTermList termlist(ptrtothis, did);
1343 	    Xapian::TermIterator term = document.termlist_begin();
1344 	    chert_doclen_t old_doclen = termlist.get_doclength();
1345 	    stats.delete_document(old_doclen);
1346 	    chert_doclen_t new_doclen = old_doclen;
1347 
1348 	    string old_tname, new_tname;
1349 
1350 	    termlist.next();
1351 	    while (!termlist.at_end() || term != document.termlist_end()) {
1352 		int cmp;
1353 		if (termlist.at_end()) {
1354 		    cmp = 1;
1355 		    new_tname = *term;
1356 		} else {
1357 		    old_tname = termlist.get_termname();
1358 		    if (term != document.termlist_end()) {
1359 			new_tname = *term;
1360 			cmp = old_tname.compare(new_tname);
1361 		    } else {
1362 			cmp = -1;
1363 		    }
1364 		}
1365 
1366 		if (cmp < 0) {
1367 		    // Term old_tname has been deleted.
1368 		    termcount old_wdf = termlist.get_wdf();
1369 		    new_doclen -= old_wdf;
1370 		    add_freq_delta(old_tname, -1, -old_wdf);
1371 		    if (pos_modified)
1372 			position_table.delete_positionlist(did, old_tname);
1373 		    update_mod_plist(did, old_tname, 'D', 0u);
1374 		    termlist.next();
1375 		} else if (cmp > 0) {
1376 		    // Term new_tname as been added.
1377 		    termcount new_wdf = term.get_wdf();
1378 		    new_doclen += new_wdf;
1379 		    stats.check_wdf(new_wdf);
1380 		    if (new_tname.size() > MAX_SAFE_TERM_LENGTH)
1381 			throw Xapian::InvalidArgumentError("Term too long (> " STRINGIZE(MAX_SAFE_TERM_LENGTH) "): " + new_tname);
1382 		    add_freq_delta(new_tname, 1, new_wdf);
1383 		    update_mod_plist(did, new_tname, 'A', new_wdf);
1384 		    if (pos_modified) {
1385 			PositionIterator pos = term.positionlist_begin();
1386 			if (pos != term.positionlist_end()) {
1387 			    position_table.set_positionlist(
1388 				did, new_tname,
1389 				pos, term.positionlist_end(), false);
1390 			}
1391 		    }
1392 		    ++term;
1393 		} else if (cmp == 0) {
1394 		    // Term already exists: look for wdf and positionlist changes.
1395 		    termcount old_wdf = termlist.get_wdf();
1396 		    termcount new_wdf = term.get_wdf();
1397 
1398 		    // Check the stats even if wdf hasn't changed, because
1399 		    // this is the only document, the stats will have been
1400 		    // zeroed.
1401 		    stats.check_wdf(new_wdf);
1402 
1403 		    if (old_wdf != new_wdf) {
1404 		    	new_doclen += new_wdf - old_wdf;
1405 			add_freq_delta(new_tname, 0, new_wdf - old_wdf);
1406 			update_mod_plist(did, new_tname, 'M', new_wdf);
1407 		    }
1408 
1409 		    if (pos_modified) {
1410 			PositionIterator pos = term.positionlist_begin();
1411 			if (pos != term.positionlist_end()) {
1412 			    position_table.set_positionlist(did, new_tname, pos,
1413 							    term.positionlist_end(),
1414 							    true);
1415 			} else {
1416 			    position_table.delete_positionlist(did, new_tname);
1417 			}
1418 		    }
1419 
1420 		    ++term;
1421 		    termlist.next();
1422 		}
1423 	    }
1424 	    LOGLINE(DB, "Calculated doclen for replacement document " << did << " as " << new_doclen);
1425 
1426 	    // Set the termlist.
1427 	    if (termlist_table.is_open())
1428 		termlist_table.set_termlist(did, document, new_doclen);
1429 
1430 	    // Set the new document length
1431 	    if (new_doclen != old_doclen)
1432 		doclens[did] = new_doclen;
1433 	    stats.add_document(new_doclen);
1434 	}
1435 
1436 	if (!modifying || document.internal->data_modified()) {
1437 	    // Replace the record
1438 	    record_table.replace_record(document.get_data(), did);
1439 	}
1440 
1441 	if (!modifying || document.internal->values_modified()) {
1442 	    // Replace the values.
1443 	    value_manager.replace_document(did, document, value_stats);
1444 	}
1445     } catch (const Xapian::DocNotFoundError &) {
1446 	(void)add_document_(did, document);
1447 	return;
1448     } catch (...) {
1449 	// If an error occurs while replacing a document, or doing any other
1450 	// transaction, the modifications so far must be cleared before
1451 	// returning control to the user - otherwise partial modifications will
1452 	// persist in memory, and eventually get written to disk.
1453 	cancel();
1454 	throw;
1455     }
1456 
1457     if (++change_count >= flush_threshold) {
1458 	flush_postlist_changes();
1459 	if (!transaction_active()) apply();
1460     }
1461 }
1462 
1463 Xapian::Document::Internal *
open_document(Xapian::docid did,bool lazy) const1464 ChertWritableDatabase::open_document(Xapian::docid did, bool lazy) const
1465 {
1466     LOGCALL(DB, Xapian::Document::Internal *, "ChertWritableDatabase::open_document", did | lazy);
1467     modify_shortcut_document = ChertDatabase::open_document(did, lazy);
1468     // Store the docid only after open_document() successfully returns, so an
1469     // attempt to open a missing document doesn't overwrite this.
1470     modify_shortcut_docid = did;
1471     RETURN(modify_shortcut_document);
1472 }
1473 
1474 Xapian::termcount
get_doclength(Xapian::docid did) const1475 ChertWritableDatabase::get_doclength(Xapian::docid did) const
1476 {
1477     LOGCALL(DB, Xapian::termcount, "ChertWritableDatabase::get_doclength", did);
1478     map<docid, termcount>::const_iterator i = doclens.find(did);
1479     if (i != doclens.end()) {
1480 	Xapian::termcount doclen = i->second;
1481 	if (doclen == static_cast<Xapian::termcount>(-1)) {
1482 	    throw Xapian::DocNotFoundError("Document " + str(did) + " not found");
1483 	}
1484 	RETURN(doclen);
1485     }
1486     RETURN(ChertDatabase::get_doclength(did));
1487 }
1488 
1489 Xapian::doccount
get_termfreq(const string & tname) const1490 ChertWritableDatabase::get_termfreq(const string & tname) const
1491 {
1492     LOGCALL(DB, Xapian::doccount, "ChertWritableDatabase::get_termfreq", tname);
1493     Xapian::doccount termfreq = ChertDatabase::get_termfreq(tname);
1494     map<string, pair<termcount_diff, termcount_diff> >::const_iterator i;
1495     i = freq_deltas.find(tname);
1496     if (i != freq_deltas.end()) termfreq += i->second.first;
1497     RETURN(termfreq);
1498 }
1499 
1500 Xapian::termcount
get_collection_freq(const string & tname) const1501 ChertWritableDatabase::get_collection_freq(const string & tname) const
1502 {
1503     LOGCALL(DB, Xapian::termcount, "ChertWritableDatabase::get_collection_freq", tname);
1504     Xapian::termcount collfreq = ChertDatabase::get_collection_freq(tname);
1505 
1506     map<string, pair<termcount_diff, termcount_diff> >::const_iterator i;
1507     i = freq_deltas.find(tname);
1508     if (i != freq_deltas.end()) collfreq += i->second.second;
1509 
1510     RETURN(collfreq);
1511 }
1512 
1513 Xapian::doccount
get_value_freq(Xapian::valueno slot) const1514 ChertWritableDatabase::get_value_freq(Xapian::valueno slot) const
1515 {
1516     LOGCALL(DB, Xapian::doccount, "ChertWritableDatabase::get_value_freq", slot);
1517     map<Xapian::valueno, ValueStats>::const_iterator i;
1518     i = value_stats.find(slot);
1519     if (i != value_stats.end()) RETURN(i->second.freq);
1520     RETURN(ChertDatabase::get_value_freq(slot));
1521 }
1522 
1523 std::string
get_value_lower_bound(Xapian::valueno slot) const1524 ChertWritableDatabase::get_value_lower_bound(Xapian::valueno slot) const
1525 {
1526     LOGCALL(DB, std::string, "ChertWritableDatabase::get_value_lower_bound", slot);
1527     map<Xapian::valueno, ValueStats>::const_iterator i;
1528     i = value_stats.find(slot);
1529     if (i != value_stats.end()) RETURN(i->second.lower_bound);
1530     RETURN(ChertDatabase::get_value_lower_bound(slot));
1531 }
1532 
1533 std::string
get_value_upper_bound(Xapian::valueno slot) const1534 ChertWritableDatabase::get_value_upper_bound(Xapian::valueno slot) const
1535 {
1536     LOGCALL(DB, std::string, "ChertWritableDatabase::get_value_upper_bound", slot);
1537     map<Xapian::valueno, ValueStats>::const_iterator i;
1538     i = value_stats.find(slot);
1539     if (i != value_stats.end()) RETURN(i->second.upper_bound);
1540     RETURN(ChertDatabase::get_value_upper_bound(slot));
1541 }
1542 
1543 bool
term_exists(const string & tname) const1544 ChertWritableDatabase::term_exists(const string & tname) const
1545 {
1546     LOGCALL(DB, bool, "ChertWritableDatabase::term_exists", tname);
1547     RETURN(get_termfreq(tname) != 0);
1548 }
1549 
1550 LeafPostList *
open_post_list(const string & tname) const1551 ChertWritableDatabase::open_post_list(const string& tname) const
1552 {
1553     LOGCALL(DB, LeafPostList *, "ChertWritableDatabase::open_post_list", tname);
1554     Xapian::Internal::RefCntPtr<const ChertWritableDatabase> ptrtothis(this);
1555 
1556     if (tname.empty()) {
1557 	Xapian::doccount doccount = get_doccount();
1558 	if (stats.get_last_docid() == doccount) {
1559 	    RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
1560 	}
1561 	if (doclens.empty()) {
1562 	    RETURN(new ChertAllDocsPostList(ptrtothis, doccount));
1563 	}
1564 	RETURN(new ChertAllDocsModifiedPostList(ptrtothis, doccount, doclens));
1565     }
1566 
1567     map<string, map<docid, pair<char, termcount> > >::const_iterator j;
1568     j = mod_plists.find(tname);
1569     if (j != mod_plists.end()) {
1570 	// We've got buffered changes to this term's postlist, so we need to
1571 	// use a ChertModifiedPostList.
1572 	RETURN(new ChertModifiedPostList(ptrtothis, tname, j->second));
1573     }
1574 
1575     RETURN(new ChertPostList(ptrtothis, tname, true));
1576 }
1577 
1578 ValueList *
open_value_list(Xapian::valueno slot) const1579 ChertWritableDatabase::open_value_list(Xapian::valueno slot) const
1580 {
1581     LOGCALL(DB, ValueList *, "ChertWritableDatabase::open_value_list", slot);
1582     // If there are changes, we don't have code to iterate the modified value
1583     // list so we need to flush (but don't commit - there may be a transaction
1584     // in progress).
1585     if (change_count) value_manager.merge_changes();
1586     RETURN(ChertDatabase::open_value_list(slot));
1587 }
1588 
1589 TermList *
open_allterms(const string & prefix) const1590 ChertWritableDatabase::open_allterms(const string & prefix) const
1591 {
1592     LOGCALL(DB, TermList *, "ChertWritableDatabase::open_allterms", NO_ARGS);
1593     // If there are changes, terms may have been added or removed, and so we
1594     // need to flush (but don't commit - there may be a transaction in
1595     // progress).
1596     if (change_count) flush_postlist_changes();
1597     RETURN(ChertDatabase::open_allterms(prefix));
1598 }
1599 
1600 void
cancel()1601 ChertWritableDatabase::cancel()
1602 {
1603     ChertDatabase::cancel();
1604     stats.read(postlist_table);
1605     freq_deltas.clear();
1606     doclens.clear();
1607     mod_plists.clear();
1608     value_stats.clear();
1609     change_count = 0;
1610 }
1611 
1612 void
add_spelling(const string & word,Xapian::termcount freqinc) const1613 ChertWritableDatabase::add_spelling(const string & word,
1614 				    Xapian::termcount freqinc) const
1615 {
1616     spelling_table.add_word(word, freqinc);
1617 }
1618 
1619 void
remove_spelling(const string & word,Xapian::termcount freqdec) const1620 ChertWritableDatabase::remove_spelling(const string & word,
1621 				       Xapian::termcount freqdec) const
1622 {
1623     spelling_table.remove_word(word, freqdec);
1624 }
1625 
1626 TermList *
open_spelling_wordlist() const1627 ChertWritableDatabase::open_spelling_wordlist() const
1628 {
1629     spelling_table.merge_changes();
1630     return ChertDatabase::open_spelling_wordlist();
1631 }
1632 
1633 TermList *
open_synonym_keylist(const string & prefix) const1634 ChertWritableDatabase::open_synonym_keylist(const string & prefix) const
1635 {
1636     synonym_table.merge_changes();
1637     return ChertDatabase::open_synonym_keylist(prefix);
1638 }
1639 
1640 void
add_synonym(const string & term,const string & synonym) const1641 ChertWritableDatabase::add_synonym(const string & term,
1642 				   const string & synonym) const
1643 {
1644     synonym_table.add_synonym(term, synonym);
1645 }
1646 
1647 void
remove_synonym(const string & term,const string & synonym) const1648 ChertWritableDatabase::remove_synonym(const string & term,
1649 				      const string & synonym) const
1650 {
1651     synonym_table.remove_synonym(term, synonym);
1652 }
1653 
1654 void
clear_synonyms(const string & term) const1655 ChertWritableDatabase::clear_synonyms(const string & term) const
1656 {
1657     synonym_table.clear_synonyms(term);
1658 }
1659 
1660 void
set_metadata(const string & key,const string & value)1661 ChertWritableDatabase::set_metadata(const string & key, const string & value)
1662 {
1663     LOGCALL(DB, string, "ChertWritableDatabase::set_metadata", key | value);
1664     string btree_key("\x00\xc0", 2);
1665     btree_key += key;
1666     if (value.empty()) {
1667 	postlist_table.del(btree_key);
1668     } else {
1669 	postlist_table.add(btree_key, value);
1670     }
1671 }
1672 
1673 void
invalidate_doc_object(Xapian::Document::Internal * obj) const1674 ChertWritableDatabase::invalidate_doc_object(Xapian::Document::Internal * obj) const
1675 {
1676     if (obj == modify_shortcut_document) {
1677 	modify_shortcut_document = NULL;
1678 	modify_shortcut_docid = 0;
1679     }
1680 }
1681