1 /* chert_database.cc: chert database
2 *
3 * Copyright 1999,2000,2001 BrightStation PLC
4 * Copyright 2001 Hein Ragas
5 * Copyright 2002 Ananova Ltd
6 * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2015 Olly Betts
7 * Copyright 2006,2008 Lemur Consulting Ltd
8 * Copyright 2009,2010 Richard Boulton
9 * Copyright 2009 Kan-Ru Chen
10 * Copyright 2011 Dan Colish
11 *
12 * This program is free software; you can redistribute it and/or
13 * modify it under the terms of the GNU General Public License as
14 * published by the Free Software Foundation; either version 2 of the
15 * License, or (at your option) any later version.
16 *
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with this program; if not, write to the Free Software
24 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
25 * USA
26 */
27
28 #include <config.h>
29
30 #include "chert_database.h"
31
32 #include <xapian/error.h>
33 #include <xapian/valueiterator.h>
34
35 #include "contiguousalldocspostlist.h"
36 #include "chert_alldocsmodifiedpostlist.h"
37 #include "chert_alldocspostlist.h"
38 #include "chert_alltermslist.h"
39 #include "chert_replicate_internal.h"
40 #include "chert_document.h"
41 #include "../flint_lock.h"
42 #include "chert_metadata.h"
43 #include "chert_modifiedpostlist.h"
44 #include "chert_positionlist.h"
45 #include "chert_postlist.h"
46 #include "chert_record.h"
47 #include "chert_spellingwordslist.h"
48 #include "chert_termlist.h"
49 #include "chert_valuelist.h"
50 #include "chert_values.h"
51 #include "debuglog.h"
52 #include "io_utils.h"
53 #include "net/length.h"
54 #include "pack.h"
55 #include "remoteconnection.h"
56 #include "replicate_utils.h"
57 #include "replication.h"
58 #include "replicationprotocol.h"
59 #include "serialise.h"
60 #include "str.h"
61 #include "stringutils.h"
62 #include "utils.h"
63 #include "valuestats.h"
64
65 #ifdef __WIN32__
66 # include "msvc_posix_wrapper.h"
67 #endif
68
69 #include "safeerrno.h"
70 #include "safesysstat.h"
71 #include <sys/types.h>
72
73 #include <algorithm>
74 #include "autoptr.h"
75 #include <cstdlib>
76 #include <string>
77
78 using namespace std;
79 using namespace Xapian;
80
// The maximum safe term length is determined by the postlist.  There we
// store the term using pack_string_preserving_sort() which takes the
// length of the string plus an extra byte (assuming the string doesn't
// contain any zero bytes), followed by the docid encoded with
// pack_uint_preserving_sort() which takes up to 5 bytes.
//
// The Btree manager's key length limit is 252 bytes so the maximum safe term
// length is 252 - 1 - 5 = 246 bytes.  We use 245 rather than 246 for
// consistency with flint.
//
// If the term contains zero bytes, the limit is lower (by one for each zero
// byte in the term).
#define MAX_SAFE_TERM_LENGTH 245
94
/** Maximum number of times to try opening the tables to get them at a
 *  consistent revision.
 *
 *  This is mostly just to avoid any chance of an infinite loop - normally
 *  we'll either get them on the first or second try.
 */
const int MAX_OPEN_RETRIES = 100;
102
103 /* This finds the tables, opens them at consistent revisions, manages
104 * determining the current and next revision numbers, and stores handles
105 * to the tables.
106 */
ChertDatabase(const string & chert_dir,int action,unsigned int block_size)107 ChertDatabase::ChertDatabase(const string &chert_dir, int action,
108 unsigned int block_size)
109 : db_dir(chert_dir),
110 readonly(action == XAPIAN_DB_READONLY),
111 version_file(db_dir),
112 postlist_table(db_dir, readonly),
113 position_table(db_dir, readonly),
114 termlist_table(db_dir, readonly),
115 value_manager(&postlist_table, &termlist_table),
116 synonym_table(db_dir, readonly),
117 spelling_table(db_dir, readonly),
118 record_table(db_dir, readonly),
119 lock(db_dir),
120 max_changesets(0)
121 {
122 LOGCALL_CTOR(DB, "ChertDatabase", chert_dir | action | block_size);
123
124 if (action == XAPIAN_DB_READONLY) {
125 open_tables_consistent();
126 return;
127 }
128
129 if (action != Xapian::DB_OPEN && !database_exists()) {
130
131 // Create the directory for the database, if it doesn't exist
132 // already.
133 bool fail = false;
134 struct stat statbuf;
135 if (stat(db_dir, &statbuf) == 0) {
136 if (!S_ISDIR(statbuf.st_mode)) fail = true;
137 } else if (errno != ENOENT || mkdir(db_dir, 0755) == -1) {
138 fail = true;
139 }
140 if (fail) {
141 throw Xapian::DatabaseCreateError("Cannot create directory `" +
142 db_dir + "'", errno);
143 }
144 get_database_write_lock(true);
145
146 create_and_open_tables(block_size);
147 return;
148 }
149
150 if (action == Xapian::DB_CREATE) {
151 throw Xapian::DatabaseCreateError("Can't create new database at `" +
152 db_dir + "': a database already exists and I was told "
153 "not to overwrite it");
154 }
155
156 get_database_write_lock(false);
157 // if we're overwriting, pretend the db doesn't exist
158 if (action == Xapian::DB_CREATE_OR_OVERWRITE) {
159 create_and_open_tables(block_size);
160 return;
161 }
162
163 // Get latest consistent version
164 open_tables_consistent();
165
166 // Check that there are no more recent versions of tables. If there
167 // are, perform recovery by writing a new revision number to all
168 // tables.
169 if (record_table.get_open_revision_number() !=
170 postlist_table.get_latest_revision_number()) {
171 chert_revision_number_t new_revision = get_next_revision_number();
172
173 set_revision_number(new_revision);
174 }
175 }
176
/** Destructor.
 *
 *  Does no explicit work here beyond logging the call; members are
 *  destroyed by their own destructors.
 */
