1 /* flint_postlist.cc: Postlists in a flint database
2  *
3  * Copyright 1999,2000,2001 BrightStation PLC
4  * Copyright 2002,2003,2004,2005,2007,2008,2009 Olly Betts
5  * Copyright 2007,2009 Lemur Consulting Ltd
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License as
9  * published by the Free Software Foundation; either version 2 of the
10  * License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
20  * USA
21  */
22 
23 #include <config.h>
24 #include "flint_postlist.h"
25 
26 #include "flint_cursor.h"
27 #include "flint_database.h"
28 #include "flint_utils.h"
29 #include "debuglog.h"
30 #include "noreturn.h"
31 #include "str.h"
32 
33 Xapian::doccount
get_termfreq(const string & term) const34 FlintPostListTable::get_termfreq(const string & term) const
35 {
36     string key = make_key(term);
37     string tag;
38     if (!get_exact_entry(key, tag)) return 0;
39 
40     Xapian::doccount termfreq;
41     const char * p = tag.data();
42     FlintPostList::read_number_of_entries(&p, p + tag.size(), &termfreq, NULL);
43     return termfreq;
44 }
45 
46 Xapian::termcount
get_collection_freq(const string & term) const47 FlintPostListTable::get_collection_freq(const string & term) const
48 {
49     string key = make_key(term);
50     string tag;
51     if (!get_exact_entry(key, tag)) return 0;
52 
53     Xapian::termcount collfreq;
54     const char * p = tag.data();
55     FlintPostList::read_number_of_entries(&p, p + tag.size(), NULL, &collfreq);
56     return collfreq;
57 }
58 
59 // How big should chunks in the posting list be?  (They
60 // will grow slightly bigger than this, but not more than a
61 // few bytes extra) - FIXME: tune this value to try to
62 // maximise how well blocks are used.  Or performance.
63 // Or indexing speed.  Or something...
64 const unsigned int CHUNKSIZE = 2000;
65 
66 /** FlintPostlistChunkWriter is a wrapper which acts roughly as an
67  *  output iterator on a postlist chunk, taking care of the
68  *  messy details.  It's intended to be used with deletion and
69  *  replacing of entries, not for adding to the end, when it's
70  *  not really needed.
71  */
72 class FlintPostlistChunkWriter {
73     public:
74 	FlintPostlistChunkWriter(const string &orig_key_,
75 			    bool is_first_chunk_,
76 			    const string &tname_,
77 			    bool is_last_chunk_);
78 
79 	/// Append an entry to this chunk.
80 	void append(FlintTable * table, Xapian::docid did,
81 		    Xapian::termcount wdf, flint_doclen_t doclen);
82 
83 	/// Append a block of raw entries to this chunk.
raw_append(Xapian::docid first_did_,Xapian::docid current_did_,const string & s)84 	void raw_append(Xapian::docid first_did_, Xapian::docid current_did_,
85 			const string & s) {
86 	    Assert(!started);
87 	    first_did = first_did_;
88 	    current_did = current_did_;
89 	    if (!s.empty()) {
90 		chunk.append(s);
91 		started = true;
92 	    }
93 	}
94 
95 	/** Flush the chunk to the buffered table.  Note: this may write it
96 	 *  with a different key to the original one, if for example the first
97 	 *  entry has changed.
98 	 */
99 	void flush(FlintTable *table);
100 
101     private:
102 	string orig_key;
103 	string tname;
104 	bool is_first_chunk;
105 	bool is_last_chunk;
106 	bool started;
107 
108 	Xapian::docid first_did;
109 	Xapian::docid current_did;
110 
111 	string chunk;
112 };
113 
114 // Static functions
115 
116 /// Report an error when reading the posting list.
117 XAPIAN_NORETURN(static void report_read_error(const char * position));
report_read_error(const char * position)118 static void report_read_error(const char * position)
119 {
120     if (position == 0) {
121 	// data ran out
122 	throw Xapian::DatabaseCorruptError("Data ran out unexpectedly when reading posting list.");
123     }
124     // overflow
125     throw Xapian::RangeError("Value in posting list too large.");
126 }
127 
get_tname_from_key(const char ** src,const char * end,string & tname)128 static inline bool get_tname_from_key(const char **src, const char *end,
129 			       string &tname)
130 {
131     return F_unpack_string_preserving_sort(src, end, tname);
132 }
133 
134 static inline bool
check_tname_in_key_lite(const char ** keypos,const char * keyend,const string & tname)135 check_tname_in_key_lite(const char **keypos, const char *keyend, const string &tname)
136 {
137     string tname_in_key;
138 
139     // Read the termname.
140     if (!get_tname_from_key(keypos, keyend, tname_in_key)) {
141 	report_read_error(*keypos);
142     }
143 
144     // This should only fail if the postlist doesn't exist at all.
145     return tname_in_key == tname;
146 }
147 
148 static inline bool
check_tname_in_key(const char ** keypos,const char * keyend,const string & tname)149 check_tname_in_key(const char **keypos, const char *keyend, const string &tname)
150 {
151     if (*keypos == keyend) return false;
152 
153     return check_tname_in_key_lite(keypos, keyend, tname);
154 }
155 
156 /// Read the start of the first chunk in the posting list.
157 static Xapian::docid
read_start_of_first_chunk(const char ** posptr,const char * end,Xapian::doccount * number_of_entries_ptr,Xapian::termcount * collection_freq_ptr)158 read_start_of_first_chunk(const char ** posptr,
159 			  const char * end,
160 			  Xapian::doccount * number_of_entries_ptr,
161 			  Xapian::termcount * collection_freq_ptr)
162 {
163     LOGCALL_STATIC(DB, Xapian::docid, "read_start_of_first_chunk", (const void *)posptr | (const void *)end | (void *)number_of_entries_ptr | (void *)collection_freq_ptr);
164 
165     FlintPostList::read_number_of_entries(posptr, end,
166 			   number_of_entries_ptr, collection_freq_ptr);
167     if (number_of_entries_ptr)
168 	LOGVALUE(DB, *number_of_entries_ptr);
169     if (collection_freq_ptr)
170 	LOGVALUE(DB, *collection_freq_ptr);
171 
172     Xapian::docid did;
173     // Read the docid of the first entry in the posting list.
174     if (!F_unpack_uint(posptr, end, &did))
175 	report_read_error(*posptr);
176     ++did;
177     LOGVALUE(DB, did);
178     RETURN(did);
179 }
180 
read_did_increase(const char ** posptr,const char * end,Xapian::docid * did_ptr)181 static inline void read_did_increase(const char ** posptr,
182 			      const char * end,
183 			      Xapian::docid * did_ptr)
184 {
185     Xapian::docid did_increase;
186     if (!F_unpack_uint(posptr, end, &did_increase)) report_read_error(*posptr);
187     *did_ptr += did_increase + 1;
188 }
189 
190 /// Read the wdf and the document length of an item.
read_wdf_and_length(const char ** posptr,const char * end,Xapian::termcount * wdf_ptr,flint_doclen_t * doclength_ptr)191 static inline void read_wdf_and_length(const char ** posptr,
192 				const char * end,
193 				Xapian::termcount * wdf_ptr,
194 				flint_doclen_t * doclength_ptr)
195 {
196     if (!F_unpack_uint(posptr, end, wdf_ptr)) report_read_error(*posptr);
197     if (!F_unpack_uint(posptr, end, doclength_ptr)) report_read_error(*posptr);
198 }
199 
200 /// Read the start of a chunk.
201 static Xapian::docid
read_start_of_chunk(const char ** posptr,const char * end,Xapian::docid first_did_in_chunk,bool * is_last_chunk_ptr)202 read_start_of_chunk(const char ** posptr,
203 		    const char * end,
204 		    Xapian::docid first_did_in_chunk,
205 		    bool * is_last_chunk_ptr)
206 {
207     LOGCALL_STATIC(DB, Xapian::docid, "read_start_of_chunk", reinterpret_cast<const void*>(posptr) | reinterpret_cast<const void*>(end) | first_did_in_chunk | reinterpret_cast<const void*>(is_last_chunk_ptr));
208 
209     // Read whether this is the last chunk
210     if (!F_unpack_bool(posptr, end, is_last_chunk_ptr))
211 	report_read_error(*posptr);
212     if (is_last_chunk_ptr)
213 	LOGVALUE(DB, *is_last_chunk_ptr);
214 
215     // Read what the final document ID in this chunk is.
216     Xapian::docid increase_to_last;
217     if (!F_unpack_uint(posptr, end, &increase_to_last))
218 	report_read_error(*posptr);
219     ++increase_to_last;
220     Xapian::docid last_did_in_chunk = first_did_in_chunk + increase_to_last;
221     LOGVALUE(DB, last_did_in_chunk);
222     RETURN(last_did_in_chunk);
223 }
224 
make_wdf_and_length(Xapian::termcount wdf,flint_doclen_t doclength)225 static string make_wdf_and_length(Xapian::termcount wdf, flint_doclen_t doclength)
226 {
227     return F_pack_uint(wdf) + F_pack_uint(doclength);
228 }
229 
write_start_of_chunk(string & chunk,unsigned int start_of_chunk_header,unsigned int end_of_chunk_header,bool is_last_chunk,Xapian::docid first_did_in_chunk,Xapian::docid last_did_in_chunk)230 static void write_start_of_chunk(string & chunk,
231 				 unsigned int start_of_chunk_header,
232 				 unsigned int end_of_chunk_header,
233 				 bool is_last_chunk,
234 				 Xapian::docid first_did_in_chunk,
235 				 Xapian::docid last_did_in_chunk)
236 {
237     Assert((size_t)(end_of_chunk_header - start_of_chunk_header) <= chunk.size());
238     Assert(last_did_in_chunk >= first_did_in_chunk);
239     Xapian::docid increase_to_last = last_did_in_chunk - first_did_in_chunk;
240 
241     chunk.replace(start_of_chunk_header,
242 		  end_of_chunk_header - start_of_chunk_header,
243 		  F_pack_bool(is_last_chunk) + F_pack_uint(increase_to_last - 1));
244     // FIXME - storing increase_to_last - 1 is bogus as this value is
245     // -1 when a postlist chunk has a single entry!  Luckily the code
246     // works despite this, but it's ugly.
247 }
248 
249 /** FlintPostlistChunkReader is essentially an iterator wrapper
250  *  around a postlist chunk.  It simply iterates through the
251  *  entries in a postlist.
252  */
253 class FlintPostlistChunkReader {
254     public:
255 	/** Initialise the postlist chunk reader.
256 	 *
257 	 *  @param first_did  First document id in this chunk.
258 	 *  @param data       The tag string with the header removed.
259 	 */
FlintPostlistChunkReader(Xapian::docid first_did,const string & data_)260 	FlintPostlistChunkReader(Xapian::docid first_did, const string & data_)
261 	    : data(data_), pos(data.data()), end(pos + data.length()), at_end(data.empty()), did(first_did)
262 	{
263 	    if (!at_end) read_wdf_and_length(&pos, end, &wdf, &doclength);
264 	}
265 
get_docid() const266 	Xapian::docid get_docid() const {
267 	    return did;
268 	}
get_wdf() const269 	Xapian::termcount get_wdf() const {
270 	    return wdf;
271 	}
get_doclength() const272 	flint_doclen_t get_doclength() const {
273 	    LOGCALL(DB, flint_doclen_t, "FlintPostlistChunkReader::get_doclength", NO_ARGS);
274 	    RETURN(doclength);
275 	}
276 
is_at_end() const277 	bool is_at_end() const {
278 	    return at_end;
279 	}
280 
281 	/** Advance to the next entry.  Set at_end if we run off the end.
282 	 */
283 	void next();
284 
285     private:
286 	string data;
287 
288 	const char *pos;
289 	const char *end;
290 
291 	bool at_end;
292 
293 	Xapian::docid did;
294 	Xapian::termcount wdf;
295 	flint_doclen_t doclength;
296 };
297 
298 void
next()299 FlintPostlistChunkReader::next()
300 {
301     if (pos == end) {
302 	at_end = true;
303     } else {
304 	read_did_increase(&pos, end, &did);
305 	read_wdf_and_length(&pos, end, &wdf, &doclength);
306     }
307 }
308 
FlintPostlistChunkWriter(const string & orig_key_,bool is_first_chunk_,const string & tname_,bool is_last_chunk_)309 FlintPostlistChunkWriter::FlintPostlistChunkWriter(const string &orig_key_,
310 					 bool is_first_chunk_,
311 					 const string &tname_,
312 					 bool is_last_chunk_)
313 	: orig_key(orig_key_),
314 	  tname(tname_), is_first_chunk(is_first_chunk_),
315 	  is_last_chunk(is_last_chunk_),
316 	  started(false)
317 {
318     LOGCALL_VOID(DB, "FlintPostlistChunkWriter::FlintPostlistChunkWriter", orig_key_ | is_first_chunk_ | tname_ | is_last_chunk_);
319 }
320 
321 void
append(FlintTable * table,Xapian::docid did,Xapian::termcount wdf,flint_doclen_t doclen)322 FlintPostlistChunkWriter::append(FlintTable * table, Xapian::docid did,
323 			    Xapian::termcount wdf, flint_doclen_t doclen)
324 {
325     if (!started) {
326 	started = true;
327 	first_did = did;
328     } else {
329 	Assert(did > current_did);
330 	// Start a new chunk if this one has grown to the threshold.
331 	if (chunk.size() >= CHUNKSIZE) {
332 	    bool save_is_last_chunk = is_last_chunk;
333 	    is_last_chunk = false;
334 	    flush(table);
335 	    is_last_chunk = save_is_last_chunk;
336 	    is_first_chunk = false;
337 	    first_did = did;
338 	    chunk.resize(0);
339 	    orig_key = FlintPostListTable::make_key(tname, first_did);
340 	} else {
341 	    chunk.append(F_pack_uint(did - current_did - 1));
342 	}
343     }
344     current_did = did;
345     chunk.append(make_wdf_and_length(wdf, doclen));
346 }
347 
348 /** Make the data to go at the start of the very first chunk.
349  */
350 static inline string
make_start_of_first_chunk(Xapian::doccount entries,Xapian::termcount collectionfreq,Xapian::docid new_did)351 make_start_of_first_chunk(Xapian::doccount entries,
352 			  Xapian::termcount collectionfreq,
353 			  Xapian::docid new_did)
354 {
355     return F_pack_uint(entries) + F_pack_uint(collectionfreq) + F_pack_uint(new_did - 1);
356 }
357 
358 /** Make the data to go at the start of a standard chunk.
359  */
360 static inline string
make_start_of_chunk(bool new_is_last_chunk,Xapian::docid new_first_did,Xapian::docid new_final_did)361 make_start_of_chunk(bool new_is_last_chunk,
362 		    Xapian::docid new_first_did,
363 		    Xapian::docid new_final_did)
364 {
365     Assert(new_final_did >= new_first_did);
366     return F_pack_bool(new_is_last_chunk) +
367 	    F_pack_uint(new_final_did - new_first_did - 1);
368 }
369 
370 void
flush(FlintTable * table)371 FlintPostlistChunkWriter::flush(FlintTable *table)
372 {
373     LOGCALL_VOID(DB, "FlintPostlistChunkWriter::flush", table);
374 
375     /* This is one of the more messy parts involved with updating posting
376      * list chunks.
377      *
378      * Depending on circumstances, we may have to delete an entire chunk
379      * or file it under a different key, as well as possibly modifying both
380      * the previous and next chunk of the postlist.
381      */
382 
383     if (!started) {
384 	/* This chunk is now empty so disappears entirely.
385 	 *
386 	 * If this was the last chunk, then the previous chunk
387 	 * must have its "is_last_chunk" flag updated.
388 	 *
389 	 * If this was the first chunk, then the next chunk must
390 	 * be transformed into the first chunk.  Messy!
391 	 */
392 	LOGLINE(DB, "FlintPostlistChunkWriter::flush(): deleting chunk");
393 	Assert(!orig_key.empty());
394 	if (is_first_chunk) {
395 	    LOGLINE(DB, "FlintPostlistChunkWriter::flush(): deleting first chunk");
396 	    if (is_last_chunk) {
397 		/* This is the first and the last chunk, ie the only
398 		 * chunk, so just delete the tag.
399 		 */
400 		table->del(orig_key);
401 		return;
402 	    }
403 
404 	    /* This is the messiest case.  The first chunk is to
405 	     * be removed, and there is at least one chunk after
406 	     * it.  Need to rewrite the next chunk as the first
407 	     * chunk.
408 	     */
409 	    AutoPtr<FlintCursor> cursor(table->cursor_get());
410 
411 	    if (!cursor->find_entry(orig_key)) {
412 		throw Xapian::DatabaseCorruptError("The key we're working on has disappeared");
413 	    }
414 
415 	    // Extract existing counts from the first chunk so we can reinsert
416 	    // them into the block we're renaming.
417 	    Xapian::doccount num_ent;
418 	    Xapian::termcount coll_freq;
419 	    {
420 		cursor->read_tag();
421 		const char *tagpos = cursor->current_tag.data();
422 		const char *tagend = tagpos + cursor->current_tag.size();
423 
424 		(void)read_start_of_first_chunk(&tagpos, tagend,
425 						&num_ent, &coll_freq);
426 	    }
427 
428 	    // Seek to the next chunk.
429 	    cursor->next();
430 	    if (cursor->after_end()) {
431 		throw Xapian::DatabaseCorruptError("Expected another key but found none");
432 	    }
433 	    const char *kpos = cursor->current_key.data();
434 	    const char *kend = kpos + cursor->current_key.size();
435 	    if (!check_tname_in_key(&kpos, kend, tname)) {
436 		throw Xapian::DatabaseCorruptError("Expected another key with the same term name but found a different one");
437 	    }
438 
439 	    // Read the new first docid
440 	    Xapian::docid new_first_did;
441 	    if (!F_unpack_uint_preserving_sort(&kpos, kend, &new_first_did)) {
442 		report_read_error(kpos);
443 	    }
444 
445 	    cursor->read_tag();
446 	    const char *tagpos = cursor->current_tag.data();
447 	    const char *tagend = tagpos + cursor->current_tag.size();
448 
449 	    // Read the chunk header
450 	    bool new_is_last_chunk;
451 	    Xapian::docid new_last_did_in_chunk =
452 		read_start_of_chunk(&tagpos, tagend, new_first_did,
453 				    &new_is_last_chunk);
454 
455 	    string chunk_data(tagpos, tagend);
456 
457 	    // First remove the renamed tag
458 	    table->del(cursor->current_key);
459 
460 	    // And now write it as the first chunk
461 	    string tag;
462 	    tag = make_start_of_first_chunk(num_ent, coll_freq, new_first_did);
463 	    tag += make_start_of_chunk(new_is_last_chunk,
464 					      new_first_did,
465 					      new_last_did_in_chunk);
466 	    tag += chunk_data;
467 	    table->add(orig_key, tag);
468 	    return;
469 	}
470 
471 	LOGLINE(DB, "FlintPostlistChunkWriter::flush(): deleting secondary chunk");
472 	/* This isn't the first chunk.  Check whether we're the last
473 	 * chunk.
474 	 */
475 
476 	// Delete this chunk
477 	table->del(orig_key);
478 
479 	if (is_last_chunk) {
480 	    LOGLINE(DB, "FlintPostlistChunkWriter::flush(): deleting secondary last chunk");
481 	    // Update the previous chunk's is_last_chunk flag.
482 	    AutoPtr<FlintCursor> cursor(table->cursor_get());
483 
484 	    /* Should not find the key we just deleted, but should
485 	     * find the previous chunk. */
486 	    if (cursor->find_entry(orig_key)) {
487 		throw Xapian::DatabaseCorruptError("Flint key not deleted as we expected");
488 	    }
489 	    // Make sure this is a chunk with the right term attached.
490 	    const char * keypos = cursor->current_key.data();
491 	    const char * keyend = keypos + cursor->current_key.size();
492 	    if (!check_tname_in_key(&keypos, keyend, tname)) {
493 		throw Xapian::DatabaseCorruptError("Couldn't find chunk before delete chunk");
494 	    }
495 
496 	    bool is_prev_first_chunk = (keypos == keyend);
497 
498 	    // Now update the last_chunk
499 	    cursor->read_tag();
500 	    string tag = cursor->current_tag;
501 
502 	    const char *tagpos = tag.data();
503 	    const char *tagend = tagpos + tag.size();
504 
505 	    // Skip first chunk header
506 	    Xapian::docid first_did_in_chunk;
507 	    if (is_prev_first_chunk) {
508 		first_did_in_chunk = read_start_of_first_chunk(&tagpos, tagend,
509 							       0, 0);
510 	    } else {
511 		if (!F_unpack_uint_preserving_sort(&keypos, keyend,
512 						   &first_did_in_chunk))
513 		    report_read_error(keypos);
514 	    }
515 	    bool wrong_is_last_chunk;
516 	    string::size_type start_of_chunk_header = tagpos - tag.data();
517 	    Xapian::docid last_did_in_chunk =
518 		read_start_of_chunk(&tagpos, tagend, first_did_in_chunk,
519 				    &wrong_is_last_chunk);
520 	    string::size_type end_of_chunk_header = tagpos - tag.data();
521 
522 	    // write new is_last flag
523 	    write_start_of_chunk(tag,
524 				 start_of_chunk_header,
525 				 end_of_chunk_header,
526 				 true, // is_last_chunk
527 				 first_did_in_chunk,
528 				 last_did_in_chunk);
529 	    table->add(cursor->current_key, tag);
530 	}
531     } else {
532 	LOGLINE(DB, "FlintPostlistChunkWriter::flush(): updating chunk which still has items in it");
533 	/* The chunk still has some items in it.  Two major subcases:
534 	 * a) This is the first chunk.
535 	 * b) This isn't the first chunk.
536 	 *
537 	 * The subcases just affect the chunk header.
538 	 */
539 	string tag;
540 
541 	/* First write the header, which depends on whether this is the
542 	 * first chunk.
543 	 */
544 	if (is_first_chunk) {
545 	    /* The first chunk.  This is the relatively easy case,
546 	     * and we just have to write this one back to disk.
547 	     */
548 	    LOGLINE(DB, "FlintPostlistChunkWriter::flush(): rewriting the first chunk, which still has items in it");
549 	    string key = FlintPostListTable::make_key(tname);
550 	    bool ok = table->get_exact_entry(key, tag);
551 	    (void)ok;
552 	    Assert(ok);
553 	    Assert(!tag.empty());
554 
555 	    Xapian::doccount num_ent;
556 	    Xapian::termcount coll_freq;
557 	    {
558 		const char * tagpos = tag.data();
559 		const char * tagend = tagpos + tag.size();
560 		(void)read_start_of_first_chunk(&tagpos, tagend,
561 						&num_ent, &coll_freq);
562 	    }
563 
564 	    tag = make_start_of_first_chunk(num_ent, coll_freq, first_did);
565 
566 	    tag += make_start_of_chunk(is_last_chunk, first_did, current_did);
567 	    tag += chunk;
568 	    table->add(key, tag);
569 	    return;
570 	}
571 
572 	LOGLINE(DB, "FlintPostlistChunkWriter::flush(): updating secondary chunk which still has items in it");
573 	/* Not the first chunk.
574 	 *
575 	 * This has the easy sub-sub-case:
576 	 *   The first entry in the chunk hasn't changed
577 	 * ...and the hard sub-sub-case:
578 	 *   The first entry in the chunk has changed.  This is
579 	 *   harder because the key for the chunk changes, so
580 	 *   we've got to do a switch.
581 	 */
582 
583 	// First find out the initial docid
584 	const char *keypos = orig_key.data();
585 	const char *keyend = keypos + orig_key.size();
586 	if (!check_tname_in_key(&keypos, keyend, tname)) {
587 	    throw Xapian::DatabaseCorruptError("Have invalid key writing to postlist");
588 	}
589 	Xapian::docid initial_did;
590 	if (!F_unpack_uint_preserving_sort(&keypos, keyend, &initial_did)) {
591 	    report_read_error(keypos);
592 	}
593 	string new_key;
594 	if (initial_did != first_did) {
595 	    /* The fiddlier case:
596 	     * Create a new tag with the correct key, and replace
597 	     * the old one.
598 	     */
599 	    new_key = FlintPostListTable::make_key(tname, first_did);
600 	    table->del(orig_key);
601 	} else {
602 	    new_key = orig_key;
603 	}
604 
605 	// ...and write the start of this chunk.
606 	tag = make_start_of_chunk(is_last_chunk, first_did, current_did);
607 
608 	tag += chunk;
609 	table->add(new_key, tag);
610     }
611 }
612 
613 /** Read the number of entries in the posting list.
614  *  This must only be called when *posptr is pointing to the start of
615  *  the first chunk of the posting list.
616  */
read_number_of_entries(const char ** posptr,const char * end,Xapian::doccount * number_of_entries_ptr,Xapian::termcount * collection_freq_ptr)617 void FlintPostList::read_number_of_entries(const char ** posptr,
618 				   const char * end,
619 				   Xapian::doccount * number_of_entries_ptr,
620 				   Xapian::termcount * collection_freq_ptr)
621 {
622     if (!F_unpack_uint(posptr, end, number_of_entries_ptr))
623 	report_read_error(*posptr);
624     if (!F_unpack_uint(posptr, end, collection_freq_ptr))
625 	report_read_error(*posptr);
626 }
627 
628 /** The format of a postlist is:
629  *
630  *  Split into chunks.  Key for first chunk is the termname (encoded as
631  *  length : name).  Key for subsequent chunks is the same, followed by the
632  *  document ID of the first document in the chunk (encoded as length of
633  *  representation in first byte, and then docid).
634  *
635  *  A chunk (except for the first chunk) contains:
636  *
637  *  1)  bool - true if this is the last chunk.
638  *  2)  difference between final docid in chunk and first docid.
639  *  3)  wdf, then doclength of first item.
640  *  4)  increment in docid to next item, followed by wdf and doclength of item
641  *  5)  (4) repeatedly.
642  *
643  *  The first chunk begins with the number of entries, the collection
644  *  frequency, then the docid of the first document, then has the header of a
645  *  standard chunk.
646  */
FlintPostList(Xapian::Internal::RefCntPtr<const FlintDatabase> this_db_,const string & term_)647 FlintPostList::FlintPostList(Xapian::Internal::RefCntPtr<const FlintDatabase> this_db_,
648 			     const string & term_)
649 	: LeafPostList(term_),
650 	  this_db(this_db_),
651 	  have_started(false),
652 	  cursor(this_db->postlist_table.cursor_get()),
653 	  is_at_end(false)
654 {
655     LOGCALL_VOID(DB, "FlintPostList::FlintPostList", this_db_.get() | term_);
656     string key = FlintPostListTable::make_key(term);
657     int found = cursor->find_entry(key);
658     if (!found) {
659 	number_of_entries = 0;
660 	is_at_end = true;
661 	pos = 0;
662 	end = 0;
663 	first_did_in_chunk = 0;
664 	last_did_in_chunk = 0;
665 	return;
666     }
667     cursor->read_tag();
668     pos = cursor->current_tag.data();
669     end = pos + cursor->current_tag.size();
670 
671     did = read_start_of_first_chunk(&pos, end, &number_of_entries, NULL);
672     first_did_in_chunk = did;
673     last_did_in_chunk = read_start_of_chunk(&pos, end, first_did_in_chunk,
674 					    &is_last_chunk);
675     read_wdf_and_length(&pos, end, &wdf, &doclength);
676 }
677 
~FlintPostList()678 FlintPostList::~FlintPostList()
679 {
680     LOGCALL_VOID(DB, "FlintPostList::~FlintPostList", NO_ARGS);
681 }
682 
683 bool
next_in_chunk()684 FlintPostList::next_in_chunk()
685 {
686     LOGCALL(DB, bool, "FlintPostList::next_in_chunk", NO_ARGS);
687     if (pos == end) RETURN(false);
688 
689     read_did_increase(&pos, end, &did);
690     read_wdf_and_length(&pos, end, &wdf, &doclength);
691 
692     // Either not at last doc in chunk, or pos == end, but not both.
693     Assert(did <= last_did_in_chunk);
694     Assert(did < last_did_in_chunk || pos == end);
695     Assert(pos != end || did == last_did_in_chunk);
696 
697     RETURN(true);
698 }
699 
700 void
next_chunk()701 FlintPostList::next_chunk()
702 {
703     LOGCALL_VOID(DB, "FlintPostList::next_chunk", NO_ARGS);
704     if (is_last_chunk) {
705 	is_at_end = true;
706 	return;
707     }
708 
709     cursor->next();
710     if (cursor->after_end()) {
711 	is_at_end = true;
712 	throw Xapian::DatabaseCorruptError("Unexpected end of posting list for `" +
713 				     term + "'");
714     }
715     const char * keypos = cursor->current_key.data();
716     const char * keyend = keypos + cursor->current_key.size();
717     // Check we're still in same postlist
718     if (!check_tname_in_key_lite(&keypos, keyend, term)) {
719 	is_at_end = true;
720 	throw Xapian::DatabaseCorruptError("Unexpected end of posting list for `" +
721 				     term + "'");
722     }
723 
724     Xapian::docid newdid;
725     if (!F_unpack_uint_preserving_sort(&keypos, keyend, &newdid)) {
726 	report_read_error(keypos);
727     }
728     if (newdid <= did) {
729 	throw Xapian::DatabaseCorruptError("Document ID in new chunk of postlist (" +
730 		str(newdid) +
731 		") is not greater than final document ID in previous chunk (" +
732 		str(did) + ")");
733     }
734     did = newdid;
735 
736     cursor->read_tag();
737     pos = cursor->current_tag.data();
738     end = pos + cursor->current_tag.size();
739 
740     first_did_in_chunk = did;
741     last_did_in_chunk = read_start_of_chunk(&pos, end, first_did_in_chunk,
742 					    &is_last_chunk);
743     read_wdf_and_length(&pos, end, &wdf, &doclength);
744 }
745 
746 Xapian::termcount
get_doclength() const747 FlintPostList::get_doclength() const
748 {
749     LOGCALL(DB, Xapian::termcount, "FlintPostList::get_doclength", NO_ARGS);
750     Assert(have_started);
751     RETURN(static_cast<Xapian::termcount>(doclength));
752 }
753 
754 PositionList *
read_position_list()755 FlintPostList::read_position_list()
756 {
757     LOGCALL(DB, PositionList *, "FlintPostList::read_position_list", NO_ARGS);
758     positionlist.read_data(&this_db->position_table, did, term);
759     RETURN(&positionlist);
760 }
761 
762 PositionList *
open_position_list() const763 FlintPostList::open_position_list() const
764 {
765     LOGCALL(DB, PositionList *, "FlintPostList::open_position_list", NO_ARGS);
766     RETURN(new FlintPositionList(&this_db->position_table, did, term));
767 }
768 
769 PostList *
next(Xapian::weight w_min)770 FlintPostList::next(Xapian::weight w_min)
771 {
772     LOGCALL(DB, PostList *, "FlintPostList::next", w_min);
773     (void)w_min; // no warning
774 
775     if (!have_started) {
776 	have_started = true;
777     } else {
778 	if (!next_in_chunk()) next_chunk();
779     }
780 
781     if (is_at_end) {
782 	LOGLINE(DB, "Moved to end");
783     } else {
784 	LOGLINE(DB, "Moved to docid " << did << ", wdf = " << wdf <<
785 	  	", doclength = " << doclength);
786     }
787 
788     RETURN(NULL);
789 }
790 
791 bool
current_chunk_contains(Xapian::docid desired_did)792 FlintPostList::current_chunk_contains(Xapian::docid desired_did)
793 {
794     LOGCALL(DB, bool, "FlintPostList::current_chunk_contains", desired_did);
795     if (desired_did >= first_did_in_chunk &&
796 	desired_did <= last_did_in_chunk) {
797 	RETURN(true);
798     }
799     RETURN(false);
800 }
801 
802 void
move_to_chunk_containing(Xapian::docid desired_did)803 FlintPostList::move_to_chunk_containing(Xapian::docid desired_did)
804 {
805     LOGCALL_VOID(DB, "FlintPostList::move_to_chunk_containing", desired_did);
806     (void)cursor->find_entry(FlintPostListTable::make_key(term, desired_did));
807     Assert(!cursor->after_end());
808 
809     const char * keypos = cursor->current_key.data();
810     const char * keyend = keypos + cursor->current_key.size();
811     // Check we're still in same postlist
812     if (!check_tname_in_key_lite(&keypos, keyend, term)) {
813 	// This should only happen if the postlist doesn't exist at all.
814 	is_at_end = true;
815 	is_last_chunk = true;
816 	return;
817     }
818     is_at_end = false;
819 
820     cursor->read_tag();
821     pos = cursor->current_tag.data();
822     end = pos + cursor->current_tag.size();
823 
824     if (keypos == keyend) {
825 	// In first chunk
826 #ifdef XAPIAN_ASSERTIONS
827 	Xapian::doccount old_number_of_entries = number_of_entries;
828 	did = read_start_of_first_chunk(&pos, end, &number_of_entries, NULL);
829 	Assert(old_number_of_entries == number_of_entries);
830 #else
831 	did = read_start_of_first_chunk(&pos, end, NULL, NULL);
832 #endif
833     } else {
834 	// In normal chunk
835 	if (!F_unpack_uint_preserving_sort(&keypos, keyend, &did)) {
836 	    report_read_error(keypos);
837 	}
838     }
839 
840     first_did_in_chunk = did;
841     last_did_in_chunk = read_start_of_chunk(&pos, end, first_did_in_chunk,
842 					    &is_last_chunk);
843     read_wdf_and_length(&pos, end, &wdf, &doclength);
844 
845     // Possible, since desired_did might be after end of this chunk and before
846     // the next.
847     if (desired_did > last_did_in_chunk) next_chunk();
848 }
849 
850 bool
move_forward_in_chunk_to_at_least(Xapian::docid desired_did)851 FlintPostList::move_forward_in_chunk_to_at_least(Xapian::docid desired_did)
852 {
853     LOGCALL(DB, bool, "FlintPostList::move_forward_in_chunk_to_at_least", desired_did);
854     if (desired_did > last_did_in_chunk) {
855 	pos = end;
856 	RETURN(false);
857     }
858     while (did < desired_did) {
859 	// FIXME: perhaps we don't need to decode the wdf and document length
860 	// for documents we're skipping past.
861 	bool at_end_of_chunk = !next_in_chunk();
862 	if (at_end_of_chunk) RETURN(false);
863     }
864     RETURN(true);
865 }
866 
867 PostList *
skip_to(Xapian::docid desired_did,Xapian::weight w_min)868 FlintPostList::skip_to(Xapian::docid desired_did, Xapian::weight w_min)
869 {
870     LOGCALL(DB, PostList *, "FlintPostList::skip_to", desired_did | w_min);
871     (void)w_min; // no warning
872     // We've started now - if we hadn't already, we're already positioned
873     // at start so there's no need to actually do anything.
874     have_started = true;
875 
876     // Don't skip back, and don't need to do anything if already there.
877     if (is_at_end || desired_did <= did) RETURN(NULL);
878 
879     // Move to correct chunk
880     if (!current_chunk_contains(desired_did)) {
881 	move_to_chunk_containing(desired_did);
882 	// Might be at_end now, so we need to check before trying to move
883 	// forward in chunk.
884 	if (is_at_end) RETURN(NULL);
885     }
886 
887     // Move to correct position in chunk
888     bool have_document = move_forward_in_chunk_to_at_least(desired_did);
889     (void)have_document;
890     Assert(have_document);
891 
892     if (is_at_end) {
893 	LOGLINE(DB, "Skipped to end");
894     } else {
895 	LOGLINE(DB, "Skipped to docid " << did << ", wdf = " << wdf <<
896 	  	", doclength = " << doclength);
897     }
898 
899     RETURN(NULL);
900 }
901 
902 string
get_description() const903 FlintPostList::get_description() const
904 {
905     return term + ":" + str(number_of_entries);
906 }
907 
908 // Returns the last did to allow in this chunk.
909 Xapian::docid
get_chunk(const string & tname,Xapian::docid did,bool adding,FlintPostlistChunkReader ** from,FlintPostlistChunkWriter ** to)910 FlintPostListTable::get_chunk(const string &tname,
911 	  Xapian::docid did, bool adding,
912 	  FlintPostlistChunkReader ** from, FlintPostlistChunkWriter **to)
913 {
914     // Get chunk containing entry
915     string key = make_key(tname, did);
916 
917     // Find the right chunk
918     AutoPtr<FlintCursor> cursor(cursor_get());
919 
920     cursor->find_entry(key);
921     Assert(!cursor->after_end());
922 
923     const char * keypos = cursor->current_key.data();
924     const char * keyend = keypos + cursor->current_key.size();
925 
926     if (!check_tname_in_key(&keypos, keyend, tname)) {
927 	// Postlist for this termname doesn't exist.
928 	if (!adding)
929 	    throw Xapian::DatabaseCorruptError("Attempted to delete or modify an entry in a non-existent posting list for " + tname);
930 
931 	*from = NULL;
932 	*to = new FlintPostlistChunkWriter(string(), true, tname, true);
933 	return Xapian::docid(-1);
934     }
935 
936     // See if we're appending - if so we can shortcut by just copying
937     // the data part of the chunk wholesale.
938     bool is_first_chunk = (keypos == keyend);
939 
940     cursor->read_tag();
941     const char * pos = cursor->current_tag.data();
942     const char * end = pos + cursor->current_tag.size();
943     Xapian::docid first_did_in_chunk;
944     if (is_first_chunk) {
945 	first_did_in_chunk = read_start_of_first_chunk(&pos, end, NULL, NULL);
946     } else {
947 	if (!F_unpack_uint_preserving_sort(&keypos, keyend,
948 					   &first_did_in_chunk)) {
949 	    report_read_error(keypos);
950 	}
951     }
952 
953     bool is_last_chunk;
954     Xapian::docid last_did_in_chunk;
955     last_did_in_chunk = read_start_of_chunk(&pos, end, first_did_in_chunk, &is_last_chunk);
956     *to = new FlintPostlistChunkWriter(cursor->current_key, is_first_chunk, tname,
957 				  is_last_chunk);
958     if (did > last_did_in_chunk) {
959 	// This is the shortcut.  Not very pretty, but I'll leave refactoring
960 	// until I've a clearer picture of everything which needs to be done.
961 	// (FIXME)
962 	*from = NULL;
963 	(*to)->raw_append(first_did_in_chunk, last_did_in_chunk,
964 			  string(pos, end));
965     } else {
966 	*from = new FlintPostlistChunkReader(first_did_in_chunk, string(pos, end));
967     }
968     if (is_last_chunk) return Xapian::docid(-1);
969 
970     // Find first did of next tag.
971     cursor->next();
972     if (cursor->after_end()) {
973 	throw Xapian::DatabaseCorruptError("Expected another key but found none");
974     }
975     const char *kpos = cursor->current_key.data();
976     const char *kend = kpos + cursor->current_key.size();
977     if (!check_tname_in_key(&kpos, kend, tname)) {
978 	throw Xapian::DatabaseCorruptError("Expected another key with the same term name but found a different one");
979     }
980 
981     // Read the new first docid
982     Xapian::docid first_did_of_next_chunk;
983     if (!F_unpack_uint_preserving_sort(&kpos, kend, &first_did_of_next_chunk)) {
984 	report_read_error(kpos);
985     }
986     return first_did_of_next_chunk - 1;
987 }
988 
989 void
merge_changes(const map<string,map<Xapian::docid,pair<char,Xapian::termcount>>> & mod_plists,const map<Xapian::docid,Xapian::termcount> & doclens,const map<string,pair<Xapian::termcount_diff,Xapian::termcount_diff>> & freq_deltas)990 FlintPostListTable::merge_changes(
991     const map<string, map<Xapian::docid, pair<char, Xapian::termcount> > > & mod_plists,
992     const map<Xapian::docid, Xapian::termcount> & doclens,
993     const map<string, pair<Xapian::termcount_diff, Xapian::termcount_diff> > & freq_deltas)
994 {
995     LOGCALL_VOID(DB, "FlintPostListTable::merge_changes", mod_plists | doclens | freq_deltas);
996     map<string, map<Xapian::docid, pair<char, Xapian::termcount> > >::const_iterator i;
997     for (i = mod_plists.begin(); i != mod_plists.end(); ++i) {
998 	if (i->second.empty()) continue;
999 	string tname = i->first;
1000 	{
1001 	    // Rewrite the first chunk of this posting list with the updated
1002 	    // termfreq and collfreq.
1003 	    map<string, pair<Xapian::termcount_diff, Xapian::termcount_diff> >::const_iterator deltas = freq_deltas.find(tname);
1004 	    Assert(deltas != freq_deltas.end());
1005 
1006 	    string current_key = make_key(tname);
1007 	    string tag;
1008 	    (void)get_exact_entry(current_key, tag);
1009 
1010 	    // Read start of first chunk to get termfreq and collfreq.
1011 	    const char *pos = tag.data();
1012 	    const char *end = pos + tag.size();
1013 	    Xapian::doccount termfreq;
1014 	    Xapian::termcount collfreq;
1015 	    Xapian::docid firstdid, lastdid;
1016 	    bool islast;
1017 	    if (pos == end) {
1018 		termfreq = 0;
1019 		collfreq = 0;
1020 		firstdid = 0;
1021 		lastdid = 0;
1022 		islast = true;
1023 	    } else {
1024 		firstdid = read_start_of_first_chunk(&pos, end,
1025 						     &termfreq, &collfreq);
1026 		// Handle the generic start of chunk header.
1027 		lastdid = read_start_of_chunk(&pos, end, firstdid, &islast);
1028 	    }
1029 
1030 	    termfreq += deltas->second.first;
1031 	    if (termfreq == 0) {
1032 		// All postings deleted!  So we can shortcut by zapping the
1033 		// posting list.
1034 		if (islast) {
1035 		    // Only one entry for this posting list.
1036 		    del(current_key);
1037 		    continue;
1038 		}
1039 		AutoPtr<FlintCursor> cursor(cursor_get());
1040 		bool found = cursor->find_entry(current_key);
1041 		Assert(found);
1042 		if (!found) continue; // Reduce damage!
1043 		while (cursor->del()) {
1044 		    const char *kpos = cursor->current_key.data();
1045 		    const char *kend = kpos + cursor->current_key.size();
1046 		    if (!check_tname_in_key_lite(&kpos, kend, tname)) break;
1047 		}
1048 		continue;
1049 	    }
1050 	    collfreq += deltas->second.second;
1051 
1052 	    // Rewrite start of first chunk to update termfreq and collfreq.
1053 	    string newhdr = make_start_of_first_chunk(termfreq, collfreq, firstdid);
1054 	    newhdr += make_start_of_chunk(islast, firstdid, lastdid);
1055 	    if (pos == end) {
1056 		add(current_key, newhdr);
1057 	    } else {
1058 		Assert((size_t)(pos - tag.data()) <= tag.size());
1059 		tag.replace(0, pos - tag.data(), newhdr);
1060 		add(current_key, tag);
1061 	    }
1062 	}
1063 	map<Xapian::docid, pair<char, Xapian::termcount> >::const_iterator j;
1064 	j = i->second.begin();
1065 	Assert(j != i->second.end()); // This case is caught above.
1066 
1067 	Xapian::docid max_did;
1068 	FlintPostlistChunkReader *from;
1069 	FlintPostlistChunkWriter *to;
1070 	max_did = get_chunk(tname, j->first, j->second.first == 'A',
1071 			    &from, &to);
1072 	for ( ; j != i->second.end(); ++j) {
1073 	    Xapian::docid did = j->first;
1074 
1075 next_chunk:
1076 	    LOGLINE(DB, "Updating tname=" << tname << ", did=" << did);
1077 	    if (from) while (!from->is_at_end()) {
1078 		Xapian::docid copy_did = from->get_docid();
1079 		if (copy_did >= did) {
1080 		    if (copy_did == did) {
1081 			Assert(j->second.first != 'A');
1082 			from->next();
1083 		    }
1084 		    break;
1085 		}
1086 		to->append(this, copy_did,
1087 			   from->get_wdf(), from->get_doclength());
1088 		from->next();
1089 	    }
1090 	    if ((!from || from->is_at_end()) && did > max_did) {
1091 		delete from;
1092 		to->flush(this);
1093 		delete to;
1094 		max_did = get_chunk(tname, did, false, &from, &to);
1095 		goto next_chunk;
1096 	    }
1097 
1098 	    if (j->second.first != 'D') {
1099 		map<Xapian::docid, Xapian::termcount>::const_iterator k = doclens.find(did);
1100 		Assert(k != doclens.end());
1101 		Xapian::termcount new_doclen = k->second;
1102 		Xapian::termcount new_wdf = j->second.second;
1103 
1104 		to->append(this, did, new_wdf, new_doclen);
1105 	    }
1106 	}
1107 
1108 	if (from) {
1109 	    while (!from->is_at_end()) {
1110 		to->append(this, from->get_docid(),
1111 			   from->get_wdf(), from->get_doclength());
1112 		from->next();
1113 	    }
1114 	    delete from;
1115 	}
1116 	to->flush(this);
1117 	delete to;
1118     }
1119 }
1120