1 /* multimatch.cc
2  *
3  * Copyright 1999,2000,2001 BrightStation PLC
4  * Copyright 2001,2002 Ananova Ltd
5  * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010 Olly Betts
6  * Copyright 2003 Orange PCS Ltd
7  * Copyright 2003 Sam Liddicott
8  * Copyright 2007,2008,2009 Lemur Consulting Ltd
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public License as
12  * published by the Free Software Foundation; either version 2 of the
13  * License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public License
21  * along with this program; if not, write to the Free Software
22  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
23  * USA
24  */
25 
26 #include <config.h>
27 
28 #include "multimatch.h"
29 
30 #include "autoptr.h"
31 #include "collapser.h"
32 #include "debuglog.h"
33 #include "submatch.h"
34 #include "localsubmatch.h"
35 #include "omassert.h"
36 #include "omenquireinternal.h"
37 
38 #include "emptypostlist.h"
39 #include "branchpostlist.h"
40 #include "mergepostlist.h"
41 
42 #include "document.h"
43 #include "omqueryinternal.h"
44 
45 #include "msetcmp.h"
46 
47 #include "valuestreamdocument.h"
48 #include "weightinternal.h"
49 
50 #include <xapian/errorhandler.h>
51 #include <xapian/matchspy.h>
52 #include <xapian/version.h> // For XAPIAN_HAS_REMOTE_BACKEND
53 
54 #ifdef XAPIAN_HAS_REMOTE_BACKEND
55 #include "remotesubmatch.h"
56 #include "remote-database.h"
57 #endif /* XAPIAN_HAS_REMOTE_BACKEND */
58 
59 #include <algorithm>
60 #include <cfloat> // For DBL_EPSILON.
61 #include <climits> // For UINT_MAX.
62 #include <vector>
63 #include <map>
64 #include <set>
65 
66 using namespace std;
67 
// Short local aliases for the Xapian::Enquire::Internal::sort_setting values,
// so the comparisons scattered through the matcher below stay readable.
const Xapian::Enquire::Internal::sort_setting REL =
	Xapian::Enquire::Internal::REL;
const Xapian::Enquire::Internal::sort_setting REL_VAL =
	Xapian::Enquire::Internal::REL_VAL;
const Xapian::Enquire::Internal::sort_setting VAL =
	Xapian::Enquire::Internal::VAL;
#if 0 // VAL_REL isn't currently used which causes a warning with SGI CC.
const Xapian::Enquire::Internal::sort_setting VAL_REL =
	Xapian::Enquire::Internal::VAL_REL;
#endif
78 
79 /** Split an RSet into several sub rsets, one for each database.
80  *
81  *  @param rset The RSet to split.
82  *  @param number_of_subdbs The number of sub databases which exist.
83  *  @param subrsets Vector of RSets which will the sub rsets will be placed in.
84  *                  This should be empty when the function is called.
85  */
86 static void
split_rset_by_db(const Xapian::RSet * rset,Xapian::doccount number_of_subdbs,vector<Xapian::RSet> & subrsets)87 split_rset_by_db(const Xapian::RSet * rset,
88 		 Xapian::doccount number_of_subdbs,
89 		 vector<Xapian::RSet> & subrsets)
90 {
91     LOGCALL_STATIC_VOID(MATCH, "split_rset_by_db", rset | number_of_subdbs | subrsets);
92     if (rset) {
93 	if (number_of_subdbs == 1) {
94 	    // The common case of a single database is easy to handle.
95 	    subrsets.push_back(*rset);
96 	} else {
97 	    // Can't just use vector::resize() here, since that creates N
98 	    // copies of the same RSet!
99 	    subrsets.reserve(number_of_subdbs);
100 	    for (size_t i = 0; i < number_of_subdbs; ++i) {
101 		subrsets.push_back(Xapian::RSet());
102 	    }
103 
104 	    const set<Xapian::docid> & rsetitems = rset->internal->get_items();
105 	    set<Xapian::docid>::const_iterator j;
106 	    for (j = rsetitems.begin(); j != rsetitems.end(); ++j) {
107 		Xapian::doccount local_docid = (*j - 1) / number_of_subdbs + 1;
108 		Xapian::doccount subdatabase = (*j - 1) % number_of_subdbs;
109 		subrsets[subdatabase].add_document(local_docid);
110 	    }
111 	}
112     } else {
113 	// NB vector::resize() creates N copies of the same empty RSet.
114 	subrsets.resize(number_of_subdbs);
115     }
116     Assert(subrsets.size() == number_of_subdbs);
117 }
118 
119 /** Prepare some SubMatches.
120  *
121  *  This calls the prepare_match() method on each SubMatch object, causing them
122  *  to lookup various statistics.
123  *
124  *  This method is rather complicated in order to handle remote matches
125  *  efficiently.  Instead of simply calling "prepare_match()" on each submatch
126  *  and waiting for it to return, it first calls "prepare_match(true)" on each
127  *  submatch.  If any of these calls return false, indicating that the required
128  *  information has not yet been received from the server, the method will try
129  *  those which returned false again, passing "false" as a parameter to
130  *  indicate that they should block until the information is ready.
131  *
132  *  This should improve performance in the case of mixed local-and-remote
133  *  searches - the local searchers will all fetch their statistics from disk
134  *  without waiting for the remote searchers, so as soon as the remote searcher
135  *  statistics arrive, we can move on to the next step.
136  */
137 static void
prepare_sub_matches(vector<Xapian::Internal::RefCntPtr<SubMatch>> & leaves,Xapian::ErrorHandler * errorhandler,Xapian::Weight::Internal & stats)138 prepare_sub_matches(vector<Xapian::Internal::RefCntPtr<SubMatch> > & leaves,
139 		    Xapian::ErrorHandler * errorhandler,
140 		    Xapian::Weight::Internal & stats)
141 {
142     LOGCALL_STATIC_VOID(MATCH, "prepare_sub_matches", leaves | errorhandler | stats);
143     // We use a vector<bool> to track which SubMatches we're already prepared.
144     vector<bool> prepared;
145     prepared.resize(leaves.size(), false);
146     size_t unprepared = leaves.size();
147     bool nowait = true;
148     while (unprepared) {
149 	for (size_t leaf = 0; leaf < leaves.size(); ++leaf) {
150 	    if (prepared[leaf]) continue;
151 	    try {
152 		SubMatch * submatch = leaves[leaf].get();
153 		if (!submatch || submatch->prepare_match(nowait, stats)) {
154 		    prepared[leaf] = true;
155 		    --unprepared;
156 		}
157 	    } catch (Xapian::Error & e) {
158 		if (!errorhandler) throw;
159 
160 		LOGLINE(EXCEPTION, "Calling error handler for prepare_match() on a SubMatch.");
161 		(*errorhandler)(e);
162 		// Continue match without this sub-match.
163 		leaves[leaf] = NULL;
164 		prepared[leaf] = true;
165 		--unprepared;
166 	    }
167 	}
168 	// Use blocking IO on subsequent passes, so that we don't go into
169 	// a tight loop.
170 	nowait = false;
171     }
172 }
173 
/// Class which applies several match spies in turn.
class MultipleMatchSpy : public Xapian::MatchSpy {
  private:
    /// List of match spies to call, in order.
    const std::vector<Xapian::MatchSpy *> & spies;