ChertDatabase::~ChertDatabase()
{
    LOGCALL_DTOR(DB, "~ChertDatabase");
}
181
182 bool
database_exists()183 ChertDatabase::database_exists() {
184 LOGCALL(DB, bool, "ChertDatabase::database_exists", NO_ARGS);
185 RETURN(record_table.exists() && postlist_table.exists());
186 }
187
188 void
create_and_open_tables(unsigned int block_size)189 ChertDatabase::create_and_open_tables(unsigned int block_size)
190 {
191 LOGCALL_VOID(DB, "ChertDatabase::create_and_open_tables", NO_ARGS);
192 // The caller is expected to create the database directory if it doesn't
193 // already exist.
194
195 // Create postlist_table first, and record_table last. Existence of
196 // record_table is considered to imply existence of the database.
197 version_file.create();
198 postlist_table.create_and_open(block_size);
199 position_table.create_and_open(block_size);
200 termlist_table.create_and_open(block_size);
201 synonym_table.create_and_open(block_size);
202 spelling_table.create_and_open(block_size);
203 record_table.create_and_open(block_size);
204
205 Assert(database_exists());
206
207 // Check consistency
208 chert_revision_number_t revision = record_table.get_open_revision_number();
209 if (revision != postlist_table.get_open_revision_number()) {
210 throw Xapian::DatabaseCreateError("Newly created tables are not in consistent state");
211 }
212
213 stats.zero();
214 }
215
216 void
open_tables_consistent()217 ChertDatabase::open_tables_consistent()
218 {
219 LOGCALL_VOID(DB, "ChertDatabase::open_tables_consistent", NO_ARGS);
220 // Open record_table first, since it's the last to be written to,
221 // and hence if a revision is available in it, it should be available
222 // in all the other tables (unless they've moved on already).
223 //
224 // If we find that a table can't open the desired revision, we
225 // go back and open record_table again, until record_table has
226 // the same revision as the last time we opened it.
227
228 chert_revision_number_t cur_rev = record_table.get_open_revision_number();
229
230 // Check the version file unless we're reopening.
231 if (cur_rev == 0) version_file.read_and_check();
232
233 record_table.open();
234 chert_revision_number_t revision = record_table.get_open_revision_number();
235
236 if (cur_rev && cur_rev == revision) {
237 // We're reopening a database and the revision hasn't changed so we
238 // don't need to do anything.
239 return;
240 }
241
242 // Set the block_size for optional tables as they may not currently exist.
243 unsigned int block_size = record_table.get_block_size();
244 position_table.set_block_size(block_size);
245 termlist_table.set_block_size(block_size);
246 synonym_table.set_block_size(block_size);
247 spelling_table.set_block_size(block_size);
248
249 value_manager.reset();
250
251 bool fully_opened = false;
252 int tries_left = MAX_OPEN_RETRIES;
253 while (!fully_opened && (tries_left--) > 0) {
254 if (spelling_table.open(revision) &&
255 synonym_table.open(revision) &&
256 termlist_table.open(revision) &&
257 position_table.open(revision) &&
258 postlist_table.open(revision)) {
259 // Everything now open at the same revision.
260 fully_opened = true;
261 } else {
262 // Couldn't open consistent revision: two cases possible:
263 // i) An update has completed and a second one has begun since
264 // record was opened. This leaves a consistent revision
265 // available, but not the one we were trying to open.
266 // ii) Tables have become corrupt / have no consistent revision
267 // available. In this case, updates must have ceased.
268 //
269 // So, we reopen the record table, and check its revision number,
270 // if it's changed we try the opening again, otherwise we give up.
271 //
272 record_table.open();
273 chert_revision_number_t newrevision =
274 record_table.get_open_revision_number();
275 if (revision == newrevision) {
276 // Revision number hasn't changed - therefore a second index
277 // sweep hasn't begun and the system must have failed. Database
278 // is inconsistent.
279 throw Xapian::DatabaseCorruptError("Cannot open tables at consistent revisions");
280 }
281 revision = newrevision;
282 }
283 }
284
285 if (!fully_opened) {
286 throw Xapian::DatabaseModifiedError("Cannot open tables at stable revision - changing too fast");
287 }
288
289 stats.read(postlist_table);
290 }
291
292 void
open_tables(chert_revision_number_t revision)293 ChertDatabase::open_tables(chert_revision_number_t revision)
294 {
295 LOGCALL_VOID(DB, "ChertDatabase::open_tables", revision);
296 version_file.read_and_check();
297 record_table.open(revision);
298
299 // Set the block_size for optional tables as they may not currently exist.
300 unsigned int block_size = record_table.get_block_size();
301 position_table.set_block_size(block_size);
302 termlist_table.set_block_size(block_size);
303 synonym_table.set_block_size(block_size);
304 spelling_table.set_block_size(block_size);
305
306 value_manager.reset();
307
308 spelling_table.open(revision);
309 synonym_table.open(revision);
310 termlist_table.open(revision);
311 position_table.open(revision);
312 postlist_table.open(revision);
313 }
314
315 chert_revision_number_t
get_revision_number() const316 ChertDatabase::get_revision_number() const
317 {
318 LOGCALL(DB, chert_revision_number_t, "ChertDatabase::get_revision_number", NO_ARGS);
319 // We could use any table here, theoretically.
320 RETURN(postlist_table.get_open_revision_number());
321 }
322
323 chert_revision_number_t
get_next_revision_number() const324 ChertDatabase::get_next_revision_number() const
325 {
326 LOGCALL(DB, chert_revision_number_t, "ChertDatabase::get_next_revision_number", NO_ARGS);
327 /* We _must_ use postlist_table here, since it is always the first
328 * to be written, and hence will have the greatest available revision
329 * number.
330 */
331 chert_revision_number_t new_revision =
332 postlist_table.get_latest_revision_number();
333 ++new_revision;
334 RETURN(new_revision);
335 }
336
337 void
get_changeset_revisions(const string & path,chert_revision_number_t * startrev,chert_revision_number_t * endrev) const338 ChertDatabase::get_changeset_revisions(const string & path,
339 chert_revision_number_t * startrev,
340 chert_revision_number_t * endrev) const
341 {
342 int changes_fd = -1;
343 #ifdef __WIN32__
344 changes_fd = msvc_posix_open(path.c_str(), O_RDONLY | O_BINARY);
345 #else
346 changes_fd = open(path.c_str(), O_RDONLY | O_BINARY);
347 #endif
348 fdcloser closer(changes_fd);
349
350 if (changes_fd < 0) {
351 string message = string("Couldn't open changeset ")
352 + path + " to read";
353 throw Xapian::DatabaseError(message, errno);
354 }
355
356 char buf[REASONABLE_CHANGESET_SIZE];
357 const char *start = buf;
358 const char *end = buf + io_read(changes_fd, buf,
359 REASONABLE_CHANGESET_SIZE, 0);
360 if (size_t(end - start) < CONST_STRLEN(CHANGES_MAGIC_STRING))
361 throw Xapian::DatabaseError("Changeset too short at " + path);
362 if (memcmp(start, CHANGES_MAGIC_STRING,
363 CONST_STRLEN(CHANGES_MAGIC_STRING)) != 0) {
364 string message = string("Changeset at ")
365 + path + " does not contain valid magic string";
366 throw Xapian::DatabaseError(message);
367 }
368 start += CONST_STRLEN(CHANGES_MAGIC_STRING);
369
370 unsigned int changes_version;
371 if (!unpack_uint(&start, end, &changes_version))
372 throw Xapian::DatabaseError("Couldn't read a valid version number for "
373 "changeset at " + path);
374 if (changes_version != CHANGES_VERSION)
375 throw Xapian::DatabaseError("Don't support version of changeset at "
376 + path);
377
378 if (!unpack_uint(&start, end, startrev))
379 throw Xapian::DatabaseError("Couldn't read a valid start revision from "
380 "changeset at " + path);
381
382 if (!unpack_uint(&start, end, endrev))
383 throw Xapian::DatabaseError("Couldn't read a valid end revision for "
384 "changeset at " + path);
385 }
386
/** Flush all tables and commit them at a new revision.
 *
 *  If the environment variable XAPIAN_MAX_CHANGESETS is set to a value
 *  greater than zero (and the current revision isn't zero), a changeset
 *  file describing the changes is written alongside the commit, and older
 *  changeset files are removed afterwards.
 *
 *  @param new_revision  The revision number to commit the tables at.
 *
 *  Any exception thrown while writing or committing is rethrown after the
 *  changeset file (if one was created) has been removed.
 */
void
ChertDatabase::set_revision_number(chert_revision_number_t new_revision)
{
    LOGCALL_VOID(DB, "ChertDatabase::set_revision_number", new_revision);

    value_manager.merge_changes();

    postlist_table.flush_db();
    position_table.flush_db();
    termlist_table.flush_db();
    synonym_table.flush_db();
    spelling_table.flush_db();
    record_table.flush_db();

    // changes_fd doubles as the "a changeset is being written" flag for
    // the rest of this function: it stays >= 0 even after closefd below
    // has gone out of scope.
    int changes_fd = -1;
    string changes_name;

    // The limit is re-read from the environment on every commit.
    const char *p = getenv("XAPIAN_MAX_CHANGESETS");
    if (p) {
        max_changesets = atoi(p);
    } else {
        max_changesets = 0;
    }

    if (max_changesets > 0) {
        chert_revision_number_t old_revision = get_revision_number();
        if (old_revision) {
            // Don't generate a changeset for the first revision.
            changes_fd = create_changeset_file(db_dir,
                                               "/changes" + str(old_revision),
                                               changes_name);
        }
    }

    try {
        fdcloser closefd(changes_fd);
        if (changes_fd >= 0) {
            // Changeset header: magic string, format version, then the
            // revisions this changeset goes from and to.
            string buf;
            chert_revision_number_t old_revision = get_revision_number();
            buf += CHANGES_MAGIC_STRING;
            pack_uint(buf, CHANGES_VERSION);
            pack_uint(buf, old_revision);
            pack_uint(buf, new_revision);

#ifndef DANGEROUS
            buf += '\x00'; // Changes can be applied to a live database.
#else
            buf += '\x01';
#endif

            io_write(changes_fd, buf.data(), buf.size());

            // Write the changes to the blocks in the tables.  Do the postlist
            // table last, so that ends up cached the most, if the cache
            // available is limited.  Do the position table just before that
            // as having that cached will also improve search performance.
            termlist_table.write_changed_blocks(changes_fd);
            synonym_table.write_changed_blocks(changes_fd);
            spelling_table.write_changed_blocks(changes_fd);
            record_table.write_changed_blocks(changes_fd);
            position_table.write_changed_blocks(changes_fd);
            postlist_table.write_changed_blocks(changes_fd);
        }

        // Commit order: postlist first, record last (the record table is
        // what open_tables_consistent() uses to pick the revision to open).
        postlist_table.commit(new_revision, changes_fd);
        position_table.commit(new_revision, changes_fd);
        termlist_table.commit(new_revision, changes_fd);
        synonym_table.commit(new_revision, changes_fd);
        spelling_table.commit(new_revision, changes_fd);

        string changes_tail; // Data to be appended to the changes file
        if (changes_fd >= 0) {
            changes_tail += '\0';
            pack_uint(changes_tail, new_revision);
        }
        record_table.commit(new_revision, changes_fd, &changes_tail);

    } catch (...) {
        // Remove the changeset, if there was one.
        if (changes_fd >= 0) {
            (void)io_unlink(changes_name);
        }

        throw;
    }

    if (changes_fd >= 0 && max_changesets < new_revision) {
        // While change sets less than N - max_changesets exist, delete them
        // 1 must be subtracted so we don't delete the changeset we just wrote
        // when max_changesets = 1
        unsigned rev = new_revision - max_changesets - 1;
        // Work downwards until an unlink reports failure (presumably because
        // that changeset file doesn't exist - confirm against io_unlink()).
        while (io_unlink(db_dir + "/changes" + str(rev--))) { }
    }
}
481
482 void
reopen()483 ChertDatabase::reopen()
484 {
485 LOGCALL_VOID(DB, "ChertDatabase::reopen", NO_ARGS);
486 if (readonly) open_tables_consistent();
487 }
488
/** Close the database.
 *
 *  Closes each of the six tables (passing true to each close() call) and
 *  then releases the lock.
 */
