/* multimatch.cc
 *
 * Copyright 1999,2000,2001 BrightStation PLC
 * Copyright 2001,2002 Ananova Ltd
 * Copyright 2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2013,2014,2015,2016 Olly Betts
 * Copyright 2003 Orange PCS Ltd
 * Copyright 2003 Sam Liddicott
 * Copyright 2007,2008,2009 Lemur Consulting Ltd
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301
 * USA
 */

#include <config.h>

#include "multimatch.h"

#include "autoptr.h"
#include "collapser.h"
#include "debuglog.h"
#include "submatch.h"
#include "localsubmatch.h"
#include "omassert.h"
#include "api/omenquireinternal.h"
#include "realtime.h"

#include "api/emptypostlist.h"
#include "branchpostlist.h"
#include "mergepostlist.h"

#include "backends/document.h"

#include "msetcmp.h"

#include "valuestreamdocument.h"
#include "weight/weightinternal.h"

#include <xapian/error.h>
#include <xapian/matchspy.h>
#include <xapian/version.h> // For XAPIAN_HAS_REMOTE_BACKEND

#ifdef XAPIAN_HAS_REMOTE_BACKEND
#include "remotesubmatch.h"
#include "backends/remote/remote-database.h"
#endif /* XAPIAN_HAS_REMOTE_BACKEND */

#include <algorithm>
#include <cfloat> // For DBL_EPSILON.
#include <climits> // For UINT_MAX.
#include <vector>
#include <map>
#include <set>

#ifdef HAVE_TIMER_CREATE
#include <signal.h>
#include <time.h>
#include "safeunistd.h" // For _POSIX_* feature test macros.

extern "C" {
static void
set_timeout_flag(union sigval sv)
{
    *(reinterpret_cast<volatile bool*>(sv.sival_ptr)) = true;
}

}

// The monotonic clock is the better basis for timeouts, but not always
// available.

#ifndef _POSIX_MONOTONIC_CLOCK
const clockid_t TIMEOUT_CLOCK = CLOCK_REALTIME;
#elif defined __sun
// Solaris defines CLOCK_MONOTONIC, but "man timer_create" doesn't mention it
// and when using it timer_create() fails with "EPERM" (perhaps you need to be
// root to use it?  I can't test that).
//
// Solaris defines _POSIX_MONOTONIC_CLOCK so we need to special case.
const clockid_t TIMEOUT_CLOCK = CLOCK_REALTIME;
#elif defined __CYGWIN__
// https://cygwin.com/cygwin-api/std-notes.html currently (2016-05-13) says:
//
//     clock_nanosleep currently supports only CLOCK_REALTIME and
//     CLOCK_MONOTONIC.  clock_setres, clock_settime, and timer_create
//     currently support only CLOCK_REALTIME.
//
// So CLOCK_MONOTONIC is defined, but not supported by timer_create().
const clockid_t TIMEOUT_CLOCK = CLOCK_REALTIME;
#else
const clockid_t TIMEOUT_CLOCK = CLOCK_MONOTONIC;
#endif

class TimeOut {
    struct sigevent sev;
    timer_t timerid;
    volatile bool expired;

  public:
    explicit TimeOut(double limit) : expired(false) {
	if (limit > 0) {
	    sev.sigev_notify = SIGEV_THREAD;
	    sev.sigev_notify_function = set_timeout_flag;
	    sev.sigev_notify_attributes = NULL;
	    sev.sigev_value.sival_ptr =
		static_cast<void*>(const_cast<bool*>(&expired));
	    if (usual(timer_create(TIMEOUT_CLOCK, &sev, &timerid) == 0)) {
		struct itimerspec interval;
		interval.it_interval.tv_sec = 0;
		interval.it_interval.tv_nsec = 0;
		RealTime::to_timespec(limit, &interval.it_value);
		if (usual(timer_settime(timerid, 0, &interval, NULL) == 0)) {
		    // Timeout successfully set.
		    return;
		}
		timer_delete(timerid);
	    }
	}
	sev.sigev_notify = SIGEV_NONE;
    }

    ~TimeOut() {
	if (sev.sigev_notify != SIGEV_NONE) {
	    timer_delete(timerid);
	    sev.sigev_notify = SIGEV_NONE;
	}
    }

    bool timed_out() const { return expired; }
};
#else
class TimeOut {
  public:
    explicit TimeOut(double) { }
    bool timed_out() const { return false; }
};
#endif
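
// Illustrative use of TimeOut (a hedged sketch, not code from elsewhere in
// this file): get_mset() below constructs one from Enquire's time limit and
// polls it in the match loop:
//
//     TimeOut timeout(2.5); // stop extending check_at_least after 2.5s
//     while (/* considering candidates */ true) {
//         if (timeout.timed_out()) {
//             // Stop checking extra documents beyond first + maxitems.
//             break;
//         }
//     }
//
// When the limit passes, set_timeout_flag() flips `expired` from a separate
// notification thread (SIGEV_THREAD), which is why the flag is volatile.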

using namespace std;
using Xapian::Internal::intrusive_ptr;

const Xapian::Enquire::Internal::sort_setting REL =
	Xapian::Enquire::Internal::REL;
const Xapian::Enquire::Internal::sort_setting REL_VAL =
	Xapian::Enquire::Internal::REL_VAL;
const Xapian::Enquire::Internal::sort_setting VAL =
	Xapian::Enquire::Internal::VAL;
#if 0 // VAL_REL isn't currently used so avoid compiler warnings.
const Xapian::Enquire::Internal::sort_setting VAL_REL =
	Xapian::Enquire::Internal::VAL_REL;
#endif

/** Split an RSet into several sub rsets, one for each database.
 *
 *  @param rset The RSet to split.
 *  @param number_of_subdbs The number of sub databases which exist.
 *  @param subrsets Vector of RSets into which the sub rsets will be placed.
 *                  This should be empty when the function is called.
 */
static void
split_rset_by_db(const Xapian::RSet * rset,
		 Xapian::doccount number_of_subdbs,
		 vector<Xapian::RSet> & subrsets)
{
    LOGCALL_STATIC_VOID(MATCH, "split_rset_by_db", rset | number_of_subdbs | subrsets);
    if (rset && !rset->empty()) {
	if (number_of_subdbs == 1) {
	    // The common case of a single database is easy to handle.
	    subrsets.push_back(*rset);
	} else {
	    // Can't just use vector::resize() here, since that creates N
	    // copies of the same RSet!
	    subrsets.reserve(number_of_subdbs);
	    for (size_t i = 0; i < number_of_subdbs; ++i) {
		subrsets.push_back(Xapian::RSet());
	    }

	    const set<Xapian::docid> & rsetitems = rset->internal->get_items();
	    set<Xapian::docid>::const_iterator j;
	    for (j = rsetitems.begin(); j != rsetitems.end(); ++j) {
		Xapian::doccount local_docid = (*j - 1) / number_of_subdbs + 1;
		Xapian::doccount subdatabase = (*j - 1) % number_of_subdbs;
		subrsets[subdatabase].add_document(local_docid);
	    }
	}
    } else {
	// NB vector::resize() creates N copies of the same empty RSet.
	subrsets.resize(number_of_subdbs);
    }
    Assert(subrsets.size() == number_of_subdbs);
}
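
// Worked example of the docid mapping above (illustrative): with
// number_of_subdbs == 3, global docids interleave across the shards, so
// global docid 7 lands in subdatabase (7 - 1) % 3 == 0 with local docid
// (7 - 1) / 3 + 1 == 3.  The same (did - 1) % multiplier mapping is used
// again in get_mset() below to work out which shard a candidate came from.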

/** Prepare some SubMatches.
 *
 *  This calls the prepare_match() method on each SubMatch object, causing them
 *  to lookup various statistics.
 *
 *  This method is rather complicated in order to handle remote matches
 *  efficiently.  Instead of simply calling "prepare_match()" on each submatch
 *  and waiting for it to return, it first calls "prepare_match(true)" on each
 *  submatch.  If any of these calls return false, indicating that the required
 *  information has not yet been received from the server, the method will try
 *  those which returned false again, passing "false" as a parameter to
 *  indicate that they should block until the information is ready.
 *
 *  This should improve performance in the case of mixed local-and-remote
 *  searches - the local searchers will all fetch their statistics from disk
 *  without waiting for the remote searchers, so as soon as the remote searcher
 *  statistics arrive, we can move on to the next step.
 */
static void
prepare_sub_matches(vector<intrusive_ptr<SubMatch> > & leaves,
		    Xapian::Weight::Internal & stats)
{
    LOGCALL_STATIC_VOID(MATCH, "prepare_sub_matches", leaves | stats);
    // We use a vector<bool> to track which SubMatches we've already prepared.
    vector<bool> prepared;
    prepared.resize(leaves.size(), false);
    size_t unprepared = leaves.size();
    bool nowait = true;
    while (unprepared) {
	for (size_t leaf = 0; leaf < leaves.size(); ++leaf) {
	    if (prepared[leaf]) continue;
	    SubMatch * submatch = leaves[leaf].get();
	    if (!submatch || submatch->prepare_match(nowait, stats)) {
		prepared[leaf] = true;
		--unprepared;
	    }
	}
	// Use blocking IO on subsequent passes, so that we don't go into
	// a tight loop.
	nowait = false;
    }
}

/// Class which applies several match spies in turn.
class MultipleMatchSpy : public Xapian::MatchSpy {
  private:
    /// List of match spies to call, in order.
    const std::vector<Xapian::Internal::opt_intrusive_ptr<Xapian::MatchSpy>> & spies;

  public:
    explicit
    MultipleMatchSpy(const std::vector<Xapian::Internal::opt_intrusive_ptr<Xapian::MatchSpy>> & spies_)
	    : spies(spies_) {}

    /** Implementation of virtual operator().
     *
     *  This implementation calls all the spies in turn.
     */
    void operator()(const Xapian::Document &doc, double wt);
};

void
MultipleMatchSpy::operator()(const Xapian::Document &doc, double wt) {
    LOGCALL_VOID(MATCH, "MultipleMatchSpy::operator()", doc | wt);
    for (auto i : spies) {
	(*i)(doc, wt);
    }
}
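
// For context, a hedged sketch of how multiple spies arise via the public
// API (not code from this file): each Enquire::add_matchspy() call registers
// one spy, and when more than one is registered the matcher wraps them in a
// MultipleMatchSpy so the match loop still makes a single callback per
// candidate document:
//
//     Xapian::ValueCountMatchSpy spy0(0), spy1(1);
//     enquire.add_matchspy(&spy0);
//     enquire.add_matchspy(&spy1);
//     Xapian::MSet mset = enquire.get_mset(0, 10);
//     // Both spies have now seen every candidate considered locally.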

////////////////////////////////////
// Initialisation and cleaning up //
////////////////////////////////////

MultiMatch::MultiMatch(const Xapian::Database &db_,
		       const Xapian::Query & query_,
		       Xapian::termcount qlen,
		       const Xapian::RSet * omrset,
		       Xapian::doccount collapse_max_,
		       Xapian::valueno collapse_key_,
		       int percent_cutoff_, double weight_cutoff_,
		       Xapian::Enquire::docid_order order_,
		       Xapian::valueno sort_key_,
		       Xapian::Enquire::Internal::sort_setting sort_by_,
		       bool sort_value_forward_,
		       double time_limit_,
		       Xapian::Weight::Internal & stats,
		       const Xapian::Weight * weight_,
		       const vector<Xapian::Internal::opt_intrusive_ptr<Xapian::MatchSpy>> & matchspies_,
		       bool have_sorter, bool have_mdecider)
	: db(db_), query(query_),
	  collapse_max(collapse_max_), collapse_key(collapse_key_),
	  percent_cutoff(percent_cutoff_), weight_cutoff(weight_cutoff_),
	  order(order_),
	  sort_key(sort_key_), sort_by(sort_by_),
	  sort_value_forward(sort_value_forward_),
	  time_limit(time_limit_),
	  weight(weight_),
	  is_remote(db.internal.size()),
	  matchspies(matchspies_)
{
    LOGCALL_CTOR(MATCH, "MultiMatch", db_ | query_ | qlen | omrset | collapse_max_ | collapse_key_ | percent_cutoff_ | weight_cutoff_ | int(order_) | sort_key_ | int(sort_by_) | sort_value_forward_ | time_limit_ | stats | weight_ | matchspies_ | have_sorter | have_mdecider);

    if (query.empty()) return;

    Xapian::doccount number_of_subdbs = db.internal.size();
    vector<Xapian::RSet> subrsets;
    split_rset_by_db(omrset, number_of_subdbs, subrsets);

    for (size_t i = 0; i != number_of_subdbs; ++i) {
	Xapian::Database::Internal *subdb = db.internal[i].get();
	Assert(subdb);
	intrusive_ptr<SubMatch> smatch;

	// There is currently only one special case, for network databases.
#ifdef XAPIAN_HAS_REMOTE_BACKEND
	if (subdb->get_backend_info(NULL) == BACKEND_REMOTE) {
	    RemoteDatabase *rem_db = static_cast<RemoteDatabase*>(subdb);
	    if (have_sorter) {
		throw Xapian::UnimplementedError("Xapian::KeyMaker not supported for the remote backend");
	    }
	    if (have_mdecider) {
		throw Xapian::UnimplementedError("Xapian::MatchDecider not supported for the remote backend");
	    }
	    // FIXME: Remote handling for time_limit with multiple
	    // databases may need some work.
	    rem_db->set_query(query, qlen, collapse_max, collapse_key,
			      order, sort_key, sort_by, sort_value_forward,
			      time_limit,
			      percent_cutoff, weight_cutoff, weight,
			      subrsets[i], matchspies);
	    bool decreasing_relevance =
		(sort_by == REL || sort_by == REL_VAL);
	    smatch = new RemoteSubMatch(rem_db, decreasing_relevance, matchspies);
	    is_remote[i] = true;
	} else {
	    smatch = new LocalSubMatch(subdb, query, qlen, subrsets[i], weight,
				       i);
	    subdb->readahead_for_query(query);
	}
#else
	// Avoid unused parameter warnings.
	(void)have_sorter;
	(void)have_mdecider;
	smatch = new LocalSubMatch(subdb, query, qlen, subrsets[i], weight, i);
#endif /* XAPIAN_HAS_REMOTE_BACKEND */
	leaves.push_back(smatch);
    }

    stats.set_query(query);
    prepare_sub_matches(leaves, stats);
    stats.set_bounds_from_db(db);
}
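
// For orientation, a hedged sketch of the public call path which reaches the
// constructor above (not code from this file): Enquire::get_mset() builds a
// MultiMatch over all the shards of its Database and then calls
// MultiMatch::get_mset() below:
//
//     Xapian::Database db("shard0");
//     db.add_database(Xapian::Database("shard1")); // could also be remote
//     Xapian::Enquire enquire(db);
//     enquire.set_query(Xapian::Query("example"));
//     Xapian::MSet mset = enquire.get_mset(0, 10);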

double
MultiMatch::getorrecalc_maxweight(PostList *pl)
{
    LOGCALL(MATCH, double, "MultiMatch::getorrecalc_maxweight", pl);
    double wt;
    if (recalculate_w_max) {
	LOGLINE(MATCH, "recalculating max weight");
	wt = pl->recalc_maxweight();
	recalculate_w_max = false;
    } else {
	wt = pl->get_maxweight();
	LOGLINE(MATCH, "pl = (" << pl->get_description() << ")");
	// FIXME: This fails for hoistnotbug1 under multi_remoteprog_glass:
	// AssertionError: matcher/multimatch.cc:370: within_DBL_EPSILON(wt, pl->recalc_maxweight()) : values were 2.7075940282079162813 and 2.6966211268553141878
	//
	// AssertEqDoubleParanoid(wt, pl->recalc_maxweight());
	//
	// Not sure why - adding code to call recalc_maxweight() doesn't seem
	// to help.  But the max weight not being as low as it could be is
	// only a potential missed optimisation opportunity, not a correctness
	// issue.
    }
    LOGLINE(MATCH, "max possible doc weight = " << wt);
    RETURN(wt);
}

void
MultiMatch::get_mset(Xapian::doccount first, Xapian::doccount maxitems,
		     Xapian::doccount check_at_least,
		     Xapian::MSet & mset,
		     Xapian::Weight::Internal & stats,
		     const Xapian::MatchDecider *mdecider,
		     const Xapian::KeyMaker *sorter)
{
    LOGCALL_VOID(MATCH, "MultiMatch::get_mset", first | maxitems | check_at_least | Literal("mset") | stats | Literal("mdecider") | Literal("sorter"));
    AssertRel(check_at_least,>=,first + maxitems);

    if (query.empty()) {
	mset = Xapian::MSet();
	mset.internal->firstitem = first;
	return;
    }

    Assert(!leaves.empty());

    TimeOut timeout(time_limit);

#ifdef XAPIAN_HAS_REMOTE_BACKEND
    // If there's only one database and it's remote, we can just unserialise
    // its MSet and return that.
    if (leaves.size() == 1 && is_remote[0]) {
	RemoteSubMatch * rem_match;
	rem_match = static_cast<RemoteSubMatch*>(leaves[0].get());
	rem_match->start_match(first, maxitems, check_at_least, stats);
	rem_match->get_mset(mset);
	return;
    }
#endif

    // Start matchers.
    for (size_t i = 0; i != leaves.size(); ++i) {
	auto& leaf = leaves[i];
#ifdef XAPIAN_HAS_REMOTE_BACKEND
	if (is_remote[i]) {
	    Xapian::doccount remote_maxitems = first + maxitems;
	    if (collapse_max != 0) {
		// If collapsing we need to fetch all check_at_least items in
		// order to satisfy the requirement that if there are <=
		// check_at_least results then the estimated number of matches
		// is exact.
		remote_maxitems = check_at_least;
	    }
	    leaf->start_match(0, remote_maxitems, check_at_least, stats);
	    continue;
	}
#endif
	leaf->start_match(0, first + maxitems, check_at_least, stats);
    }

    // Get postlists and term info
    vector<PostList *> postlists;
    Xapian::termcount total_subqs = 0;
    // Keep a count of matches which we know exist, but we won't see.  This
    // occurs when a submatch is remote, and returns a lower bound on the
    // number of matching documents which is higher than the number of
    // documents it returns (because it wasn't asked for more documents).
    Xapian::doccount definite_matches_not_seen = 0;
#ifdef XAPIAN_HAS_REMOTE_BACKEND
    // Track these for calculating uncollapsed_upper_bound for the local
    // match.
    size_t n_remotes = 0;
    Xapian::doccount remote_uncollapsed_upper_bound = 0;
#endif
    try {
	for (size_t i = 0; i != leaves.size(); ++i) {
	    // Pick the highest total subqueries answer amongst the
	    // subdatabases, as the query to postlist conversion doesn't
	    // recurse into positional queries for shards that don't have
	    // positional data when at least one other shard does.
	    Xapian::termcount total_subqs_i = 0;
	    PostList* pl = leaves[i]->get_postlist(this, &total_subqs_i, stats);
	    total_subqs = max(total_subqs, total_subqs_i);
#ifdef XAPIAN_HAS_REMOTE_BACKEND
	    if (is_remote[i]) {
		++n_remotes;
		RemoteSubMatch* rem_match =
		    static_cast<RemoteSubMatch*>(leaves[i].get());
		remote_uncollapsed_upper_bound +=
		    rem_match->get_uncollapsed_upper_bound();
		if (collapse_max == 0 &&
		    pl->get_termfreq_min() > first + maxitems) {
		    LOGLINE(MATCH, "Found " <<
				   pl->get_termfreq_min() - (first + maxitems)
				   << " definite matches in remote submatch "
				   "which aren't passed to local match");
		    definite_matches_not_seen += pl->get_termfreq_min();
		    definite_matches_not_seen -= first + maxitems;
		}
	    }
#endif
	    postlists.push_back(pl);
	}
    } catch (...) {
	for (auto pl : postlists) delete pl;
	throw;
    }
    Assert(!postlists.empty());

    ValueStreamDocument vsdoc(db);
    // Hold an extra reference so the Xapian::Document object below never
    // tries to delete the stack-allocated vsdoc.
    ++vsdoc._refs;
    Xapian::Document doc(&vsdoc);

    // Get a single combined postlist
    AutoPtr<PostList> pl;
    if (postlists.size() == 1) {
	pl.reset(postlists.front());
    } else {
	pl.reset(new MergePostList(postlists, this, vsdoc));
    }

    LOGLINE(MATCH, "pl = (" << pl->get_description() << ")");

    // Empty result set
    Xapian::doccount docs_matched = 0;
    double greatest_wt = 0;
    Xapian::termcount greatest_wt_subqs_matched = 0;
#ifdef XAPIAN_HAS_REMOTE_BACKEND
    unsigned greatest_wt_subqs_db_num = UINT_MAX;
#endif
    vector<Xapian::Internal::MSetItem> items;

    // maximum weight a document could possibly have
    const double max_possible = pl->recalc_maxweight();

    LOGLINE(MATCH, "pl = (" << pl->get_description() << ")");
    recalculate_w_max = false;

    Xapian::doccount matches_upper_bound = pl->get_termfreq_max();
    Xapian::doccount matches_lower_bound = 0;
    Xapian::doccount matches_estimated   = pl->get_termfreq_est();

    if (mdecider == NULL) {
	// If we have a match decider, the lower bound must be
	// set to 0 as we could discard all hits.  Otherwise set it to the
	// minimum number of entries which the postlist could return.
	matches_lower_bound = pl->get_termfreq_min();
    }

    // Prepare the matchspy
    Xapian::MatchSpy *matchspy = NULL;
    MultipleMatchSpy multispy(matchspies);
    if (!matchspies.empty()) {
	if (matchspies.size() == 1) {
	    matchspy = matchspies[0].get();
	} else {
	    matchspy = &multispy;
	}
    }

    // Check if any results have been asked for (might just be wanting
    // maxweight).
    if (check_at_least == 0) {
	pl.reset(NULL);
	Xapian::doccount uncollapsed_lower_bound = matches_lower_bound;
	if (collapse_max) {
	    // Lower bound must be set to no more than collapse_max, since it's
	    // possible that all matching documents have the same collapse_key
	    // value and so are collapsed together.
	    if (matches_lower_bound > collapse_max)
		matches_lower_bound = collapse_max;
	}

	mset.internal = new Xapian::MSet::Internal(
					   first,
					   matches_upper_bound,
					   matches_lower_bound,
					   matches_estimated,
					   matches_upper_bound,
					   uncollapsed_lower_bound,
					   matches_estimated,
					   max_possible, greatest_wt, items,
					   0);
	return;
    }

    // Number of documents considered by a decider.
    Xapian::doccount decider_considered = 0;
    // Number of documents denied by the decider.
    Xapian::doccount decider_denied = 0;

    // Set max number of results that we want - this is used to decide
    // when to throw away unwanted items.
    Xapian::doccount max_msize = first + maxitems;
    items.reserve(max_msize + 1);

    // Tracks the minimum item currently eligible for the MSet - we compare
    // candidate items against this.
    Xapian::Internal::MSetItem min_item(0.0, 0);

    // Minimum weight an item must have to be worth considering.
    double min_weight = weight_cutoff;

    // Factor to multiply maximum weight seen by to get the cutoff weight.
    double percent_cutoff_factor = percent_cutoff / 100.0;
    // Corresponding correction to that in omenquire.cc to account for excess
    // precision on x86.
    percent_cutoff_factor -= DBL_EPSILON;
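
    // Worked example (illustrative): with percent_cutoff == 40 the factor is
    // just under 0.4, so once a document scoring greatest_wt == 2.5 has been
    // seen, min_weight rises to just under 1.0 and any candidate weighing
    // less than that is rejected without further processing.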

    // Object to handle collapsing.
    Collapser collapser(collapse_key, collapse_max);

    /// Comparison functor for sorting MSet
    bool sort_forward = (order != Xapian::Enquire::DESCENDING);
    MSetCmp mcmp(get_msetcmp_function(sort_by, sort_forward, sort_value_forward));

    // Perform query

    // We form the mset in two stages.  In the first we fill up our working
    // mset.  Adding a new document does not remove another.
    //
    // In the second, we consider documents which rank higher than the current
    // lowest ranking document in the mset.  Each document added expels the
    // current lowest ranking document.
    //
    // If a percentage cutoff is in effect, it can cause the matcher to return
    // from the second stage to the first.

    // Is the mset a valid heap?
    bool is_heap = false;

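    // The second stage below uses the standard bounded-heap pattern, sketched
    // here (hedged; the real loop interleaves cutoffs and collapsing): items
    // is kept as a heap whose front is the *worst* item currently kept, so
    // displacing it costs O(log N):
    //
    //     items.push_back(candidate);
    //     push_heap(items.begin(), items.end(), mcmp);
    //     pop_heap(items.begin(), items.end(), mcmp); // worst moves to back
    //     items.pop_back();                           // ... and is dropped
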
    while (true) {
	bool pushback;

	if (rare(recalculate_w_max)) {
	    if (min_weight > 0.0) {
		if (rare(getorrecalc_maxweight(pl.get()) < min_weight)) {
		    LOGLINE(MATCH, "*** TERMINATING EARLY (1)");
		    break;
		}
	    }
	}

	PostList * pl_copy = pl.get();
	if (rare(next_handling_prune(pl_copy, min_weight, this))) {
	    (void)pl.release();
	    pl.reset(pl_copy);
	    LOGLINE(MATCH, "*** REPLACING ROOT");

	    if (min_weight > 0.0) {
		// No need for a full recalc (unless we've got to do one
		// because of a prune elsewhere) - we're just switching to a
		// subtree.
		if (rare(getorrecalc_maxweight(pl.get()) < min_weight)) {
		    LOGLINE(MATCH, "*** TERMINATING EARLY (2)");
		    break;
		}
	    }
	}

	if (rare(pl->at_end())) {
	    LOGLINE(MATCH, "Reached end of potential matches");
	    break;
	}

	// Only calculate the weight if we need it for mcmp, or there's a
	// percentage or weight cutoff in effect.  Otherwise we calculate it
	// below if we haven't already rejected this candidate.
	double wt = 0.0;
	bool calculated_weight = false;
	if (sort_by != VAL || min_weight > 0.0) {
	    wt = pl->get_weight();
	    if (wt < min_weight) {
		LOGLINE(MATCH, "Rejecting potential match due to insufficient weight");
		continue;
	    }
	    calculated_weight = true;
	}

	Xapian::docid did = pl->get_docid();
	vsdoc.set_document(did);
	LOGLINE(MATCH, "Candidate document id " << did << " wt " << wt);
	Xapian::Internal::MSetItem new_item(wt, did);
	if (check_at_least > first + maxitems && timeout.timed_out()) {
	    check_at_least = first + maxitems;
	}

	if (sort_by != REL) {
	    const string * ptr = pl->get_sort_key();
	    if (ptr) {
		new_item.sort_key = *ptr;
	    } else if (sorter) {
		new_item.sort_key = (*sorter)(doc);
	    } else {
		new_item.sort_key = vsdoc.get_value(sort_key);
	    }

	    // We're sorting by value (in part at least), so compare the item
	    // against the lowest currently in the proto-mset.  If sort_by is
	    // VAL, then new_item.wt won't yet be set, but that doesn't
	    // matter since it's not used by the sort function.
	    if (!mcmp(new_item, min_item)) {
		if (mdecider == NULL && !collapser) {
		    // Document was definitely suitable for mset - no more
		    // processing needed.
		    LOGLINE(MATCH, "Making note of match item which sorts lower than min_item");
		    ++docs_matched;
		    if (!calculated_weight) wt = pl->get_weight();
		    if (matchspy) {
			matchspy->operator()(doc, wt);
		    }
		    if (wt > greatest_wt) goto new_greatest_weight;
		    continue;
		}
		if (docs_matched >= check_at_least) {
		    // We've seen enough items - we can drop this one.
		    LOGLINE(MATCH, "Dropping candidate which sorts lower than min_item");
		    // FIXME: hmm, match decider might have rejected this...
		    if (!calculated_weight) wt = pl->get_weight();
		    if (wt > greatest_wt) goto new_greatest_weight;
		    continue;
		}
		// We can't drop the item, because we need to test whether the
		// mdecider would accept it and/or test whether it would be
		// collapsed.
		LOGLINE(MATCH, "Keeping candidate which sorts lower than min_item for further investigation");
	    }
	}

	// Use the match spy and/or decision functors (if specified).
	if (matchspy != NULL || mdecider != NULL) {
	    const unsigned int multiplier = db.internal.size();
	    Assert(multiplier != 0);
	    Xapian::doccount n = (did - 1) % multiplier; // which actual database
	    // If the results are from a remote database, then the functor will
	    // already have been applied there so we can skip this step.
	    if (!is_remote[n]) {
		++decider_considered;
		if (mdecider && !mdecider->operator()(doc)) {
		    ++decider_denied;
		    continue;
		}
		if (matchspy) {
		    if (!calculated_weight) {
			wt = pl->get_weight();
			new_item.wt = wt;
			calculated_weight = true;
		    }
		    matchspy->operator()(doc, wt);
		}
	    }
	}

	if (!calculated_weight) {
	    // we didn't calculate the weight above, but now we will need it
	    wt = pl->get_weight();
	    new_item.wt = wt;
	}

	pushback = true;

	// Perform collapsing on key if requested.
	if (collapser) {
	    collapse_result res;
	    res = collapser.process(new_item, pl.get(), vsdoc, mcmp);
	    if (res == REJECTED) {
		// If we're sorting by relevance primarily, then we throw away
		// the lower weighted document anyway.
		if (sort_by != REL && sort_by != REL_VAL) {
		    if (wt > greatest_wt) goto new_greatest_weight;
		}
		continue;
	    }

	    if (res == REPLACED) {
		// There was a previous item in the collapse tab so
		// the MSet can't be empty.
		Assert(!items.empty());

		const Xapian::Internal::MSetItem & old_item =
		    collapser.old_item;
		// This is one of the best collapse_max potential MSet entries
		// with this key which we've seen so far.  Check if the
		// entry with this key which it displaced might still be in the
		// proto-MSet.  If it might be, we need to check through for
		// it.
		double old_wt = old_item.wt;
		if (old_wt >= min_weight && mcmp(old_item, min_item)) {
		    // Scan through (unsorted) MSet looking for entry.
		    // FIXME: more efficient way than just scanning?
		    Xapian::docid olddid = old_item.did;
		    vector<Xapian::Internal::MSetItem>::iterator i;
		    for (i = items.begin(); i != items.end(); ++i) {
			if (i->did == olddid) {
			    LOGLINE(MATCH, "collapse: removing " <<
					   olddid << ": " <<
					   new_item.collapse_key);
			    // We can replace an arbitrary element in O(log N)
			    // but have to do it by hand (in this case the new
			    // elt is bigger, so we just swap down the tree).
			    // FIXME: implement this, and clean up is_heap
			    // handling
			    *i = new_item;
			    pushback = false;
			    is_heap = false;
			    break;
			}
		    }
		}
	    }
	}

	// OK, actually add the item to the mset.
	if (pushback) {
	    ++docs_matched;
	    if (items.size() >= max_msize) {
		items.push_back(new_item);
		if (!is_heap) {
		    is_heap = true;
		    make_heap(items.begin(), items.end(), mcmp);
		} else {
		    push_heap<vector<Xapian::Internal::MSetItem>::iterator,
			      MSetCmp>(items.begin(), items.end(), mcmp);
		}
		pop_heap<vector<Xapian::Internal::MSetItem>::iterator,
			 MSetCmp>(items.begin(), items.end(), mcmp);
		items.pop_back();

		if (!items.empty()) {
		    // In the get_mset(0, 0, X) (X > 0) case items is empty,
		    // so we just don't update min_item.
		    min_item = items.front();
		}
		if (sort_by == REL || sort_by == REL_VAL) {
		    if (docs_matched >= check_at_least) {
			if (sort_by == REL) {
			    // We're done if this is a forward boolean match
			    // with only one database (bodgetastic, FIXME
			    // better if we can!)
			    if (rare(max_possible == 0 && sort_forward)) {
				// In the multi database case, MergePostList
				// currently processes each database
				// sequentially (which actually may well be
				// more efficient) so the docids in general
				// won't arrive in order.
				if (leaves.size() == 1) break;
			    }
			}
			if (min_item.wt > min_weight) {
			    LOGLINE(MATCH, "Setting min_weight to " <<
				    min_item.wt << " from " << min_weight);
			    min_weight = min_item.wt;
			}
		    }
		}
		if (rare(getorrecalc_maxweight(pl.get()) < min_weight)) {
		    LOGLINE(MATCH, "*** TERMINATING EARLY (3)");
		    break;
		}
	    } else {
		items.push_back(new_item);
		is_heap = false;
		if (sort_by == REL && items.size() == max_msize) {
		    if (docs_matched >= check_at_least) {
			// We're done if this is a forward boolean match
			// with only one database (bodgetastic, FIXME
			// better if we can!)
			if (rare(max_possible == 0 && sort_forward)) {
			    // In the multi database case, MergePostList
			    // currently processes each database
			    // sequentially (which actually may well be
			    // more efficient) so the docids in general
			    // won't arrive in order.
			    if (leaves.size() == 1) break;
			}
		    }
		}
	    }
	}

	// Keep a track of the greatest weight we've seen.
	if (wt > greatest_wt) {
new_greatest_weight:
	    greatest_wt = wt;
#ifdef XAPIAN_HAS_REMOTE_BACKEND
	    const unsigned int multiplier = db.internal.size();
	    unsigned int db_num = (did - 1) % multiplier;
	    if (is_remote[db_num]) {
		// Note that the greatest weighted document came from a remote
		// database, and which one.
		greatest_wt_subqs_db_num = db_num;
	    } else
#endif
	    {
		greatest_wt_subqs_matched = pl->count_matching_subqs();
#ifdef XAPIAN_HAS_REMOTE_BACKEND
		greatest_wt_subqs_db_num = UINT_MAX;
#endif
	    }
	    if (percent_cutoff) {
		double w = wt * percent_cutoff_factor;
		if (w > min_weight) {
		    min_weight = w;
		    if (!is_heap) {
			is_heap = true;
			make_heap<vector<Xapian::Internal::MSetItem>::iterator,
				  MSetCmp>(items.begin(), items.end(), mcmp);
		    }
		    while (!items.empty() && items.front().wt < min_weight) {
			pop_heap<vector<Xapian::Internal::MSetItem>::iterator,
				 MSetCmp>(items.begin(), items.end(), mcmp);
			Assert(items.back().wt < min_weight);
			items.pop_back();
		    }
#ifdef XAPIAN_ASSERTIONS_PARANOID
		    vector<Xapian::Internal::MSetItem>::const_iterator i;
		    for (i = items.begin(); i != items.end(); ++i) {
			Assert(i->wt >= min_weight);
		    }
#endif
		}
	    }
	}
    }

    // done with posting list tree
    pl.reset(NULL);

    double percent_scale = 0;
    if (!items.empty() && greatest_wt > 0) {
#ifdef XAPIAN_HAS_REMOTE_BACKEND
	if (greatest_wt_subqs_db_num != UINT_MAX) {
	    const unsigned int n = greatest_wt_subqs_db_num;
	    RemoteSubMatch * rem_match;
	    rem_match = static_cast<RemoteSubMatch*>(leaves[n].get());
	    percent_scale = rem_match->get_percent_factor() / 100.0;
	} else
#endif
	{
	    percent_scale = greatest_wt_subqs_matched / double(total_subqs);
	    percent_scale /= greatest_wt;
	}
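
	// Worked example (illustrative): percentages are anchored to the
	// highest-weighted document seen.  If it matched all total_subqs
	// subqueries then percent_scale == 1.0 / greatest_wt, and it scores
	// wt * percent_scale * 100 == 100%; if it matched only half of the
	// subqueries, percent_scale halves and even the top document scores
	// only 50%.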
	Assert(percent_scale > 0);
	if (percent_cutoff) {
	    // FIXME: better to sort and binary chop maybe?
	    // Or we could just use a linear scan here instead.

	    // trim the mset to the correct answer...
	    double min_wt = percent_cutoff_factor / percent_scale;
	    if (!is_heap) {
		is_heap = true;
		make_heap<vector<Xapian::Internal::MSetItem>::iterator,
			  MSetCmp>(items.begin(), items.end(), mcmp);
	    }
	    while (!items.empty() && items.front().wt < min_wt) {
		pop_heap<vector<Xapian::Internal::MSetItem>::iterator,
			 MSetCmp>(items.begin(), items.end(), mcmp);
		Assert(items.back().wt < min_wt);
		items.pop_back();
	    }
#ifdef XAPIAN_ASSERTIONS_PARANOID
	    vector<Xapian::Internal::MSetItem>::const_iterator j;
	    for (j = items.begin(); j != items.end(); ++j) {
		Assert(j->wt >= min_wt);
	    }
#endif
	}
    }

    LOGLINE(MATCH,
	    "docs_matched = " << docs_matched <<
	    ", definite_matches_not_seen = " << definite_matches_not_seen <<
	    ", matches_lower_bound = " << matches_lower_bound <<
	    ", matches_estimated = " << matches_estimated <<
	    ", matches_upper_bound = " << matches_upper_bound);

    // Adjust docs_matched to take account of documents which matched remotely
    // but weren't sent across.
    docs_matched += definite_matches_not_seen;

    Xapian::doccount uncollapsed_lower_bound = matches_lower_bound;
    Xapian::doccount uncollapsed_upper_bound = matches_upper_bound;
    Xapian::doccount uncollapsed_estimated = matches_estimated;
#ifdef XAPIAN_HAS_REMOTE_BACKEND
    if (collapser && n_remotes) {
	// We need to adjust uncollapsed_upper_bound if there are multiple
	// shards and some or all are remote.  The lower bound and estimate
	// will be valid, though potentially could be better, but this
	// doesn't seem worth addressing in 1.4.x - the code on master
	// handles this correctly via merging MSet objects.
	if (n_remotes == leaves.size()) {
	    // All shards are remote so we just use the sum of
	    // uncollapsed_upper_bound over the remotes.
	    uncollapsed_upper_bound = remote_uncollapsed_upper_bound;
	} else {
	    // Sum and clamp to the number of documents.  This is crude but
	    // at least gives a valid answer.
	    Xapian::doccount num_docs = db.get_doccount();
	    uncollapsed_upper_bound += remote_uncollapsed_upper_bound;
	    if (uncollapsed_upper_bound < remote_uncollapsed_upper_bound ||
		uncollapsed_upper_bound > num_docs) {
		uncollapsed_upper_bound = num_docs;
	    }
	}
    }
#endif
    if (items.size() < max_msize) {
	// We have fewer items in the mset than we tried to get for it, so we
	// must have all the matches in it.
	LOGLINE(MATCH, "items.size() = " << items.size() <<
		", max_msize = " << max_msize << ", setting bounds equal");
	Assert(definite_matches_not_seen == 0);
	Assert(percent_cutoff || docs_matched == items.size());
	matches_lower_bound = matches_upper_bound = matches_estimated
	    = items.size();
	if (collapser && matches_lower_bound > uncollapsed_lower_bound)
	    uncollapsed_lower_bound = matches_lower_bound;
    } else if (!collapser && docs_matched < check_at_least) {
	// We have seen fewer matches than we checked for, so we must have seen
	// all the matches.
	LOGLINE(MATCH, "Setting bounds equal");
	matches_lower_bound = matches_upper_bound = matches_estimated
	    = docs_matched;
	if (collapser && matches_lower_bound > uncollapsed_lower_bound)
	    uncollapsed_lower_bound = matches_lower_bound;
    } else {
	AssertRel(matches_estimated,>=,matches_lower_bound);
	AssertRel(matches_estimated,<=,matches_upper_bound);

	// We can end up scaling the estimate more than once, so collect
	// the scale factors and apply them in one go to avoid rounding
	// more than once.
	double estimate_scale = 1.0;
	double unique_rate = 1.0;

	if (collapser) {
	    LOGLINE(MATCH, "Adjusting bounds due to collapse_key");
	    matches_lower_bound = collapser.get_matches_lower_bound();
	    if (matches_lower_bound > uncollapsed_lower_bound)
		uncollapsed_lower_bound = matches_lower_bound;

	    // The estimate for the number of hits can be modified by
	    // multiplying it by the rate at which we've been finding
	    // unique documents.
	    Xapian::doccount docs_considered = collapser.get_docs_considered();
	    Xapian::doccount dups_ignored = collapser.get_dups_ignored();
	    if (docs_considered > 0) {
		double unique = double(docs_considered - dups_ignored);
		unique_rate = unique / double(docs_considered);
	    }
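
	    // Worked example (illustrative): if the collapser considered 100
	    // documents and ignored 40 of them as duplicates, unique_rate ==
	    // 0.6, and the estimate is scaled by that factor below (via
	    // estimate_scale), on the expectation that roughly 40% of the
	    // unseen matches would also collapse away.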

	    // We can safely reduce the upper bound by the number of duplicates
	    // we've ignored.
	    matches_upper_bound -= dups_ignored;

	    LOGLINE(MATCH, "matches_lower_bound=" << matches_lower_bound <<
		    ", matches_estimated=" << matches_estimated <<
		    "*" << estimate_scale << "*" << unique_rate <<
		    ", matches_upper_bound=" << matches_upper_bound);
	}

	if (mdecider) {
	    if (!percent_cutoff) {
		if (!collapser) {
		    // We're not collapsing or doing a percentage cutoff, so
		    // docs_matched is a lower bound on the total number of
		    // matches.
		    matches_lower_bound = max(docs_matched, matches_lower_bound);
		}
	    }

	    // The estimate for the number of hits can be modified by
	    // multiplying it by the rate at which the match decider has
	    // been accepting documents.
	    if (decider_considered > 0) {
		double accept = double(decider_considered - decider_denied);
		double accept_rate = accept / double(decider_considered);
		estimate_scale *= accept_rate;
	    }

	    // If a document is denied by a match decider, it is not possible
	    // for it to be found to be a duplicate, so it is also safe to
	    // reduce the upper bound by the number of documents denied by a
	    // match decider.
	    matches_upper_bound -= decider_denied;
	    if (collapser) uncollapsed_upper_bound -= decider_denied;
	}

	if (percent_cutoff) {
	    estimate_scale *= (1.0 - percent_cutoff_factor);
	    // another approach:
	    // Xapian::doccount new_est = items.size() * (1 - percent_cutoff_factor) / (1 - min_weight / greatest_wt);
	    // and another: items.size() + (1 - greatest_wt * percent_cutoff_factor / min_weight) * (matches_estimated - items.size());

	    // Very likely an underestimate, but we can't really do better
	    // without checking further matches...  Only possibility would be
	    // to track how many docs made the min weight test but didn't make
	    // the candidate set since the last greatest_wt change, which we
	    // could use if the top documents matched all the prob terms.
	    matches_lower_bound = items.size();
	    if (collapser) uncollapsed_lower_bound = matches_lower_bound;

	    // matches_upper_bound could be reduced by the number of documents
	    // which fail the min weight test (FIXME)

	    LOGLINE(MATCH, "Adjusted bounds due to percent_cutoff (" <<
		    percent_cutoff << "): now have matches_estimated=" <<
		    matches_estimated << ", matches_lower_bound=" <<
		    matches_lower_bound);
	}

	if (collapser && estimate_scale != 1.0) {
	    uncollapsed_estimated =
		Xapian::doccount(uncollapsed_estimated * estimate_scale + 0.5);
	}

	estimate_scale *= unique_rate;

	if (estimate_scale != 1.0) {
	    matches_estimated =
		Xapian::doccount(matches_estimated * estimate_scale + 0.5);
	    if (matches_estimated < matches_lower_bound)
		matches_estimated = matches_lower_bound;
	}

	if (collapser || mdecider) {
	    LOGLINE(MATCH, "Clamping estimate between bounds: "
		    "matches_lower_bound = " << matches_lower_bound <<
		    ", matches_estimated = " << matches_estimated <<
		    ", matches_upper_bound = " << matches_upper_bound);
	    AssertRel(matches_lower_bound,<=,matches_upper_bound);
	    matches_estimated = max(matches_estimated, matches_lower_bound);
	    matches_estimated = min(matches_estimated, matches_upper_bound);
	} else if (!percent_cutoff) {
	    AssertRel(docs_matched,<=,matches_upper_bound);
	    if (docs_matched > matches_lower_bound)
		matches_lower_bound = docs_matched;
	    if (docs_matched > matches_estimated)
		matches_estimated = docs_matched;
	}

	if (collapser && !mdecider && !percent_cutoff) {
	    AssertRel(docs_matched,<=,uncollapsed_upper_bound);
	    if (docs_matched > uncollapsed_lower_bound)
		uncollapsed_lower_bound = docs_matched;
	}
    }

    if (collapser) {
	AssertRel(uncollapsed_lower_bound,<=,uncollapsed_upper_bound);
	if (uncollapsed_estimated < uncollapsed_lower_bound) {
	    uncollapsed_estimated = uncollapsed_lower_bound;
	} else if (uncollapsed_estimated > uncollapsed_upper_bound) {
	    uncollapsed_estimated = uncollapsed_upper_bound;
	}
    } else {
	// We're not collapsing, so the bounds must be the same.
	uncollapsed_lower_bound = matches_lower_bound;
	uncollapsed_upper_bound = matches_upper_bound;
	uncollapsed_estimated = matches_estimated;
    }

    LOGLINE(MATCH, items.size() << " items in potential mset");

    if (first > 0) {
	// Remove unwanted leading entries
	if (items.size() <= first) {
	    items.clear();
	} else {
	    LOGLINE(MATCH, "finding " << first << "th");
	    // We perform nth_element() on reverse iterators so that the
	    // unwanted elements end up at the end of items, which means
	    // that the call to erase() to remove them doesn't have to copy
	    // any elements.
	    vector<Xapian::Internal::MSetItem>::reverse_iterator nth;
	    nth = items.rbegin() + first;
	    nth_element(items.rbegin(), nth, items.rend(), mcmp);
	    // Erase the trailing "first" elements
	    items.erase(items.begin() + items.size() - first, items.end());
	}
    }

    LOGLINE(MATCH, "sorting " << items.size() << " entries");

    // We could use std::sort_heap if is_heap is true, but profiling
    // suggests that's actually slower.  Cast to void to suppress
    // compiler warnings that the last set value of is_heap is never
    // used.
    (void)is_heap;

    // Need a stable sort, but this is provided by comparison operator
    sort(items.begin(), items.end(), mcmp);

    if (!items.empty()) {
	LOGLINE(MATCH, "min weight in mset = " << items.back().wt);
	LOGLINE(MATCH, "max weight in mset = " << items[0].wt);
    }

    AssertRel(matches_estimated,>=,matches_lower_bound);
    AssertRel(matches_estimated,<=,matches_upper_bound);

    AssertRel(uncollapsed_estimated,>=,uncollapsed_lower_bound);
    AssertRel(uncollapsed_estimated,<=,uncollapsed_upper_bound);

    // We may need to qualify any collapse_count to check whether the highest
    // weight collapsed item would have passed percent_cutoff.
    // We WILL need to restore collapse_count to the mset by taking it from
    // collapse_tab; this is what comes of copying around whole objects
    // instead of taking references - we find it hard to update the
    // collapse_count of an item which has already been pushed back, as we no
    // longer know where it is.  If we kept or found references we wouldn't
    // need to mess with is_heap so much, maybe?
    if (!items.empty() && collapser && !collapser.empty()) {
	// Nicked this formula from above.
	double min_wt = 0.0;
	if (percent_scale > 0.0)
	    min_wt = percent_cutoff_factor / percent_scale;
	Xapian::doccount entries = collapser.entries();
	vector<Xapian::Internal::MSetItem>::iterator i;
	for (i = items.begin(); i != items.end(); ++i) {
	    // Skip entries without a collapse key.
	    if (i->collapse_key.empty()) continue;

	    // Set collapse_count appropriately.
	    i->collapse_count = collapser.get_collapse_count(i->collapse_key, percent_cutoff, min_wt);
	    if (--entries == 0) {
		// Stop once we've processed all items with collapse keys.
		break;
	    }
	}
    }

    mset.internal = new Xapian::MSet::Internal(
				       first,
				       matches_upper_bound,
				       matches_lower_bound,
				       matches_estimated,
				       uncollapsed_upper_bound,
				       uncollapsed_lower_bound,
				       uncollapsed_estimated,
				       max_possible, greatest_wt, items,
				       percent_scale * 100.0);
}