1//  Copyright (c) 2014 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package collector
16
17import (
18	"context"
19	"reflect"
20	"strconv"
21	"time"
22
23	"github.com/blevesearch/bleve/index"
24	"github.com/blevesearch/bleve/search"
25	"github.com/blevesearch/bleve/size"
26)
27
28var reflectStaticSizeTopNCollector int
29
30func init() {
31	var coll TopNCollector
32	reflectStaticSizeTopNCollector = int(reflect.TypeOf(coll).Size())
33}
34
35type collectorStore interface {
36	// Add the document, and if the new store size exceeds the provided size
37	// the last element is removed and returned.  If the size has not been
38	// exceeded, nil is returned.
39	AddNotExceedingSize(doc *search.DocumentMatch, size int) *search.DocumentMatch
40
41	Final(skip int, fixup collectorFixup) (search.DocumentMatchCollection, error)
42}
43
// PreAllocSizeSkipCap is the upper bound on how much space is pre-allocated
// for hits. When size+skip is larger than this value, pre-allocation is
// capped here and any further space is obtained on demand.
var PreAllocSizeSkipCap = 1000
47
48type collectorCompare func(i, j *search.DocumentMatch) int
49
50type collectorFixup func(d *search.DocumentMatch) error
51
// TopNCollector collects the top N hits, optionally skipping some results
type TopNCollector struct {
	// number of hits to return
	size          int
	// number of leading hits to drop from the final results
	skip          int
	// count of hits seen so far (also used to number each hit)
	total         uint64
	// highest score seen across all hits
	maxScore      float64
	// time spent in Collect
	took          time.Duration
	// ordering applied to hits
	sort          search.SortOrder
	// final hits, populated by finalizeResults
	results       search.DocumentMatchCollection
	// optional; nil when no facets were requested
	facetsBuilder *search.FacetsBuilder

	// holds the candidate hits while collecting
	store collectorStore

	// when true, the external ID is loaded for every hit during collection
	needDocIds    bool
	// fields whose doc values are visited for each hit (sort and facets)
	neededFields  []string
	// cached sort.CacheIsScore(), passed to every sort.Compare call
	cachedScoring []bool
	// cached sort.CacheDescending(), passed to every sort.Compare call
	cachedDesc    []bool

	// best-sorting hit that has already been evicted from the store; any hit
	// that does not sort before it cannot be part of the results
	lowestMatchOutsideResults *search.DocumentMatch
	// forwards visited doc values to the facets builder and the sort order
	updateFieldVisitor        index.DocumentFieldTermVisitor
	// doc value reader for neededFields, opened in Collect
	dvReader                  index.DocValueReader
	// synthetic hit carrying the search-after sort key; nil when unused
	searchAfter               *search.DocumentMatch
}
75
// CheckDoneEvery is the number of hits processed between successive checks
// of the context for cancellation or an expired deadline.
const CheckDoneEvery uint64 = 1024
78
79// NewTopNCollector builds a collector to find the top 'size' hits
80// skipping over the first 'skip' hits
81// ordering hits by the provided sort order
82func NewTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector {
83	return newTopNCollector(size, skip, sort)
84}
85
86// NewTopNCollector builds a collector to find the top 'size' hits
87// skipping over the first 'skip' hits
88// ordering hits by the provided sort order
89func NewTopNCollectorAfter(size int, sort search.SortOrder, after []string) *TopNCollector {
90	rv := newTopNCollector(size, 0, sort)
91	rv.searchAfter = &search.DocumentMatch{
92		Sort: after,
93	}
94
95	for pos, ss := range sort {
96		if ss.RequiresDocID() {
97			rv.searchAfter.ID = after[pos]
98		}
99		if ss.RequiresScoring() {
100			if score, err := strconv.ParseFloat(after[pos], 64); err == nil {
101				rv.searchAfter.Score = score
102			}
103		}
104	}
105
106	return rv
107}
108
109func newTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector {
110	hc := &TopNCollector{size: size, skip: skip, sort: sort}
111
112	// pre-allocate space on the store to avoid reslicing
113	// unless the size + skip is too large, then cap it
114	// everything should still work, just reslices as necessary
115	backingSize := size + skip + 1
116	if size+skip > PreAllocSizeSkipCap {
117		backingSize = PreAllocSizeSkipCap + 1
118	}
119
120	if size+skip > 10 {
121		hc.store = newStoreHeap(backingSize, func(i, j *search.DocumentMatch) int {
122			return hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, i, j)
123		})
124	} else {
125		hc.store = newStoreSlice(backingSize, func(i, j *search.DocumentMatch) int {
126			return hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, i, j)
127		})
128	}
129
130	// these lookups traverse an interface, so do once up-front
131	if sort.RequiresDocID() {
132		hc.needDocIds = true
133	}
134	hc.neededFields = sort.RequiredFields()
135	hc.cachedScoring = sort.CacheIsScore()
136	hc.cachedDesc = sort.CacheDescending()
137
138	return hc
139}
140
141func (hc *TopNCollector) Size() int {
142	sizeInBytes := reflectStaticSizeTopNCollector + size.SizeOfPtr
143
144	if hc.facetsBuilder != nil {
145		sizeInBytes += hc.facetsBuilder.Size()
146	}
147
148	for _, entry := range hc.neededFields {
149		sizeInBytes += len(entry) + size.SizeOfString
150	}
151
152	sizeInBytes += len(hc.cachedScoring) + len(hc.cachedDesc)
153
154	return sizeInBytes
155}
156
157// Collect goes to the index to find the matching documents
158func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error {
159	startTime := time.Now()
160	var err error
161	var next *search.DocumentMatch
162
163	// pre-allocate enough space in the DocumentMatchPool
164	// unless the size + skip is too large, then cap it
165	// everything should still work, just allocates DocumentMatches on demand
166	backingSize := hc.size + hc.skip + 1
167	if hc.size+hc.skip > PreAllocSizeSkipCap {
168		backingSize = PreAllocSizeSkipCap + 1
169	}
170	searchContext := &search.SearchContext{
171		DocumentMatchPool: search.NewDocumentMatchPool(backingSize+searcher.DocumentMatchPoolSize(), len(hc.sort)),
172		Collector:         hc,
173		IndexReader:       reader,
174	}
175
176	hc.dvReader, err = reader.DocValueReader(hc.neededFields)
177	if err != nil {
178		return err
179	}
180
181	hc.updateFieldVisitor = func(field string, term []byte) {
182		if hc.facetsBuilder != nil {
183			hc.facetsBuilder.UpdateVisitor(field, term)
184		}
185		hc.sort.UpdateVisitor(field, term)
186	}
187
188	dmHandlerMaker := MakeTopNDocumentMatchHandler
189	if cv := ctx.Value(search.MakeDocumentMatchHandlerKey); cv != nil {
190		dmHandlerMaker = cv.(search.MakeDocumentMatchHandler)
191	}
192	// use the application given builder for making the custom document match
193	// handler and perform callbacks/invocations on the newly made handler.
194	dmHandler, loadID, err := dmHandlerMaker(searchContext)
195	if err != nil {
196		return err
197	}
198
199	hc.needDocIds = hc.needDocIds || loadID
200
201	select {
202	case <-ctx.Done():
203		return ctx.Err()
204	default:
205		next, err = searcher.Next(searchContext)
206	}
207	for err == nil && next != nil {
208		if hc.total%CheckDoneEvery == 0 {
209			select {
210			case <-ctx.Done():
211				return ctx.Err()
212			default:
213			}
214		}
215
216		err = hc.prepareDocumentMatch(searchContext, reader, next)
217		if err != nil {
218			break
219		}
220
221		err = dmHandler(next)
222		if err != nil {
223			break
224		}
225
226		next, err = searcher.Next(searchContext)
227	}
228
229	// help finalize/flush the results in case
230	// of custom document match handlers.
231	err = dmHandler(nil)
232	if err != nil {
233		return err
234	}
235
236	// compute search duration
237	hc.took = time.Since(startTime)
238	if err != nil {
239		return err
240	}
241	// finalize actual results
242	err = hc.finalizeResults(reader)
243	if err != nil {
244		return err
245	}
246	return nil
247}
248
// sortByScoreOpt is the shared sort value assigned to every hit when the
// sort order consists of the score alone, so no per-hit sort value has to
// be computed.
var sortByScoreOpt = []string{"_score"}
250
251func (hc *TopNCollector) prepareDocumentMatch(ctx *search.SearchContext,
252	reader index.IndexReader, d *search.DocumentMatch) (err error) {
253
254	// visit field terms for features that require it (sort, facets)
255	if len(hc.neededFields) > 0 {
256		err = hc.visitFieldTerms(reader, d)
257		if err != nil {
258			return err
259		}
260	}
261
262	// increment total hits
263	hc.total++
264	d.HitNumber = hc.total
265
266	// update max score
267	if d.Score > hc.maxScore {
268		hc.maxScore = d.Score
269	}
270
271	// see if we need to load ID (at this early stage, for example to sort on it)
272	if hc.needDocIds {
273		d.ID, err = reader.ExternalID(d.IndexInternalID)
274		if err != nil {
275			return err
276		}
277	}
278
279	// compute this hits sort value
280	if len(hc.sort) == 1 && hc.cachedScoring[0] {
281		d.Sort = sortByScoreOpt
282	} else {
283		hc.sort.Value(d)
284	}
285
286	return nil
287}
288
289func MakeTopNDocumentMatchHandler(
290	ctx *search.SearchContext) (search.DocumentMatchHandler, bool, error) {
291	var hc *TopNCollector
292	var ok bool
293	if hc, ok = ctx.Collector.(*TopNCollector); ok {
294		return func(d *search.DocumentMatch) error {
295			if d == nil {
296				return nil
297			}
298
299			// support search after based pagination,
300			// if this hit is <= the search after sort key
301			// we should skip it
302			if hc.searchAfter != nil {
303				// exact sort order matches use hit number to break tie
304				// but we want to allow for exact match, so we pretend
305				hc.searchAfter.HitNumber = d.HitNumber
306				if hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, hc.searchAfter) <= 0 {
307					return nil
308				}
309			}
310
311			// optimization, we track lowest sorting hit already removed from heap
312			// with this one comparison, we can avoid all heap operations if
313			// this hit would have been added and then immediately removed
314			if hc.lowestMatchOutsideResults != nil {
315				cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d,
316					hc.lowestMatchOutsideResults)
317				if cmp >= 0 {
318					// this hit can't possibly be in the result set, so avoid heap ops
319					ctx.DocumentMatchPool.Put(d)
320					return nil
321				}
322			}
323
324			removed := hc.store.AddNotExceedingSize(d, hc.size+hc.skip)
325			if removed != nil {
326				if hc.lowestMatchOutsideResults == nil {
327					hc.lowestMatchOutsideResults = removed
328				} else {
329					cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc,
330						removed, hc.lowestMatchOutsideResults)
331					if cmp < 0 {
332						tmp := hc.lowestMatchOutsideResults
333						hc.lowestMatchOutsideResults = removed
334						ctx.DocumentMatchPool.Put(tmp)
335					}
336				}
337			}
338			return nil
339		}, false, nil
340	}
341	return nil, false, nil
342}
343
344// visitFieldTerms is responsible for visiting the field terms of the
345// search hit, and passing visited terms to the sort and facet builder
346func (hc *TopNCollector) visitFieldTerms(reader index.IndexReader, d *search.DocumentMatch) error {
347	if hc.facetsBuilder != nil {
348		hc.facetsBuilder.StartDoc()
349	}
350
351	err := hc.dvReader.VisitDocValues(d.IndexInternalID, hc.updateFieldVisitor)
352	if hc.facetsBuilder != nil {
353		hc.facetsBuilder.EndDoc()
354	}
355
356	return err
357}
358
359// SetFacetsBuilder registers a facet builder for this collector
360func (hc *TopNCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) {
361	hc.facetsBuilder = facetsBuilder
362	hc.neededFields = append(hc.neededFields, hc.facetsBuilder.RequiredFields()...)
363}
364
365// finalizeResults starts with the heap containing the final top size+skip
366// it now throws away the results to be skipped
367// and does final doc id lookup (if necessary)
368func (hc *TopNCollector) finalizeResults(r index.IndexReader) error {
369	var err error
370	hc.results, err = hc.store.Final(hc.skip, func(doc *search.DocumentMatch) error {
371		if doc.ID == "" {
372			// look up the id since we need it for lookup
373			var err error
374			doc.ID, err = r.ExternalID(doc.IndexInternalID)
375			if err != nil {
376				return err
377			}
378		}
379		doc.Complete(nil)
380		return nil
381	})
382
383	return err
384}
385
386// Results returns the collected hits
387func (hc *TopNCollector) Results() search.DocumentMatchCollection {
388	return hc.results
389}
390
391// Total returns the total number of hits
392func (hc *TopNCollector) Total() uint64 {
393	return hc.total
394}
395
396// MaxScore returns the maximum score seen across all the hits
397func (hc *TopNCollector) MaxScore() float64 {
398	return hc.maxScore
399}
400
401// Took returns the time spent collecting hits
402func (hc *TopNCollector) Took() time.Duration {
403	return hc.took
404}
405
406// FacetResults returns the computed facets results
407func (hc *TopNCollector) FacetResults() search.FacetResults {
408	if hc.facetsBuilder != nil {
409		return hc.facetsBuilder.Results()
410	}
411	return nil
412}
413