void
ChertDatabase::close()
{
    LOGCALL_VOID(DB, "ChertDatabase::close", NO_ARGS);
    postlist_table.close(true);
    position_table.close(true);
    termlist_table.close(true);
    synonym_table.close(true);
    spelling_table.close(true);
    record_table.close(true);
    // Release the lock only once all the tables have been closed.
    lock.release();
}
501
502 void
get_database_write_lock(bool creating)503 ChertDatabase::get_database_write_lock(bool creating)
504 {
505 LOGCALL_VOID(DB, "ChertDatabase::get_database_write_lock", creating);
506 string explanation;
507 FlintLock::reason why = lock.lock(true, explanation);
508 if (why != FlintLock::SUCCESS) {
509 if (why == FlintLock::UNKNOWN && !creating && !database_exists()) {
510 string msg("No chert database found at path `");
511 msg += db_dir;
512 msg += '\'';
513 throw Xapian::DatabaseOpeningError(msg);
514 }
515 lock.throw_databaselockerror(why, db_dir, explanation);
516 }
517 }
518
519 void
send_whole_database(RemoteConnection & conn,double end_time)520 ChertDatabase::send_whole_database(RemoteConnection & conn, double end_time)
521 {
522 LOGCALL_VOID(DB, "ChertDatabase::send_whole_database", conn | end_time);
523
524 // Send the current revision number in the header.
525 string buf;
526 string uuid = get_uuid();
527 buf += encode_length(uuid.size());
528 buf += uuid;
529 pack_uint(buf, get_revision_number());
530 conn.send_message(REPL_REPLY_DB_HEADER, buf, end_time);
531
532 // Send all the tables. The tables which we want to be cached best after
533 // the copy finished are sent last.
534 static const char filenames[] =
535 "\x0b""termlist.DB""\x0e""termlist.baseA\x0e""termlist.baseB"
536 "\x0a""synonym.DB""\x0d""synonym.baseA\x0d""synonym.baseB"
537 "\x0b""spelling.DB""\x0e""spelling.baseA\x0e""spelling.baseB"
538 "\x09""record.DB""\x0c""record.baseA\x0c""record.baseB"
539 "\x0b""position.DB""\x0e""position.baseA\x0e""position.baseB"
540 "\x0b""postlist.DB""\x0e""postlist.baseA\x0e""postlist.baseB"
541 "\x08""iamchert";
542 string filepath = db_dir;
543 filepath += '/';
544 for (const char * p = filenames; *p; p += *p + 1) {
545 string leaf(p + 1, size_t(static_cast<unsigned char>(*p)));
546 filepath.replace(db_dir.size() + 1, string::npos, leaf);
547 #ifdef __WIN32__
548 int fd = msvc_posix_open(filepath.c_str(), O_RDONLY | O_BINARY);
549 #else
550 int fd = open(filepath.c_str(), O_RDONLY | O_BINARY);
551 #endif
552 if (fd >= 0) {
553 fdcloser closefd(fd);
554 conn.send_message(REPL_REPLY_DB_FILENAME, leaf, end_time);
555 conn.send_file(REPL_REPLY_DB_FILEDATA, fd, end_time);
556 }
557 }
558 }
559
/** Write the replication messages needed to bring a replica up to date.
 *
 *  Starting from the revision encoded in @a revision, changeset files are
 *  sent one after another until the current revision is reached.  If a
 *  needed changeset is missing, the database's UUID changes, or a full copy
 *  was asked for, the whole database is sent instead (at most
 *  MAX_DB_COPIES_PER_CONVERSATION times, after which a failure message is
 *  sent).
 *
 *  @param fd             File descriptor to write the messages to.
 *  @param revision       Packed revision number the replica is at; if it
 *                        can't be decoded a full copy is forced.
 *  @param need_whole_db  true to start with a full database copy.
 *  @param info           If non-NULL, its fullcopy_count, changeset_count
 *                        and changed members are updated.
 *
 *  Throws Xapian::DatabaseError if a changeset's header disagrees with its
 *  filename or its start revision isn't less than its end revision.
 */