  public:
    /** Construct from the spies to forward to.
     *
     *  The vector is held by reference (not copied), so it must remain
     *  valid for the lifetime of this object.
     */
    MultipleMatchSpy(const std::vector<Xapian::MatchSpy *> & spies_)
	    : spies(spies_) {}

    /** Implementation of virtual operator().
     *
     *  This implementation calls all the spies in turn.
     */
    void operator()(const Xapian::Document &doc, Xapian::weight wt);
};
190 
191 void
operator ()(const Xapian::Document & doc,Xapian::weight wt)192 MultipleMatchSpy::operator()(const Xapian::Document &doc, Xapian::weight wt) {
193     LOGCALL_VOID(MATCH, "MultipleMatchSpy::operator()", doc | wt);
194     vector<Xapian::MatchSpy *>::const_iterator i;
195     for (i = spies.begin(); i != spies.end(); ++i) {
196 	(**i)(doc, wt);
197     }
198 }
199 
200 ////////////////////////////////////
201 // Initialisation and cleaning up //
202 ////////////////////////////////////
MultiMatch::MultiMatch(const Xapian::Database &db_,
		       const Xapian::Query::Internal * query_,
		       Xapian::termcount qlen,
		       const Xapian::RSet * omrset,
		       Xapian::doccount collapse_max_,
		       Xapian::valueno collapse_key_,
		       int percent_cutoff_, Xapian::weight weight_cutoff_,
		       Xapian::Enquire::docid_order order_,
		       Xapian::valueno sort_key_,
		       Xapian::Enquire::Internal::sort_setting sort_by_,
		       bool sort_value_forward_,
		       Xapian::ErrorHandler * errorhandler_,
		       Xapian::Weight::Internal & stats,
		       const Xapian::Weight * weight_,
		       const vector<Xapian::MatchSpy *> & matchspies_,
		       bool have_sorter, bool have_mdecider)
	: db(db_), query(query_),
	  collapse_max(collapse_max_), collapse_key(collapse_key_),
	  percent_cutoff(percent_cutoff_), weight_cutoff(weight_cutoff_),
	  order(order_),
	  sort_key(sort_key_), sort_by(sort_by_),
	  sort_value_forward(sort_value_forward_),
	  errorhandler(errorhandler_), weight(weight_),
	  // is_remote is sized to one flag per sub-database, all false.
	  is_remote(db.internal.size()),
	  matchspies(matchspies_)
{
    LOGCALL_CTOR(MATCH, "MultiMatch", db_ | query_ | qlen | omrset | collapse_max_ | collapse_key_ | percent_cutoff_ | weight_cutoff_ | int(order_) | sort_key_ | int(sort_by_) | sort_value_forward_ | errorhandler_ | stats | weight_ | matchspies_ | have_sorter | have_mdecider);

    // An empty query matches nothing; get_mset() handles that case directly.
    if (!query) return;
    query->validate_query();

    // Split the RSet into one sub-RSet per sub-database, then build a
    // SubMatch for each sub-database.
    Xapian::doccount number_of_subdbs = db.internal.size();
    vector<Xapian::RSet> subrsets;
    split_rset_by_db(omrset, number_of_subdbs, subrsets);

    for (size_t i = 0; i != number_of_subdbs; ++i) {
	Xapian::Database::Internal *subdb = db.internal[i].get();
	Assert(subdb);
	Xapian::Internal::RefCntPtr<SubMatch> smatch;
	try {
	    // There is currently only one special case, for network databases.
#ifdef XAPIAN_HAS_REMOTE_BACKEND
	    RemoteDatabase *rem_db = subdb->as_remotedatabase();
	    if (rem_db) {
		// Neither sorters nor match deciders can be serialised to the
		// remote server, so reject them up front.
		if (have_sorter) {
		    throw Xapian::UnimplementedError("Xapian::KeyMaker not supported for the remote backend");
		}
		if (have_mdecider) {
		    throw Xapian::UnimplementedError("Xapian::MatchDecider not supported for the remote backend");
		}
		// Ship the query and match settings to the remote server now,
		// so it can start working while we set up the other leaves.
		rem_db->set_query(query, qlen, collapse_max, collapse_key,
				  order, sort_key, sort_by, sort_value_forward,
				  percent_cutoff, weight_cutoff, weight,
				  subrsets[i], matchspies);
		bool decreasing_relevance =
		    (sort_by == REL || sort_by == REL_VAL);
		smatch = new RemoteSubMatch(rem_db, decreasing_relevance, matchspies);
		is_remote[i] = true;
	    } else {
		smatch = new LocalSubMatch(subdb, query, qlen, subrsets[i], weight);
	    }
#else
	    // Avoid unused parameter warnings.
	    (void)have_sorter;
	    (void)have_mdecider;
	    smatch = new LocalSubMatch(subdb, query, qlen, subrsets[i], weight);
#endif /* XAPIAN_HAS_REMOTE_BACKEND */
	} catch (Xapian::Error & e) {
	    if (!errorhandler) throw;
	    LOGLINE(EXCEPTION, "Calling error handler for creation of a SubMatch from a database and query.");
	    (*errorhandler)(e);
	    // Continue match without this sub-postlist.
	    smatch = NULL;
	}
	leaves.push_back(smatch);
    }

    // Gather the statistics needed for weighting from all the submatches.
    stats.mark_wanted_terms(*query);
    prepare_sub_matches(leaves, errorhandler, stats);
    stats.set_bounds_from_db(db);
}
284 
285 Xapian::weight
getorrecalc_maxweight(PostList * pl)286 MultiMatch::getorrecalc_maxweight(PostList *pl)
287 {
288     LOGCALL(MATCH, Xapian::weight, "MultiMatch::getorrecalc_maxweight", pl);
289     Xapian::weight wt;
290     if (recalculate_w_max) {
291 	LOGLINE(MATCH, "recalculating max weight");
292 	wt = pl->recalc_maxweight();
293 	recalculate_w_max = false;
294     } else {
295 	wt = pl->get_maxweight();
296 	LOGLINE(MATCH, "pl = (" << pl->get_description() << ")");
297 	AssertEqDoubleParanoid(wt, pl->recalc_maxweight());
298     }
299     LOGLINE(MATCH, "max possible doc weight = " << wt);
300     RETURN(wt);
301 }
302 
303 void
get_mset(Xapian::doccount first,Xapian::doccount maxitems,Xapian::doccount check_at_least,Xapian::MSet & mset,const Xapian::Weight::Internal & stats,const Xapian::MatchDecider * mdecider,const Xapian::MatchDecider * matchspy_legacy,const Xapian::KeyMaker * sorter)304 MultiMatch::get_mset(Xapian::doccount first, Xapian::doccount maxitems,
305 		     Xapian::doccount check_at_least,
306 		     Xapian::MSet & mset,
307 		     const Xapian::Weight::Internal & stats,
308 		     const Xapian::MatchDecider *mdecider,
309 		     const Xapian::MatchDecider *matchspy_legacy,
310 		     const Xapian::KeyMaker *sorter)
311 {
312     LOGCALL_VOID(MATCH, "MultiMatch::get_mset", first | maxitems | check_at_least | Literal("mset") | stats | Literal("mdecider") | Literal("matchspy_legacy") | Literal("sorter"));
313     AssertRel(check_at_least,>=,maxitems);
314 
315     if (!query) {
316 	mset = Xapian::MSet(new Xapian::MSet::Internal());
317 	mset.internal->firstitem = first;
318 	return;
319     }
320 
321     Assert(!leaves.empty());
322 
323 #ifdef XAPIAN_HAS_REMOTE_BACKEND
324     // If there's only one database and it's remote, we can just unserialise
325     // its MSet and return that.
326     if (leaves.size() == 1 && is_remote[0]) {
327 	RemoteSubMatch * rem_match;
328 	rem_match = static_cast<RemoteSubMatch*>(leaves[0].get());
329 	rem_match->start_match(first, maxitems, check_at_least, stats);
330 	rem_match->get_mset(mset);
331 	return;
332     }
333 #endif
334 
335     // Start matchers.
336     {
337 	vector<Xapian::Internal::RefCntPtr<SubMatch> >::iterator leaf;
338 	for (leaf = leaves.begin(); leaf != leaves.end(); ++leaf) {
339 	    if (!(*leaf).get()) continue;
340 	    try {
341 		(*leaf)->start_match(0, first + maxitems,
342 				     first + check_at_least, stats);
343 	    } catch (Xapian::Error & e) {
344 		if (!errorhandler) throw;
345 		LOGLINE(EXCEPTION, "Calling error handler for "
346 				   "start_match() on a SubMatch.");
347 		(*errorhandler)(e);
348 		// Continue match without this sub-match.
349 		*leaf = NULL;
350 	    }
351 	}
352     }
353 
354     // Get postlists and term info
355     vector<PostList *> postlists;
356     map<string, Xapian::MSet::Internal::TermFreqAndWeight> termfreqandwts;
357     map<string, Xapian::MSet::Internal::TermFreqAndWeight> * termfreqandwts_ptr;
358     termfreqandwts_ptr = &termfreqandwts;
359 
360     Xapian::termcount total_subqs = 0;
361     // Keep a count of matches which we know exist, but we won't see.  This
362     // occurs when a submatch is remote, and returns a lower bound on the
363     // number of matching documents which is higher than the number of
364     // documents it returns (because it wasn't asked for more documents).
365     Xapian::doccount definite_matches_not_seen = 0;
366     for (size_t i = 0; i != leaves.size(); ++i) {
367 	PostList *pl;
368 	try {
369 	    pl = leaves[i]->get_postlist_and_term_info(this,
370 						       termfreqandwts_ptr,
371 						       &total_subqs);
372 	    if (termfreqandwts_ptr && !termfreqandwts.empty())
373 		termfreqandwts_ptr = NULL;
374 	    if (is_remote[i]) {
375 		if (pl->get_termfreq_min() > first + maxitems) {
376 		    LOGLINE(MATCH, "Found " <<
377 				   pl->get_termfreq_min() - (first + maxitems)
378 				   << " definite matches in remote submatch "
379 				   "which aren't passed to local match");
380 		    definite_matches_not_seen += pl->get_termfreq_min();
381 		    definite_matches_not_seen -= first + maxitems;
382 		}
383 	    }
384 	} catch (Xapian::Error & e) {
385 	    if (!errorhandler) throw;
386 	    LOGLINE(EXCEPTION, "Calling error handler for "
387 			       "get_term_info() on a SubMatch.");
388 	    (*errorhandler)(e);
389 	    // FIXME: check if *ALL* the remote servers have failed!
390 	    // Continue match without this sub-match.
391 	    leaves[i] = NULL;
392 	    pl = new EmptyPostList;
393 	}
394 	postlists.push_back(pl);
395     }
396     Assert(!postlists.empty());
397 
398     ValueStreamDocument vsdoc(db);
399     ++vsdoc.ref_count;
400     Xapian::Document doc(&vsdoc);
401 
402     // Get a single combined postlist
403     AutoPtr<PostList> pl;
404     if (postlists.size() == 1) {
405 	pl.reset(postlists.front());
406     } else {
407 	pl.reset(new MergePostList(postlists, this, vsdoc, errorhandler));
408     }
409 
410     LOGLINE(MATCH, "pl = (" << pl->get_description() << ")");
411 
412 #ifdef XAPIAN_DEBUG_LOG
413     {
414 	map<string, Xapian::MSet::Internal::TermFreqAndWeight>::const_iterator tfwi;
415 	for (tfwi = termfreqandwts.begin(); tfwi != termfreqandwts.end(); ++tfwi) {
416 	    LOGLINE(MATCH, "termfreqandwts[" << tfwi->first << "] = " << tfwi->second.termfreq << ", " << tfwi->second.termweight);
417 	}
418     }
419 #endif
420 
421     // Empty result set
422     Xapian::doccount docs_matched = 0;
423     Xapian::weight greatest_wt = 0;
424     Xapian::termcount greatest_wt_subqs_matched = 0;
425 #ifdef XAPIAN_HAS_REMOTE_BACKEND
426     unsigned greatest_wt_subqs_db_num = UINT_MAX;
427 #endif
428     vector<Xapian::Internal::MSetItem> items;
429 
430     // maximum weight a document could possibly have
431     const Xapian::weight max_possible = pl->recalc_maxweight();
432 
433     LOGLINE(MATCH, "pl = (" << pl->get_description() << ")");
434     recalculate_w_max = false;
435 
436     Xapian::doccount matches_upper_bound = pl->get_termfreq_max();
437     Xapian::doccount matches_lower_bound = 0;
438     Xapian::doccount matches_estimated   = pl->get_termfreq_est();
439 
440     if (mdecider == NULL && matchspy_legacy == NULL) {
441 	// If we have a matcher decider or match spy, the lower bound must be
442 	// set to 0 as we could discard all hits.  Otherwise set it to the
443 	// minimum number of entries which the postlist could return.
444 	matches_lower_bound = pl->get_termfreq_min();
445     }
446 
447     // Prepare the matchspy
448     Xapian::MatchSpy *matchspy = NULL;
449     MultipleMatchSpy multispy(matchspies);
450     if (!matchspies.empty()) {
451 	if (matchspies.size() == 1) {
452 	    matchspy = matchspies[0];
453 	} else {
454 	    matchspy = &multispy;
455 	}
456     }
457 
458     // Check if any results have been asked for (might just be wanting
459     // maxweight).
460     if (check_at_least == 0) {
461 	pl.reset(NULL);
462 	Xapian::doccount uncollapsed_lower_bound = matches_lower_bound;
463 	if (collapse_max) {
464 	    // Lower bound must be set to no more than collapse_max, since it's
465 	    // possible that all matching documents have the same collapse_key
466 	    // value and so are collapsed together.
467 	    if (matches_lower_bound > collapse_max)
468 		matches_lower_bound = collapse_max;
469 	}
470 
471 	mset = Xapian::MSet(new Xapian::MSet::Internal(
472 					   first,
473 					   matches_upper_bound,
474 					   matches_lower_bound,
475 					   matches_estimated,
476 					   matches_upper_bound,
477 					   uncollapsed_lower_bound,
478 					   matches_estimated,
479 					   max_possible, greatest_wt, items,
480 					   termfreqandwts,
481 					   0));
482 	return;
483     }
484 
485     // Number of documents considered by a decider or matchspy_legacy.
486     Xapian::doccount decider_considered = 0;
487     // Number of documents denied by the decider or matchspy_legacy.
488     Xapian::doccount decider_denied = 0;
489 
490     // Set max number of results that we want - this is used to decide
491     // when to throw away unwanted items.
492     Xapian::doccount max_msize = first + maxitems;
493     items.reserve(max_msize + 1);
494 
495     // Tracks the minimum item currently eligible for the MSet - we compare
496     // candidate items against this.
497     Xapian::Internal::MSetItem min_item(0.0, 0);
498 
499     // Minimum weight an item must have to be worth considering.
500     Xapian::weight min_weight = weight_cutoff;
501 
502     // Factor to multiply maximum weight seen by to get the cutoff weight.
503     Xapian::weight percent_cutoff_factor = percent_cutoff / 100.0;
504     // Corresponding correction to that in omenquire.cc to account for excess
505     // precision on x86.
506     percent_cutoff_factor -= DBL_EPSILON;
507 
508     // Object to handle collapsing.
509     Collapser collapser(collapse_key, collapse_max);
510 
511     /// Comparison functor for sorting MSet
512     bool sort_forward = (order != Xapian::Enquire::DESCENDING);
513     MSetCmp mcmp(get_msetcmp_function(sort_by, sort_forward, sort_value_forward));
514 
515     // Perform query
516 
517     // We form the mset in two stages.  In the first we fill up our working
518     // mset.  Adding a new document does not remove another.
519     //
520     // In the second, we consider documents which rank higher than the current
521     // lowest ranking document in the mset.  Each document added expels the
522     // current lowest ranking document.
523     //
524     // If a percentage cutoff is in effect, it can cause the matcher to return
525     // from the second stage from the first.
526 
527     // Is the mset a valid heap?
528     bool is_heap = false;
529 
530     while (true) {
531 	bool pushback;
532 
533 	if (rare(recalculate_w_max)) {
534 	    if (min_weight > 0.0) {
535 		if (rare(getorrecalc_maxweight(pl.get()) < min_weight)) {
536 		    LOGLINE(MATCH, "*** TERMINATING EARLY (1)");
537 		    break;
538 		}
539 	    }
540 	}
541 
542 	PostList * pl_copy = pl.get();
543 	if (rare(next_handling_prune(pl_copy, min_weight, this))) {
544 	    (void)pl.release();
545 	    pl.reset(pl_copy);
546 	    LOGLINE(MATCH, "*** REPLACING ROOT");
547 
548 	    if (min_weight > 0.0) {
549 		// No need for a full recalc (unless we've got to do one
550 		// because of a prune elsewhere) - we're just switching to a
551 		// subtree.
552 		if (rare(getorrecalc_maxweight(pl.get()) < min_weight)) {
553 		    LOGLINE(MATCH, "*** TERMINATING EARLY (2)");
554 		    break;
555 		}
556 	    }
557 	}
558 
559 	if (rare(pl->at_end())) {
560 	    LOGLINE(MATCH, "Reached end of potential matches");
561 	    break;
562 	}
563 
564 	// Only calculate the weight if we need it for mcmp, or there's a
565 	// percentage or weight cutoff in effect.  Otherwise we calculate it
566 	// below if we haven't already rejected this candidate.
567 	Xapian::weight wt = 0.0;
568 	bool calculated_weight = false;
569 	if (sort_by != VAL || min_weight > 0.0) {
570 	    wt = pl->get_weight();
571 	    if (wt < min_weight) {
572 		LOGLINE(MATCH, "Rejecting potential match due to insufficient weight");
573 		continue;
574 	    }
575 	    calculated_weight = true;
576 	}
577 
578 	Xapian::docid did = pl->get_docid();
579 	vsdoc.set_document(did);
580 	LOGLINE(MATCH, "Candidate document id " << did << " wt " << wt);
581 	Xapian::Internal::MSetItem new_item(wt, did);
582 	if (sort_by != REL) {
583 	    const string * ptr = pl->get_sort_key();
584 	    if (ptr) {
585 		new_item.sort_key = *ptr;
586 	    } else if (sorter) {
587 		new_item.sort_key = (*sorter)(doc);
588 	    } else {
589 		new_item.sort_key = vsdoc.get_value(sort_key);
590 	    }
591 
592 	    // We're sorting by value (in part at least), so compare the item
593 	    // against the lowest currently in the proto-mset.  If sort_by is
594 	    // VAL, then new_item.wt won't yet be set, but that doesn't
595 	    // matter since it's not used by the sort function.
596 	    if (!mcmp(new_item, min_item)) {
597 		if (mdecider == NULL && !collapser && matchspy_legacy == NULL) {
598 		    // Document was definitely suitable for mset - no more
599 		    // processing needed.
600 		    LOGLINE(MATCH, "Making note of match item which sorts lower than min_item");
601 		    ++docs_matched;
602 		    if (!calculated_weight) wt = pl->get_weight();
603 		    if (matchspy) {
604 			matchspy->operator()(doc, wt);
605 		    }
606 		    if (wt > greatest_wt) goto new_greatest_weight;
607 		    continue;
608 		}
609 		if (docs_matched >= check_at_least) {
610 		    // We've seen enough items - we can drop this one.
611 		    LOGLINE(MATCH, "Dropping candidate which sorts lower than min_item");
612 		    // FIXME: hmm, match decider might have rejected this...
613 		    if (!calculated_weight) wt = pl->get_weight();
614 		    if (wt > greatest_wt) goto new_greatest_weight;
615 		    continue;
616 		}
617 		// We can't drop the item, because we need to show it
618 		// to the matchspy_legacy, test whether the mdecider would
619 		// accept it, and/or test whether it would be collapsed.
620 		LOGLINE(MATCH, "Keeping candidate which sorts lower than min_item for further investigation");
621 	    }
622 	}
623 
624 	// Use the match spy and/or decision functors (if specified).
625 	if (matchspy != NULL || mdecider != NULL || matchspy_legacy != NULL) {
626 	    const unsigned int multiplier = db.internal.size();
627 	    Assert(multiplier != 0);
628 	    Xapian::doccount n = (did - 1) % multiplier; // which actual database
629 	    // If the results are from a remote database, then the functor will
630 	    // already have been applied there so we can skip this step.
631 	    if (!is_remote[n]) {
632 		++decider_considered;
633 		if (matchspy_legacy && !matchspy_legacy->operator()(doc)) {
634 		    ++decider_denied;
635 		    continue;
636 		}
637 		if (mdecider && !mdecider->operator()(doc)) {
638 		    ++decider_denied;
639 		    continue;
640 		}
641 		if (matchspy) {
642 		    if (!calculated_weight) {
643 			wt = pl->get_weight();
644 			new_item.wt = wt;
645 			calculated_weight = true;
646 		    }
647 		    matchspy->operator()(doc, wt);
648 		}
649 	    }
650 	}
651 
652 	if (!calculated_weight) {
653 	    // we didn't calculate the weight above, but now we will need it
654 	    wt = pl->get_weight();
655 	    new_item.wt = wt;
656 	}
657 
658 	pushback = true;
659 
660 	// Perform collapsing on key if requested.
661 	if (collapser) {
662 	    collapse_result res;
663 	    res = collapser.process(new_item, pl.get(), vsdoc, mcmp);
664 	    if (res == REJECTED) {
665 		// If we're sorting by relevance primarily, then we throw away
666 		// the lower weighted document anyway.
667 		if (sort_by != REL && sort_by != REL_VAL) {
668 		    if (wt > greatest_wt) goto new_greatest_weight;
669 		}
670 		continue;
671 	    }
672 
673 	    if (res == REPLACED) {
674 		// There was a previous item in the collapse tab so
675 		// the MSet can't be empty.
676 		Assert(!items.empty());
677 
678 		const Xapian::Internal::MSetItem & old_item =
679 		    collapser.old_item;
680 		// This is one of the best collapse_max potential MSet entries
681 		// with this key which we've seen so far.  Check if the
682 		// entry with this key which it displaced might still be in the
683 		// proto-MSet.  If it might be, we need to check through for
684 		// it.
685 		Xapian::weight old_wt = old_item.wt;
686 		if (old_wt >= min_weight && mcmp(old_item, min_item)) {
687 		    // Scan through (unsorted) MSet looking for entry.
688 		    // FIXME: more efficient way than just scanning?
689 		    Xapian::docid olddid = old_item.did;
690 		    vector<Xapian::Internal::MSetItem>::iterator i;
691 		    for (i = items.begin(); i != items.end(); ++i) {
692 			if (i->did == olddid) {
693 			    LOGLINE(MATCH, "collapse: removing " <<
694 					   olddid << ": " <<
695 					   new_item.collapse_key);
696 			    // We can replace an arbitrary element in O(log N)
697 			    // but have to do it by hand (in this case the new
698 			    // elt is bigger, so we just swap down the tree).
699 			    // FIXME: implement this, and clean up is_heap
700 			    // handling
701 			    *i = new_item;
702 			    pushback = false;
703 			    is_heap = false;
704 			    break;
705 			}
706 		    }
707 		}
708 	    }
709 	}
710 
711 	// OK, actually add the item to the mset.
712 	if (pushback) {
713 	    ++docs_matched;
714 	    if (items.size() >= max_msize) {
715 		items.push_back(new_item);
716 		if (!is_heap) {
717 		    is_heap = true;
718 		    make_heap(items.begin(), items.end(), mcmp);
719 		} else {
720 		    push_heap<vector<Xapian::Internal::MSetItem>::iterator,
721 			      MSetCmp>(items.begin(), items.end(), mcmp);
722 		}
723 		pop_heap<vector<Xapian::Internal::MSetItem>::iterator,
724 			 MSetCmp>(items.begin(), items.end(), mcmp);
725 		items.pop_back();
726 
727 		min_item = items.front();
728 		if (sort_by == REL || sort_by == REL_VAL) {
729 		    if (docs_matched >= check_at_least) {
730 			if (sort_by == REL) {
731 			    // We're done if this is a forward boolean match
732 			    // with only one database (bodgetastic, FIXME
733 			    // better if we can!)
734 			    if (rare(max_possible == 0 && sort_forward)) {
735 				// In the multi database case, MergePostList
736 				// currently processes each database
737 				// sequentially (which actually may well be
738 				// more efficient) so the docids in general
739 				// won't arrive in order.
740 				if (leaves.size() == 1) break;
741 			    }
742 			}
743 			if (min_item.wt > min_weight) {
744 			    LOGLINE(MATCH, "Setting min_weight to " <<
745 				    min_item.wt << " from " << min_weight);
746 			    min_weight = min_item.wt;
747 			}
748 		    }
749 		}
750 		if (rare(getorrecalc_maxweight(pl.get()) < min_weight)) {
751 		    LOGLINE(MATCH, "*** TERMINATING EARLY (3)");
752 		    break;
753 		}
754 	    } else {
755 		items.push_back(new_item);
756 		is_heap = false;
757 		if (sort_by == REL && items.size() == max_msize) {
758 		    if (docs_matched >= check_at_least) {
759 			// We're done if this is a forward boolean match
760 			// with only one database (bodgetastic, FIXME
761 			// better if we can!)
762 			if (rare(max_possible == 0 && sort_forward)) {
763 			    // In the multi database case, MergePostList
764 			    // currently processes each database
765 			    // sequentially (which actually may well be
766 			    // more efficient) so the docids in general
767 			    // won't arrive in order.
768 			    if (leaves.size() == 1) break;
769 			}
770 		    }
771 		}
772 	    }
773 	}
774 
775 	// Keep a track of the greatest weight we've seen.
776 	if (wt > greatest_wt) {
777 new_greatest_weight:
778 	    greatest_wt = wt;
779 #ifdef XAPIAN_HAS_REMOTE_BACKEND
780 	    const unsigned int multiplier = db.internal.size();
781 	    unsigned int db_num = (did - 1) % multiplier;
782 	    if (is_remote[db_num]) {
783 		// Note that the greatest weighted document came from a remote
784 		// database, and which one.
785 		greatest_wt_subqs_db_num = db_num;
786 	    } else
787 #endif
788 	    {
789 		greatest_wt_subqs_matched = pl->count_matching_subqs();
790 #ifdef XAPIAN_HAS_REMOTE_BACKEND
791 		greatest_wt_subqs_db_num = UINT_MAX;
792 #endif
793 	    }
794 	    if (percent_cutoff) {
795 		Xapian::weight w = wt * percent_cutoff_factor;
796 		if (w > min_weight) {
797 		    min_weight = w;
798 		    if (!is_heap) {
799 			is_heap = true;
800 			make_heap<vector<Xapian::Internal::MSetItem>::iterator,
801 				  MSetCmp>(items.begin(), items.end(), mcmp);
802 		    }
803 		    while (!items.empty() && items.front().wt < min_weight) {
804 			pop_heap<vector<Xapian::Internal::MSetItem>::iterator,
805 				 MSetCmp>(items.begin(), items.end(), mcmp);
806 			Assert(items.back().wt < min_weight);
807 			items.pop_back();
808 		    }
809 #ifdef XAPIAN_ASSERTIONS_PARANOID
810 		    vector<Xapian::Internal::MSetItem>::const_iterator i;
811 		    for (i = items.begin(); i != items.end(); ++i) {
812 			Assert(i->wt >= min_weight);
813 		    }
814 #endif
815 		}
816 	    }
817 	}
818     }
819 
820     // done with posting list tree
821     pl.reset(NULL);
822 
823     double percent_scale = 0;
824     if (!items.empty() && greatest_wt > 0) {
825 #ifdef XAPIAN_HAS_REMOTE_BACKEND
826 	if (greatest_wt_subqs_db_num != UINT_MAX) {
827 	    const unsigned int n = greatest_wt_subqs_db_num;
828 	    RemoteSubMatch * rem_match;
829 	    rem_match = static_cast<RemoteSubMatch*>(leaves[n].get());
830 	    percent_scale = rem_match->get_percent_factor() / 100.0;
831 	} else
832 #endif
833 	{
834 	    percent_scale = greatest_wt_subqs_matched / double(total_subqs);
835 	    percent_scale /= greatest_wt;
836 	}
837 	Assert(percent_scale > 0);
838 	if (percent_cutoff) {
839 	    // FIXME: better to sort and binary chop maybe?
840 	    // Or we could just use a linear scan here instead.
841 
842 	    // trim the mset to the correct answer...
843 	    Xapian::weight min_wt = percent_cutoff_factor / percent_scale;
844 	    if (!is_heap) {
845 		is_heap = true;
846 		make_heap<vector<Xapian::Internal::MSetItem>::iterator,
847 			  MSetCmp>(items.begin(), items.end(), mcmp);
848 	    }
849 	    while (!items.empty() && items.front().wt < min_wt) {
850 		pop_heap<vector<Xapian::Internal::MSetItem>::iterator,
851 			 MSetCmp>(items.begin(), items.end(), mcmp);
852 		Assert(items.back().wt < min_wt);
853 		items.pop_back();
854 	    }
855 #ifdef XAPIAN_ASSERTIONS_PARANOID
856 	    vector<Xapian::Internal::MSetItem>::const_iterator j;
857 	    for (j = items.begin(); j != items.end(); ++j) {
858 		Assert(j->wt >= min_wt);
859 	    }
860 #endif
861 	}
862     }
863 
864     LOGLINE(MATCH,
865 	    "docs_matched = " << docs_matched <<
866 	    ", definite_matches_not_seen = " << definite_matches_not_seen <<
867 	    ", matches_lower_bound = " << matches_lower_bound <<
868 	    ", matches_estimated = " << matches_estimated <<
869 	    ", matches_upper_bound = " << matches_upper_bound);
870 
871     // Adjust docs_matched to take account of documents which matched remotely
872     // but weren't sent across.
873     docs_matched += definite_matches_not_seen;
874 
875     Xapian::doccount uncollapsed_lower_bound = matches_lower_bound;
876     Xapian::doccount uncollapsed_upper_bound = matches_upper_bound;
877     Xapian::doccount uncollapsed_estimated = matches_estimated;
878     if (items.size() < max_msize) {
879 	// We have fewer items in the mset than we tried to get for it, so we
880 	// must have all the matches in it.
881 	LOGLINE(MATCH, "items.size() = " << items.size() <<
882 		", max_msize = " << max_msize << ", setting bounds equal");
883 	Assert(definite_matches_not_seen == 0);
884 	Assert(percent_cutoff || docs_matched == items.size());
885 	matches_lower_bound = matches_upper_bound = matches_estimated
886 	    = items.size();
887 	if (collapser && matches_lower_bound > uncollapsed_lower_bound)
888 	    uncollapsed_lower_bound = matches_lower_bound;
889     } else if (!collapser && docs_matched < check_at_least) {
890 	// We have seen fewer matches than we checked for, so we must have seen
891 	// all the matches.
892 	LOGLINE(MATCH, "Setting bounds equal");
893 	matches_lower_bound = matches_upper_bound = matches_estimated
894 	    = docs_matched;
895 	if (collapser && matches_lower_bound > uncollapsed_lower_bound)
896 	   uncollapsed_lower_bound = matches_lower_bound;
897     } else {
898 	AssertRel(matches_estimated,>=,matches_lower_bound);
899 	AssertRel(matches_estimated,<=,matches_upper_bound);
900 
901 	// We can end up scaling the estimate more than once, so collect
902 	// the scale factors and apply them in one go to avoid rounding
903 	// more than once.
904 	double estimate_scale = 1.0;
905 	double unique_rate = 1.0;
906 
907 	if (collapser) {
908 	    LOGLINE(MATCH, "Adjusting bounds due to collapse_key");
909 	    matches_lower_bound = collapser.get_matches_lower_bound();
910 	    if (matches_lower_bound > uncollapsed_lower_bound)
911 	       uncollapsed_lower_bound = matches_lower_bound;
912 
913 	    // The estimate for the number of hits can be modified by
914 	    // multiplying it by the rate at which we've been finding
915 	    // unique documents.
916 	    Xapian::doccount docs_considered = collapser.get_docs_considered();
917 	    Xapian::doccount dups_ignored = collapser.get_dups_ignored();
918 	    if (docs_considered > 0) {
919 		double unique = double(docs_considered - dups_ignored);
920 		unique_rate = unique / double(docs_considered);
921 	    }
922 
923 	    // We can safely reduce the upper bound by the number of duplicates
924 	    // we've ignored.
925 	    matches_upper_bound -= dups_ignored;
926 
927 	    LOGLINE(MATCH, "matches_lower_bound=" << matches_lower_bound <<
928 		    ", matches_estimated=" << matches_estimated <<
929 		    "*" << estimate_scale << "*" << unique_rate <<
930 		    ", matches_upper_bound=" << matches_upper_bound);
931 	}
932 
933 	if (mdecider || matchspy_legacy) {
934 	    if (!percent_cutoff) {
935 		if (!collapser) {
936 		    // We're not collapsing or doing a percentage cutoff, so
937 		    // docs_matched is a lower bound on the total number of
938 		    // matches.
939 		    matches_lower_bound = max(docs_matched, matches_lower_bound);
940 		}
941 	    }
942 
943 	    // The estimate for the number of hits can be modified by
944 	    // multiplying it by the rate at which the match decider has
945 	    // been accepting documents.
946 	    if (decider_considered > 0) {
947 		double accept = double(decider_considered - decider_denied);
948 		double accept_rate = accept / double(decider_considered);
949 		estimate_scale *= accept_rate;
950 	    }
951 
952 	    // If a document is denied by a match decider, it is not possible
	    // for it to be found to be a duplicate, so it is safe to also reduce
954 	    // the upper bound by the number of documents denied by a match
955 	    // decider.
956 	    matches_upper_bound -= decider_denied;
957 	    if (collapser) uncollapsed_upper_bound -= decider_denied;
958 	}
959 
960 	if (percent_cutoff) {
961 	    estimate_scale *= (1.0 - percent_cutoff_factor);
962 	    // another approach:
963 	    // Xapian::doccount new_est = items.size() * (1 - percent_cutoff_factor) / (1 - min_weight / greatest_wt);
964 	    // and another: items.size() + (1 - greatest_wt * percent_cutoff_factor / min_weight) * (matches_estimated - items.size());
965 
966 	    // Very likely an underestimate, but we can't really do better
967 	    // without checking further matches...  Only possibility would be
968 	    // to track how many docs made the min weight test but didn't make
969 	    // the candidate set since the last greatest_wt change, which we
970 	    // could use if the top documents matched all the prob terms.
971 	    matches_lower_bound = items.size();
972 	    if (collapser) uncollapsed_lower_bound = matches_lower_bound;
973 
974 	    // matches_upper_bound could be reduced by the number of documents
975 	    // which fail the min weight test (FIXME)
976 
977 	    LOGLINE(MATCH, "Adjusted bounds due to percent_cutoff (" <<
978 		    percent_cutoff << "): now have matches_estimated=" <<
979 		    matches_estimated << ", matches_lower_bound=" <<
980 		    matches_lower_bound);
981 	}
982 
983 	if (collapser && estimate_scale != 1.0) {
984 	    uncollapsed_estimated =
985 		Xapian::doccount(uncollapsed_estimated * estimate_scale + 0.5);
986 	}
987 
988 	estimate_scale *= unique_rate;
989 
990 	if (estimate_scale != 1.0) {
991 	    matches_estimated =
992 		Xapian::doccount(matches_estimated * estimate_scale + 0.5);
993 	    if (matches_estimated < matches_lower_bound)
994 	       	matches_estimated = matches_lower_bound;
995 	}
996 
997 	if (collapser || mdecider || matchspy_legacy) {
998 	    LOGLINE(MATCH, "Clamping estimate between bounds: "
999 		    "matches_lower_bound = " << matches_lower_bound <<
1000 		    ", matches_estimated = " << matches_estimated <<
1001 		    ", matches_upper_bound = " << matches_upper_bound);
1002 	    AssertRel(matches_lower_bound,<=,matches_upper_bound);
1003 	    matches_estimated = max(matches_estimated, matches_lower_bound);
1004 	    matches_estimated = min(matches_estimated, matches_upper_bound);
1005 	} else if (!percent_cutoff) {
1006 	    AssertRel(docs_matched,<=,matches_upper_bound);
1007 	    if (docs_matched > matches_lower_bound)
1008 		matches_lower_bound = docs_matched;
1009 	    if (docs_matched > matches_estimated)
1010 		matches_estimated = docs_matched;
1011 	}
1012 
1013 	if (collapser && !mdecider && !percent_cutoff && !matchspy_legacy) {
1014 	    AssertRel(docs_matched,<=,uncollapsed_upper_bound);
1015 	    if (docs_matched > uncollapsed_lower_bound)
1016 		uncollapsed_lower_bound = docs_matched;
1017 	}
1018     }
1019 
1020     if (collapser) {
1021 	AssertRel(uncollapsed_lower_bound,<=,uncollapsed_upper_bound);
1022 	if (uncollapsed_estimated < uncollapsed_lower_bound) {
1023 	    uncollapsed_estimated = uncollapsed_lower_bound;
1024 	} else if (uncollapsed_estimated > uncollapsed_upper_bound) {
1025 	    uncollapsed_estimated = uncollapsed_upper_bound;
1026 	}
1027     } else {
1028 	// We're not collapsing, so the bounds must be the same.
1029 	uncollapsed_lower_bound = matches_lower_bound;
1030 	uncollapsed_upper_bound = matches_upper_bound;
1031 	uncollapsed_estimated = matches_estimated;
1032     }
1033 
1034     LOGLINE(MATCH, items.size() << " items in potential mset");
1035 
1036     if (first > 0) {
1037 	// Remove unwanted leading entries
1038 	if (items.size() <= first) {
1039 	    items.clear();
1040 	} else {
1041 	    LOGLINE(MATCH, "finding " << first << "th");
1042 	    // We perform nth_element() on reverse iterators so that the
1043 	    // unwanted elements end up at the end of items, which means
1044 	    // that the call to erase() to remove them doesn't have to copy
1045 	    // any elements.
1046 	    vector<Xapian::Internal::MSetItem>::reverse_iterator nth;
1047 	    nth = items.rbegin() + first;
1048 	    nth_element(items.rbegin(), nth, items.rend(), mcmp);
1049 	    // Erase the trailing ``first'' elements
1050 	    items.erase(items.begin() + items.size() - first, items.end());
1051 	}
1052     }
1053 
1054     LOGLINE(MATCH, "sorting " << items.size() << " entries");
1055 
1056     // Need a stable sort, but this is provided by comparison operator
1057     sort(items.begin(), items.end(), mcmp);
1058 
1059     if (!items.empty()) {
1060 	LOGLINE(MATCH, "min weight in mset = " << items.back().wt);
1061 	LOGLINE(MATCH, "max weight in mset = " << items[0].wt);
1062     }
1063 
1064     AssertRel(matches_estimated,>=,matches_lower_bound);
1065     AssertRel(matches_estimated,<=,matches_upper_bound);
1066 
1067     AssertRel(uncollapsed_estimated,>=,uncollapsed_lower_bound);
1068     AssertRel(uncollapsed_estimated,<=,uncollapsed_upper_bound);
1069 
1070     // We may need to qualify any collapse_count to see if the highest weight
1071     // collapsed item would have qualified percent_cutoff
1072     // We WILL need to restore collapse_count to the mset by taking from
1073     // collapse_tab; this is what comes of copying around whole objects
1074     // instead of taking references, we find it hard to update collapse_count
1075     // of an item that has already been pushed-back as we don't know where it
1076     // is any more.  If we keep or find references we won't need to mess with
1077     // is_heap so much maybe?
1078     if (!items.empty() && collapser && !collapser.empty()) {
1079 	// Nicked this formula from above.
1080 	Xapian::weight min_wt = 0.0;
1081 	if (percent_scale > 0.0)
1082 	    min_wt = percent_cutoff_factor / percent_scale;
1083 	Xapian::doccount entries = collapser.entries();
1084 	vector<Xapian::Internal::MSetItem>::iterator i;
1085 	for (i = items.begin(); i != items.end(); ++i) {
1086 	    // Skip entries without a collapse key.
1087 	    if (i->collapse_key.empty()) continue;
1088 
1089 	    // Set collapse_count appropriately.
1090 	    i->collapse_count = collapser.get_collapse_count(i->collapse_key, percent_cutoff, min_wt);
1091 	    if (--entries == 0) {
1092 		// Stop once we've processed all items with collapse keys.
1093 		break;
1094 	    }
1095 	}
1096     }
1097 
1098     mset = Xapian::MSet(new Xapian::MSet::Internal(
1099 				       first,
1100 				       matches_upper_bound,
1101 				       matches_lower_bound,
1102 				       matches_estimated,
1103 				       uncollapsed_upper_bound,
1104 				       uncollapsed_lower_bound,
1105 				       uncollapsed_estimated,
1106 				       max_possible, greatest_wt, items,
1107 				       termfreqandwts,
1108 				       percent_scale * 100.0));
1109 }
1110