void
ChertDatabase::write_changesets_to_fd(int fd,
                                      const string & revision,
                                      bool need_whole_db,
                                      ReplicationInfo * info)
{
    LOGCALL_VOID(DB, "ChertDatabase::write_changesets_to_fd", fd | revision | need_whole_db | info);

    int whole_db_copies_left = MAX_DB_COPIES_PER_CONVERSATION;
    chert_revision_number_t start_rev_num = 0;
    string start_uuid = get_uuid();

    // Revision the replica must reach before a full copy may go live; set
    // when a full copy's footer is sent.
    chert_revision_number_t needed_rev_num = 0;

    const char * rev_ptr = revision.data();
    const char * rev_end = rev_ptr + revision.size();
    if (!unpack_uint(&rev_ptr, rev_end, &start_rev_num)) {
        need_whole_db = true;
    }

    // Write-only connection: -1 is passed as the first descriptor.
    RemoteConnection conn(-1, fd, string());

    // While the starting revision number is less than the latest revision
    // number, look for a changeset, and write it.
    //
    // FIXME - perhaps we should make hardlinks for all the changesets we're
    // likely to need, first, and then start sending them, so that there's no
    // risk of them disappearing while we're sending earlier ones.
    while (true) {
        if (need_whole_db) {
            // Decrease the counter of copies left to be sent, and fail
            // if we've already copied the database enough.  This ensures that
            // synchronisation attempts always terminate eventually.
            if (whole_db_copies_left == 0) {
                conn.send_message(REPL_REPLY_FAIL,
                                  "Database changing too fast",
                                  0.0);
                return;
            }
            whole_db_copies_left--;

            // Send the whole database across.
            start_rev_num = get_revision_number();
            start_uuid = get_uuid();

            send_whole_database(conn, 0.0);
            if (info != NULL)
                ++(info->fullcopy_count);

            need_whole_db = false;

            reopen();
            if (start_uuid == get_uuid()) {
                // Send the latest revision number after sending the tables.
                // The update must proceed to that revision number before the
                // copy is safe to make live.

                string buf;
                needed_rev_num = get_revision_number();
                pack_uint(buf, needed_rev_num);
                conn.send_message(REPL_REPLY_DB_FOOTER, buf, 0.0);
                if (info != NULL && start_rev_num == needed_rev_num)
                    info->changed = true;
            } else {
                // Database has been replaced since we did the copy.  Send a
                // higher revision number than the revision we've just copied,
                // so that the client doesn't make the copy we've just done
                // live, and then mark that we need to do a copy again.
                // The client will never actually get the required revision,
                // because the next message is going to be the start of a new
                // database transfer.

                string buf;
                pack_uint(buf, start_rev_num + 1);
                conn.send_message(REPL_REPLY_DB_FOOTER, buf, 0.0);
                need_whole_db = true;
            }
        } else {
            // Check if we've sent all the updates.
            if (start_rev_num >= get_revision_number()) {
                // We appear to be up to date; reopen to pick up any newer
                // revision before deciding that we're finished.
                reopen();
                if (start_uuid != get_uuid()) {
                    need_whole_db = true;
                    continue;
                }
                if (start_rev_num >= get_revision_number()) {
                    break;
                }
            }

            // Look for the changeset for revision start_rev_num.
            string changes_name = db_dir + "/changes" + str(start_rev_num);
#ifdef __WIN32__
            int fd_changes = msvc_posix_open(changes_name.c_str(), O_RDONLY | O_BINARY);
#else
            int fd_changes = open(changes_name.c_str(), O_RDONLY | O_BINARY);
#endif
            if (fd_changes >= 0) {
                fdcloser closefd(fd_changes);

                // Send it, and also update start_rev_num to the new value
                // specified in the changeset.
                chert_revision_number_t changeset_start_rev_num;
                chert_revision_number_t changeset_end_rev_num;
                get_changeset_revisions(changes_name,
                                        &changeset_start_rev_num,
                                        &changeset_end_rev_num);
                if (changeset_start_rev_num != start_rev_num) {
                    throw Xapian::DatabaseError("Changeset start revision does not match changeset filename");
                }
                // Requiring progress here guarantees start_rev_num strictly
                // increases with each changeset sent.
                if (changeset_start_rev_num >= changeset_end_rev_num) {
                    throw Xapian::DatabaseError("Changeset start revision is not less than end revision");
                }

                conn.send_file(REPL_REPLY_CHANGESET, fd_changes, 0.0);
                start_rev_num = changeset_end_rev_num;
                if (info != NULL) {
                    ++(info->changeset_count);
                    if (start_rev_num >= needed_rev_num)
                        info->changed = true;
                }
            } else {
                // The changeset doesn't exist: leave the revision number as it
                // is, and mark for doing a full database copy.
                need_whole_db = true;
            }
        }
    }
    conn.send_message(REPL_REPLY_END_OF_CHANGES, string(), 0.0);
}
690
691 void
modifications_failed(chert_revision_number_t old_revision,chert_revision_number_t new_revision,const std::string & msg)692 ChertDatabase::modifications_failed(chert_revision_number_t old_revision,
693 chert_revision_number_t new_revision,
694 const std::string & msg)
695 {
696 // Modifications failed. Wipe all the modifications from memory.
697 try {
698 // Discard any buffered changes and reinitialised cached values
699 // from the table.
700 cancel();
701
702 // Reopen tables with old revision number.
703 open_tables(old_revision);
704
705 // Increase revision numbers to new revision number plus one,
706 // writing increased numbers to all tables.
707 ++new_revision;
708 set_revision_number(new_revision);
709 } catch (const Xapian::Error &e) {
710 // We can't get the database into a consistent state, so close
711 // it to avoid the risk of database corruption.
712 ChertDatabase::close();
713 throw Xapian::DatabaseError("Modifications failed (" + msg +
714 "), and cannot set consistent table "
715 "revision numbers: " + e.get_msg());
716 }
717 }
718
719 void
apply()720 ChertDatabase::apply()
721 {
722 LOGCALL_VOID(DB, "ChertDatabase::apply", NO_ARGS);
723 if (!postlist_table.is_modified() &&
724 !position_table.is_modified() &&
725 !termlist_table.is_modified() &&
726 !value_manager.is_modified() &&
727 !synonym_table.is_modified() &&
728 !spelling_table.is_modified() &&
729 !record_table.is_modified()) {
730 return;
731 }
732
733 chert_revision_number_t old_revision = get_revision_number();
734 chert_revision_number_t new_revision = get_next_revision_number();
735
736 try {
737 set_revision_number(new_revision);
738 } catch (const Xapian::Error &e) {
739 modifications_failed(old_revision, new_revision, e.get_description());
740 throw;
741 } catch (...) {
742 modifications_failed(old_revision, new_revision, "Unknown error");
743 throw;
744 }
745 }
746
/** Discard pending modifications.
 *
 *  Calls cancel() on each of the six tables and on the value manager.
 */
void
ChertDatabase::cancel()
{
    LOGCALL_VOID(DB, "ChertDatabase::cancel", NO_ARGS);
    postlist_table.cancel();
    position_table.cancel();
    termlist_table.cancel();
    value_manager.cancel();
    synonym_table.cancel();
    spelling_table.cancel();
    record_table.cancel();
}
759
760 Xapian::doccount
get_doccount() const761 ChertDatabase::get_doccount() const
762 {
763 LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_doccount", NO_ARGS);
764 RETURN(record_table.get_doccount());
765 }
766
767 Xapian::docid
get_lastdocid() const768 ChertDatabase::get_lastdocid() const
769 {
770 LOGCALL(DB, Xapian::docid, "ChertDatabase::get_lastdocid", NO_ARGS);
771 RETURN(stats.get_last_docid());
772 }
773
774 totlen_t
get_total_length() const775 ChertDatabase::get_total_length() const
776 {
777 LOGCALL(DB, totlen_t, "ChertDatabase::get_total_length", NO_ARGS);
778 RETURN(stats.get_total_doclen());
779 }
780
781 Xapian::termcount
get_doclength(Xapian::docid did) const782 ChertDatabase::get_doclength(Xapian::docid did) const
783 {
784 LOGCALL(DB, Xapian::termcount, "ChertDatabase::get_doclength", did);
785 Assert(did != 0);
786 Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
787 RETURN(postlist_table.get_doclength(did, ptrtothis));
788 }
789
790 Xapian::doccount
get_termfreq(const string & term) const791 ChertDatabase::get_termfreq(const string & term) const
792 {
793 LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_termfreq", term);
794 Assert(!term.empty());
795 RETURN(postlist_table.get_termfreq(term));
796 }
797
798 Xapian::termcount
get_collection_freq(const string & term) const799 ChertDatabase::get_collection_freq(const string & term) const
800 {
801 LOGCALL(DB, Xapian::termcount, "ChertDatabase::get_collection_freq", term);
802 Assert(!term.empty());
803 RETURN(postlist_table.get_collection_freq(term));
804 }
805
806 Xapian::doccount
get_value_freq(Xapian::valueno slot) const807 ChertDatabase::get_value_freq(Xapian::valueno slot) const
808 {
809 LOGCALL(DB, Xapian::doccount, "ChertDatabase::get_value_freq", slot);
810 RETURN(value_manager.get_value_freq(slot));
811 }
812
813 std::string
get_value_lower_bound(Xapian::valueno slot) const814 ChertDatabase::get_value_lower_bound(Xapian::valueno slot) const
815 {
816 LOGCALL(DB, std::string, "ChertDatabase::get_value_lower_bound", slot);
817 RETURN(value_manager.get_value_lower_bound(slot));
818 }
819
820 std::string
get_value_upper_bound(Xapian::valueno slot) const821 ChertDatabase::get_value_upper_bound(Xapian::valueno slot) const
822 {
823 LOGCALL(DB, std::string, "ChertDatabase::get_value_upper_bound", slot);
824 RETURN(value_manager.get_value_upper_bound(slot));
825 }
826
827 Xapian::termcount
get_doclength_lower_bound() const828 ChertDatabase::get_doclength_lower_bound() const
829 {
830 return stats.get_doclength_lower_bound();
831 }
832
833 Xapian::termcount
get_doclength_upper_bound() const834 ChertDatabase::get_doclength_upper_bound() const
835 {
836 return stats.get_doclength_upper_bound();
837 }
838
839 Xapian::termcount
get_wdf_upper_bound(const string & term) const840 ChertDatabase::get_wdf_upper_bound(const string & term) const
841 {
842 return min(get_collection_freq(term), stats.get_wdf_upper_bound());
843 }
844
845 bool
term_exists(const string & term) const846 ChertDatabase::term_exists(const string & term) const
847 {
848 LOGCALL(DB, bool, "ChertDatabase::term_exists", term);
849 Assert(!term.empty());
850 RETURN(postlist_table.term_exists(term));
851 }
852
853 bool
has_positions() const854 ChertDatabase::has_positions() const
855 {
856 return !position_table.empty();
857 }
858
859 LeafPostList *
open_post_list(const string & term) const860 ChertDatabase::open_post_list(const string& term) const
861 {
862 LOGCALL(DB, LeafPostList *, "ChertDatabase::open_post_list", term);
863 Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
864
865 if (term.empty()) {
866 Xapian::doccount doccount = get_doccount();
867 if (stats.get_last_docid() == doccount) {
868 RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
869 }
870 RETURN(new ChertAllDocsPostList(ptrtothis, doccount));
871 }
872
873 RETURN(new ChertPostList(ptrtothis, term, true));
874 }
875
876 ValueList *
open_value_list(Xapian::valueno slot) const877 ChertDatabase::open_value_list(Xapian::valueno slot) const
878 {
879 LOGCALL(DB, ValueList *, "ChertDatabase::open_value_list", slot);
880 Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
881 RETURN(new ChertValueList(slot, ptrtothis));
882 }
883
884 TermList *
open_term_list(Xapian::docid did) const885 ChertDatabase::open_term_list(Xapian::docid did) const
886 {
887 LOGCALL(DB, TermList *, "ChertDatabase::open_term_list", did);
888 Assert(did != 0);
889 if (!termlist_table.is_open())
890 throw_termlist_table_close_exception();
891 Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
892 RETURN(new ChertTermList(ptrtothis, did));
893 }
894
895 Xapian::Document::Internal *
open_document(Xapian::docid did,bool lazy) const896 ChertDatabase::open_document(Xapian::docid did, bool lazy) const
897 {
898 LOGCALL(DB, Xapian::Document::Internal *, "ChertDatabase::open_document", did | lazy);
899 Assert(did != 0);
900 if (!lazy) {
901 // This will throw DocNotFoundError if the document doesn't exist.
902 (void)get_doclength(did);
903 }
904
905 Xapian::Internal::RefCntPtr<const Database::Internal> ptrtothis(this);
906 RETURN(new ChertDocument(ptrtothis, did, &value_manager, &record_table));
907 }
908
909 PositionList *
open_position_list(Xapian::docid did,const string & term) const910 ChertDatabase::open_position_list(Xapian::docid did, const string & term) const
911 {
912 Assert(did != 0);
913
914 AutoPtr<ChertPositionList> poslist(new ChertPositionList);
915 if (!poslist->read_data(&position_table, did, term)) {
916 // As of 1.1.0, we don't check if the did and term exist - we just
917 // return an empty positionlist. If the user really needs to know,
918 // they can check for themselves.
919 }
920
921 return poslist.release();
922 }
923
924 TermList *
open_allterms(const string & prefix) const925 ChertDatabase::open_allterms(const string & prefix) const
926 {
927 LOGCALL(DB, TermList *, "ChertDatabase::open_allterms", NO_ARGS);
928 RETURN(new ChertAllTermsList(Xapian::Internal::RefCntPtr<const ChertDatabase>(this),
929 prefix));
930 }
931
932 TermList *
open_spelling_termlist(const string & word) const933 ChertDatabase::open_spelling_termlist(const string & word) const
934 {
935 return spelling_table.open_termlist(word);
936 }
937
938 TermList *
open_spelling_wordlist() const939 ChertDatabase::open_spelling_wordlist() const
940 {
941 ChertCursor * cursor = spelling_table.cursor_get();
942 if (!cursor) return NULL;
943 return new ChertSpellingWordsList(Xapian::Internal::RefCntPtr<const ChertDatabase>(this),
944 cursor);
945 }
946
947 Xapian::doccount
get_spelling_frequency(const string & word) const948 ChertDatabase::get_spelling_frequency(const string & word) const
949 {
950 return spelling_table.get_word_frequency(word);
951 }
952
953 TermList *
open_synonym_termlist(const string & term) const954 ChertDatabase::open_synonym_termlist(const string & term) const
955 {
956 return synonym_table.open_termlist(term);
957 }
958
959 TermList *
open_synonym_keylist(const string & prefix) const960 ChertDatabase::open_synonym_keylist(const string & prefix) const
961 {
962 ChertCursor * cursor = synonym_table.cursor_get();
963 if (!cursor) return NULL;
964 return new ChertSynonymTermList(Xapian::Internal::RefCntPtr<const ChertDatabase>(this),
965 cursor, prefix);
966 }
967
968 string
get_metadata(const string & key) const969 ChertDatabase::get_metadata(const string & key) const
970 {
971 LOGCALL(DB, string, "ChertDatabase::get_metadata", key);
972 string btree_key("\x00\xc0", 2);
973 btree_key += key;
974 string tag;
975 (void)postlist_table.get_exact_entry(btree_key, tag);
976 RETURN(tag);
977 }
978
979 TermList *
open_metadata_keylist(const std::string & prefix) const980 ChertDatabase::open_metadata_keylist(const std::string &prefix) const
981 {
982 LOGCALL(DB, TermList *, "ChertDatabase::open_metadata_keylist", NO_ARGS);
983 ChertCursor * cursor = postlist_table.cursor_get();
984 RETURN(new ChertMetadataTermList(Xapian::Internal::RefCntPtr<const ChertDatabase>(this),
985 cursor, prefix));
986 }
987
988 string
get_revision_info() const989 ChertDatabase::get_revision_info() const
990 {
991 LOGCALL(DB, string, "ChertDatabase::get_revision_info", NO_ARGS);
992 string buf;
993 pack_uint(buf, get_revision_number());
994 RETURN(buf);
995 }
996
997 string
get_uuid() const998 ChertDatabase::get_uuid() const
999 {
1000 LOGCALL(DB, string, "ChertDatabase::get_uuid", NO_ARGS);
1001 RETURN(version_file.get_uuid_string());
1002 }
1003
1004 void
throw_termlist_table_close_exception() const1005 ChertDatabase::throw_termlist_table_close_exception() const
1006 {
1007 // Either the database has been closed, or else there's no termlist table.
1008 // Check if the postlist table is open to determine which is the case.
1009 if (!postlist_table.is_open())
1010 ChertTable::throw_database_closed();
1011 throw Xapian::FeatureUnavailableError("Database has no termlist");
1012 }
1013
1014 ///////////////////////////////////////////////////////////////////////////
1015
ChertWritableDatabase(const string & dir,int action,int block_size)1016 ChertWritableDatabase::ChertWritableDatabase(const string &dir, int action,
1017 int block_size)
1018 : ChertDatabase(dir, action, block_size),
1019 freq_deltas(),
1020 doclens(),
1021 mod_plists(),
1022 change_count(0),
1023 flush_threshold(0),
1024 modify_shortcut_document(NULL),
1025 modify_shortcut_docid(0)
1026 {
1027 LOGCALL_CTOR(DB, "ChertWritableDatabase", dir | action | block_size);
1028
1029 const char *p = getenv("XAPIAN_FLUSH_THRESHOLD");
1030 if (p)
1031 flush_threshold = atoi(p);
1032 if (flush_threshold == 0)
1033 flush_threshold = 10000;
1034 }
1035
/** Destructor: logs the call and then runs dtor_called(). */
ChertWritableDatabase::~ChertWritableDatabase()
{
    LOGCALL_DTOR(DB, "~ChertWritableDatabase");
    dtor_called();
}
1041
1042 void
commit()1043 ChertWritableDatabase::commit()
1044 {
1045 if (transaction_active())
1046 throw Xapian::InvalidOperationError("Can't commit during a transaction");
1047 if (change_count) flush_postlist_changes();
1048 apply();
1049 }
1050
1051 void
flush_postlist_changes() const1052 ChertWritableDatabase::flush_postlist_changes() const
1053 {
1054 postlist_table.merge_changes(mod_plists, doclens, freq_deltas);
1055 stats.write(postlist_table);
1056
1057 freq_deltas.clear();
1058 doclens.clear();
1059 mod_plists.clear();
1060 change_count = 0;
1061 }
1062
1063 void
close()1064 ChertWritableDatabase::close()
1065 {
1066 LOGCALL_VOID(DB, "ChertWritableDatabase::close", NO_ARGS);
1067 if (!transaction_active()) {
1068 commit();
1069 // FIXME: if commit() throws, should we still close?
1070 }
1071 ChertDatabase::close();
1072 }
1073
/** Pass the buffered value statistics to the value manager, then run
 *  ChertDatabase::apply().
 */
void
ChertWritableDatabase::apply()
{
    value_manager.set_value_stats(value_stats);
    ChertDatabase::apply();
}
1080
1081 void
add_freq_delta(const string & tname,Xapian::termcount_diff tf_delta,Xapian::termcount_diff cf_delta)1082 ChertWritableDatabase::add_freq_delta(const string & tname,
1083 Xapian::termcount_diff tf_delta,
1084 Xapian::termcount_diff cf_delta)
1085 {
1086 map<string, pair<termcount_diff, termcount_diff> >::iterator i;
1087 i = freq_deltas.find(tname);
1088 if (i == freq_deltas.end()) {
1089 freq_deltas.insert(make_pair(tname, make_pair(tf_delta, cf_delta)));
1090 } else {
1091 i->second.first += tf_delta;
1092 i->second.second += cf_delta;
1093 }
1094 }
1095
1096 void
insert_mod_plist(Xapian::docid did,const string & tname,Xapian::termcount wdf)1097 ChertWritableDatabase::insert_mod_plist(Xapian::docid did,
1098 const string & tname,
1099 Xapian::termcount wdf)
1100 {
1101 // Find or make the appropriate entry in mod_plists.
1102 map<string, map<docid, pair<char, termcount> > >::iterator j;
1103 j = mod_plists.find(tname);
1104 if (j == mod_plists.end()) {
1105 map<docid, pair<char, termcount> > m;
1106 j = mod_plists.insert(make_pair(tname, m)).first;
1107 }
1108 j->second[did] = make_pair('A', wdf);
1109 }
1110
1111 void
update_mod_plist(Xapian::docid did,const string & tname,char type,Xapian::termcount wdf)1112 ChertWritableDatabase::update_mod_plist(Xapian::docid did,
1113 const string & tname,
1114 char type,
1115 Xapian::termcount wdf)
1116 {
1117 // Find or make the appropriate entry in mod_plists.
1118 map<string, map<docid, pair<char, termcount> > >::iterator j;
1119 j = mod_plists.find(tname);
1120 if (j == mod_plists.end()) {
1121 map<docid, pair<char, termcount> > m;
1122 j = mod_plists.insert(make_pair(tname, m)).first;
1123 }
1124
1125 map<docid, pair<char, termcount> >::iterator k;
1126 k = j->second.find(did);
1127 if (k == j->second.end()) {
1128 j->second.insert(make_pair(did, make_pair(type, wdf)));
1129 } else {
1130 if (type == 'A') {
1131 // Adding an entry which has already been deleted.
1132 Assert(k->second.first == 'D');
1133 type = 'M';
1134 }
1135 k->second = make_pair(type, wdf);
1136 }
1137 }
1138
1139 Xapian::docid
add_document(const Xapian::Document & document)1140 ChertWritableDatabase::add_document(const Xapian::Document & document)
1141 {
1142 LOGCALL(DB, Xapian::docid, "ChertWritableDatabase::add_document", document);
1143 // Make sure the docid counter doesn't overflow.
1144 if (stats.get_last_docid() == CHERT_MAX_DOCID)
1145 throw Xapian::DatabaseError("Run out of docids - you'll have to use copydatabase to eliminate any gaps before you can add more documents");
1146 // Use the next unused document ID.
1147 RETURN(add_document_(stats.get_next_docid(), document));
1148 }
1149
1150 Xapian::docid
add_document_(Xapian::docid did,const Xapian::Document & document)1151 ChertWritableDatabase::add_document_(Xapian::docid did,
1152 const Xapian::Document & document)
1153 {
1154 LOGCALL(DB, Xapian::docid, "ChertWritableDatabase::add_document_", did | document);
1155 Assert(did != 0);
1156 try {
1157 // Add the record using that document ID.
1158 record_table.replace_record(document.get_data(), did);
1159
1160 // Set the values.
1161 value_manager.add_document(did, document, value_stats);
1162
1163 chert_doclen_t new_doclen = 0;
1164 {
1165 Xapian::TermIterator term = document.termlist_begin();
1166 Xapian::TermIterator term_end = document.termlist_end();
1167 for ( ; term != term_end; ++term) {
1168 termcount wdf = term.get_wdf();
1169 // Calculate the new document length
1170 new_doclen += wdf;
1171 stats.check_wdf(wdf);
1172
1173 string tname = *term;
1174 if (tname.size() > MAX_SAFE_TERM_LENGTH)
1175 throw Xapian::InvalidArgumentError("Term too long (> " STRINGIZE(MAX_SAFE_TERM_LENGTH) "): " + tname);
1176 add_freq_delta(tname, 1, wdf);
1177 insert_mod_plist(did, tname, wdf);
1178
1179 PositionIterator pos = term.positionlist_begin();
1180 if (pos != term.positionlist_end()) {
1181 position_table.set_positionlist(
1182 did, tname,
1183 pos, term.positionlist_end(), false);
1184 }
1185 }
1186 }
1187 LOGLINE(DB, "Calculated doclen for new document " << did << " as " << new_doclen);
1188
1189 // Set the termlist.
1190 if (termlist_table.is_open())
1191 termlist_table.set_termlist(did, document, new_doclen);
1192
1193 // Set the new document length
1194 Assert(doclens.find(did) == doclens.end() || doclens[did] == static_cast<Xapian::termcount>(-1));
1195 doclens[did] = new_doclen;
1196 stats.add_document(new_doclen);
1197 } catch (...) {
1198 // If an error occurs while adding a document, or doing any other
1199 // transaction, the modifications so far must be cleared before
1200 // returning control to the user - otherwise partial modifications will
1201 // persist in memory, and eventually get written to disk.
1202 cancel();
1203 throw;
1204 }
1205
1206 // FIXME: this should be done by checking memory usage, not the number of
1207 // changes.
1208 // We could also look at:
1209 // * mod_plists.size()
1210 // * doclens.size()
1211 // * freq_deltas.size()
1212 //
1213 // cout << "+++ mod_plists.size() " << mod_plists.size() <<
1214 // ", doclens.size() " << doclens.size() <<
1215 // ", freq_deltas.size() " << freq_deltas.size() << endl;
1216 if (++change_count >= flush_threshold) {
1217 flush_postlist_changes();
1218 if (!transaction_active()) apply();
1219 }
1220
1221 RETURN(did);
1222 }
1223
1224 void
delete_document(Xapian::docid did)1225 ChertWritableDatabase::delete_document(Xapian::docid did)
1226 {
1227 LOGCALL_VOID(DB, "ChertWritableDatabase::delete_document", did);
1228 Assert(did != 0);
1229
1230 if (!termlist_table.is_open())
1231 throw_termlist_table_close_exception();
1232
1233 if (rare(modify_shortcut_docid == did)) {
1234 // The modify_shortcut document can't be used for a modification
1235 // shortcut now, because it's been deleted!
1236 modify_shortcut_document = NULL;
1237 modify_shortcut_docid = 0;
1238 }
1239
1240 // Remove the record. If this fails, just propagate the exception since
1241 // the state should still be consistent (most likely it's
1242 // DocNotFoundError).
1243 record_table.delete_record(did);
1244
1245 try {
1246 // Remove the values.
1247 value_manager.delete_document(did, value_stats);
1248
1249 // OK, now add entries to remove the postings in the underlying record.
1250 Xapian::Internal::RefCntPtr<const ChertWritableDatabase> ptrtothis(this);
1251 ChertTermList termlist(ptrtothis, did);
1252
1253 stats.delete_document(termlist.get_doclength());
1254
1255 termlist.next();
1256 while (!termlist.at_end()) {
1257 string tname = termlist.get_termname();
1258 position_table.delete_positionlist(did, tname);
1259 termcount wdf = termlist.get_wdf();
1260
1261 add_freq_delta(tname, -1, -wdf);
1262 update_mod_plist(did, tname, 'D', 0u);
1263
1264 termlist.next();
1265 }
1266
1267 // Remove the termlist.
1268 if (termlist_table.is_open())
1269 termlist_table.delete_termlist(did);
1270
1271 // Mark this document as removed.
1272 doclens[did] = static_cast<Xapian::termcount>(-1);
1273 } catch (...) {
1274 // If an error occurs while deleting a document, or doing any other
1275 // transaction, the modifications so far must be cleared before
1276 // returning control to the user - otherwise partial modifications will
1277 // persist in memory, and eventually get written to disk.
1278 cancel();
1279 throw;
1280 }
1281
1282 if (++change_count >= flush_threshold) {
1283 flush_postlist_changes();
1284 if (!transaction_active()) apply();
1285 }
1286 }
1287
/** Replace the document with id @a did by @a document.
 *
 *  If @a did is above the last used docid, or turns out not to exist, the
 *  document is added via add_document_() instead.  When @a document is the
 *  object most recently handed out by open_document() for this docid, only
 *  the parts flagged as modified are rewritten.
 *
 *  The old termlist and the new document's terms are walked in parallel to
 *  work out which terms were removed, added or kept.
 *
 *  On any exception other than DocNotFoundError the buffered modifications
 *  are discarded via cancel() and the exception is rethrown.
 *
 *  @param did       The document id (asserted to be non-zero).
 *  @param document  The replacement document.
 */
void
ChertWritableDatabase::replace_document(Xapian::docid did,
                                        const Xapian::Document & document)
{
    LOGCALL_VOID(DB, "ChertWritableDatabase::replace_document", did | document);
    Assert(did != 0);

    try {
        if (did > stats.get_last_docid()) {
            stats.set_last_docid(did);
            // If this docid is above the highwatermark, then we can't be
            // replacing an existing document.
            (void)add_document_(did, document);
            return;
        }

        if (!termlist_table.is_open()) {
            // We can replace an *unused* docid <= last_docid too.
            Xapian::Internal::RefCntPtr<const ChertDatabase> ptrtothis(this);
            if (!postlist_table.document_exists(did, ptrtothis)) {
                (void)add_document_(did, document);
                return;
            }
            throw_termlist_table_close_exception();
        }

        // Check for a document read from this database being replaced - ie, a
        // modification operation.
        bool modifying = false;
        if (modify_shortcut_docid &&
            document.internal->get_docid() == modify_shortcut_docid) {
            if (document.internal.get() == modify_shortcut_document) {
                // We have a docid, it matches, and the pointer matches, so we
                // can skip modification of any data which hasn't been modified
                // in the document.
                if (!document.internal->modified()) {
                    // If the document is unchanged, we've nothing to do.
                    return;
                }
                modifying = true;
                LOGLINE(DB, "Detected potential document modification shortcut.");
            } else {
                // The modify_shortcut document can't be used for a
                // modification shortcut now, because it's about to be
                // modified.
                modify_shortcut_document = NULL;
                modify_shortcut_docid = 0;
            }
        }

        if (!modifying || document.internal->terms_modified()) {
            bool pos_modified = !modifying ||
                                document.internal->term_positions_modified();
            Xapian::Internal::RefCntPtr<const ChertWritableDatabase> ptrtothis(this);
            ChertTermList termlist(ptrtothis, did);
            Xapian::TermIterator term = document.termlist_begin();
            chert_doclen_t old_doclen = termlist.get_doclength();
            stats.delete_document(old_doclen);
            chert_doclen_t new_doclen = old_doclen;

            string old_tname, new_tname;

            // Walk the old termlist and the new document's terms together.
            termlist.next();
            while (!termlist.at_end() || term != document.termlist_end()) {
                // cmp < 0: old term only; cmp > 0: new term only;
                // cmp == 0: term present in both.
                int cmp;
                if (termlist.at_end()) {
                    cmp = 1;
                    new_tname = *term;
                } else {
                    old_tname = termlist.get_termname();
                    if (term != document.termlist_end()) {
                        new_tname = *term;
                        cmp = old_tname.compare(new_tname);
                    } else {
                        cmp = -1;
                    }
                }

                if (cmp < 0) {
                    // Term old_tname has been deleted.
                    termcount old_wdf = termlist.get_wdf();
                    new_doclen -= old_wdf;
                    add_freq_delta(old_tname, -1, -old_wdf);
                    if (pos_modified)
                        position_table.delete_positionlist(did, old_tname);
                    update_mod_plist(did, old_tname, 'D', 0u);
                    termlist.next();
                } else if (cmp > 0) {
                    // Term new_tname has been added.
                    termcount new_wdf = term.get_wdf();
                    new_doclen += new_wdf;
                    stats.check_wdf(new_wdf);
                    if (new_tname.size() > MAX_SAFE_TERM_LENGTH)
                        throw Xapian::InvalidArgumentError("Term too long (> " STRINGIZE(MAX_SAFE_TERM_LENGTH) "): " + new_tname);
                    add_freq_delta(new_tname, 1, new_wdf);
                    update_mod_plist(did, new_tname, 'A', new_wdf);
                    if (pos_modified) {
                        PositionIterator pos = term.positionlist_begin();
                        if (pos != term.positionlist_end()) {
                            position_table.set_positionlist(
                                did, new_tname,
                                pos, term.positionlist_end(), false);
                        }
                    }
                    ++term;
                } else if (cmp == 0) {
                    // Term already exists: look for wdf and positionlist changes.
                    termcount old_wdf = termlist.get_wdf();
                    termcount new_wdf = term.get_wdf();

                    // Check the stats even if wdf hasn't changed, because if
                    // this is the only document, the stats will have been
                    // zeroed.
                    stats.check_wdf(new_wdf);

                    if (old_wdf != new_wdf) {
                        new_doclen += new_wdf - old_wdf;
                        add_freq_delta(new_tname, 0, new_wdf - old_wdf);
                        update_mod_plist(did, new_tname, 'M', new_wdf);
                    }

                    if (pos_modified) {
                        PositionIterator pos = term.positionlist_begin();
                        if (pos != term.positionlist_end()) {
                            position_table.set_positionlist(did, new_tname, pos,
                                                            term.positionlist_end(),
                                                            true);
                        } else {
                            position_table.delete_positionlist(did, new_tname);
                        }
                    }

                    ++term;
                    termlist.next();
                }
            }
            LOGLINE(DB, "Calculated doclen for replacement document " << did << " as " << new_doclen);

            // Set the termlist.
            if (termlist_table.is_open())
                termlist_table.set_termlist(did, document, new_doclen);

            // Set the new document length
            if (new_doclen != old_doclen)
                doclens[did] = new_doclen;
            stats.add_document(new_doclen);
        }

        if (!modifying || document.internal->data_modified()) {
            // Replace the record
            record_table.replace_record(document.get_data(), did);
        }

        if (!modifying || document.internal->values_modified()) {
            // Replace the values.
            value_manager.replace_document(did, document, value_stats);
        }
    } catch (const Xapian::DocNotFoundError &) {
        // The document being replaced doesn't exist, so add it instead.
        (void)add_document_(did, document);
        return;
    } catch (...) {
        // If an error occurs while replacing a document, or doing any other
        // transaction, the modifications so far must be cleared before
        // returning control to the user - otherwise partial modifications will
        // persist in memory, and eventually get written to disk.
        cancel();
        throw;
    }

    // Flush once enough changes have been buffered.
    if (++change_count >= flush_threshold) {
        flush_postlist_changes();
        if (!transaction_active()) apply();
    }
}
1462
1463 Xapian::Document::Internal *
open_document(Xapian::docid did,bool lazy) const1464 ChertWritableDatabase::open_document(Xapian::docid did, bool lazy) const
1465 {
1466 LOGCALL(DB, Xapian::Document::Internal *, "ChertWritableDatabase::open_document", did | lazy);
1467 modify_shortcut_document = ChertDatabase::open_document(did, lazy);
1468 // Store the docid only after open_document() successfully returns, so an
1469 // attempt to open a missing document doesn't overwrite this.
1470 modify_shortcut_docid = did;
1471 RETURN(modify_shortcut_document);
1472 }
1473
1474 Xapian::termcount
get_doclength(Xapian::docid did) const1475 ChertWritableDatabase::get_doclength(Xapian::docid did) const
1476 {
1477 LOGCALL(DB, Xapian::termcount, "ChertWritableDatabase::get_doclength", did);
1478 map<docid, termcount>::const_iterator i = doclens.find(did);
1479 if (i != doclens.end()) {
1480 Xapian::termcount doclen = i->second;
1481 if (doclen == static_cast<Xapian::termcount>(-1)) {
1482 throw Xapian::DocNotFoundError("Document " + str(did) + " not found");
1483 }
1484 RETURN(doclen);
1485 }
1486 RETURN(ChertDatabase::get_doclength(did));
1487 }
1488
1489 Xapian::doccount
get_termfreq(const string & tname) const1490 ChertWritableDatabase::get_termfreq(const string & tname) const
1491 {
1492 LOGCALL(DB, Xapian::doccount, "ChertWritableDatabase::get_termfreq", tname);
1493 Xapian::doccount termfreq = ChertDatabase::get_termfreq(tname);
1494 map<string, pair<termcount_diff, termcount_diff> >::const_iterator i;
1495 i = freq_deltas.find(tname);
1496 if (i != freq_deltas.end()) termfreq += i->second.first;
1497 RETURN(termfreq);
1498 }
1499
1500 Xapian::termcount
get_collection_freq(const string & tname) const1501 ChertWritableDatabase::get_collection_freq(const string & tname) const
1502 {
1503 LOGCALL(DB, Xapian::termcount, "ChertWritableDatabase::get_collection_freq", tname);
1504 Xapian::termcount collfreq = ChertDatabase::get_collection_freq(tname);
1505
1506 map<string, pair<termcount_diff, termcount_diff> >::const_iterator i;
1507 i = freq_deltas.find(tname);
1508 if (i != freq_deltas.end()) collfreq += i->second.second;
1509
1510 RETURN(collfreq);
1511 }
1512
1513 Xapian::doccount
get_value_freq(Xapian::valueno slot) const1514 ChertWritableDatabase::get_value_freq(Xapian::valueno slot) const
1515 {
1516 LOGCALL(DB, Xapian::doccount, "ChertWritableDatabase::get_value_freq", slot);
1517 map<Xapian::valueno, ValueStats>::const_iterator i;
1518 i = value_stats.find(slot);
1519 if (i != value_stats.end()) RETURN(i->second.freq);
1520 RETURN(ChertDatabase::get_value_freq(slot));
1521 }
1522
1523 std::string
get_value_lower_bound(Xapian::valueno slot) const1524 ChertWritableDatabase::get_value_lower_bound(Xapian::valueno slot) const
1525 {
1526 LOGCALL(DB, std::string, "ChertWritableDatabase::get_value_lower_bound", slot);
1527 map<Xapian::valueno, ValueStats>::const_iterator i;
1528 i = value_stats.find(slot);
1529 if (i != value_stats.end()) RETURN(i->second.lower_bound);
1530 RETURN(ChertDatabase::get_value_lower_bound(slot));
1531 }
1532
1533 std::string
get_value_upper_bound(Xapian::valueno slot) const1534 ChertWritableDatabase::get_value_upper_bound(Xapian::valueno slot) const
1535 {
1536 LOGCALL(DB, std::string, "ChertWritableDatabase::get_value_upper_bound", slot);
1537 map<Xapian::valueno, ValueStats>::const_iterator i;
1538 i = value_stats.find(slot);
1539 if (i != value_stats.end()) RETURN(i->second.upper_bound);
1540 RETURN(ChertDatabase::get_value_upper_bound(slot));
1541 }
1542
1543 bool
term_exists(const string & tname) const1544 ChertWritableDatabase::term_exists(const string & tname) const
1545 {
1546 LOGCALL(DB, bool, "ChertWritableDatabase::term_exists", tname);
1547 RETURN(get_termfreq(tname) != 0);
1548 }
1549
1550 LeafPostList *
open_post_list(const string & tname) const1551 ChertWritableDatabase::open_post_list(const string& tname) const
1552 {
1553 LOGCALL(DB, LeafPostList *, "ChertWritableDatabase::open_post_list", tname);
1554 Xapian::Internal::RefCntPtr<const ChertWritableDatabase> ptrtothis(this);
1555
1556 if (tname.empty()) {
1557 Xapian::doccount doccount = get_doccount();
1558 if (stats.get_last_docid() == doccount) {
1559 RETURN(new ContiguousAllDocsPostList(ptrtothis, doccount));
1560 }
1561 if (doclens.empty()) {
1562 RETURN(new ChertAllDocsPostList(ptrtothis, doccount));
1563 }
1564 RETURN(new ChertAllDocsModifiedPostList(ptrtothis, doccount, doclens));
1565 }
1566
1567 map<string, map<docid, pair<char, termcount> > >::const_iterator j;
1568 j = mod_plists.find(tname);
1569 if (j != mod_plists.end()) {
1570 // We've got buffered changes to this term's postlist, so we need to
1571 // use a ChertModifiedPostList.
1572 RETURN(new ChertModifiedPostList(ptrtothis, tname, j->second));
1573 }
1574
1575 RETURN(new ChertPostList(ptrtothis, tname, true));
1576 }
1577
1578 ValueList *
open_value_list(Xapian::valueno slot) const1579 ChertWritableDatabase::open_value_list(Xapian::valueno slot) const
1580 {
1581 LOGCALL(DB, ValueList *, "ChertWritableDatabase::open_value_list", slot);
1582 // If there are changes, we don't have code to iterate the modified value
1583 // list so we need to flush (but don't commit - there may be a transaction
1584 // in progress).
1585 if (change_count) value_manager.merge_changes();
1586 RETURN(ChertDatabase::open_value_list(slot));
1587 }
1588
1589 TermList *
open_allterms(const string & prefix) const1590 ChertWritableDatabase::open_allterms(const string & prefix) const
1591 {
1592 LOGCALL(DB, TermList *, "ChertWritableDatabase::open_allterms", NO_ARGS);
1593 // If there are changes, terms may have been added or removed, and so we
1594 // need to flush (but don't commit - there may be a transaction in
1595 // progress).
1596 if (change_count) flush_postlist_changes();
1597 RETURN(ChertDatabase::open_allterms(prefix));
1598 }
1599
1600 void
cancel()1601 ChertWritableDatabase::cancel()
1602 {
1603 ChertDatabase::cancel();
1604 stats.read(postlist_table);
1605 freq_deltas.clear();
1606 doclens.clear();
1607 mod_plists.clear();
1608 value_stats.clear();
1609 change_count = 0;
1610 }
1611
/** Forward to spelling_table.add_word() with @a word and @a freqinc. */
void
ChertWritableDatabase::add_spelling(const string & word,
                                    Xapian::termcount freqinc) const
{
    spelling_table.add_word(word, freqinc);
}
1618
/** Forward to spelling_table.remove_word() with @a word and @a freqdec. */
void
ChertWritableDatabase::remove_spelling(const string & word,
                                       Xapian::termcount freqdec) const
{
    spelling_table.remove_word(word, freqdec);
}
1625
1626 TermList *
open_spelling_wordlist() const1627 ChertWritableDatabase::open_spelling_wordlist() const
1628 {
1629 spelling_table.merge_changes();
1630 return ChertDatabase::open_spelling_wordlist();
1631 }
1632
1633 TermList *
open_synonym_keylist(const string & prefix) const1634 ChertWritableDatabase::open_synonym_keylist(const string & prefix) const
1635 {
1636 synonym_table.merge_changes();
1637 return ChertDatabase::open_synonym_keylist(prefix);
1638 }
1639
/** Forward to synonym_table.add_synonym() with @a term and @a synonym. */
void
ChertWritableDatabase::add_synonym(const string & term,
                                   const string & synonym) const
{
    synonym_table.add_synonym(term, synonym);
}
1646
/** Forward to synonym_table.remove_synonym() with @a term and @a synonym. */
void
ChertWritableDatabase::remove_synonym(const string & term,
                                      const string & synonym) const
{
    synonym_table.remove_synonym(term, synonym);
}
1653
/** Forward to synonym_table.clear_synonyms() for @a term. */
void
ChertWritableDatabase::clear_synonyms(const string & term) const
{
    synonym_table.clear_synonyms(term);
}
1659
1660 void
set_metadata(const string & key,const string & value)1661 ChertWritableDatabase::set_metadata(const string & key, const string & value)
1662 {
1663 LOGCALL(DB, string, "ChertWritableDatabase::set_metadata", key | value);
1664 string btree_key("\x00\xc0", 2);
1665 btree_key += key;
1666 if (value.empty()) {
1667 postlist_table.del(btree_key);
1668 } else {
1669 postlist_table.add(btree_key, value);
1670 }
1671 }
1672
1673 void
invalidate_doc_object(Xapian::Document::Internal * obj) const1674 ChertWritableDatabase::invalidate_doc_object(Xapian::Document::Internal * obj) const
1675 {
1676 if (obj == modify_shortcut_document) {
1677 modify_shortcut_document = NULL;
1678 modify_shortcut_docid = 0;
1679 }
1680 }
1681