1// Copyright (c) 2014 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package collector 16 17import ( 18 "context" 19 "reflect" 20 "strconv" 21 "time" 22 23 "github.com/blevesearch/bleve/v2/search" 24 "github.com/blevesearch/bleve/v2/size" 25 index "github.com/blevesearch/bleve_index_api" 26) 27 28var reflectStaticSizeTopNCollector int 29 30func init() { 31 var coll TopNCollector 32 reflectStaticSizeTopNCollector = int(reflect.TypeOf(coll).Size()) 33} 34 35type collectorStore interface { 36 // Add the document, and if the new store size exceeds the provided size 37 // the last element is removed and returned. If the size has not been 38 // exceeded, nil is returned. 39 AddNotExceedingSize(doc *search.DocumentMatch, size int) *search.DocumentMatch 40 41 Final(skip int, fixup collectorFixup) (search.DocumentMatchCollection, error) 42} 43 44// PreAllocSizeSkipCap will cap preallocation to this amount when 45// size+skip exceeds this value 46var PreAllocSizeSkipCap = 1000 47 48type collectorCompare func(i, j *search.DocumentMatch) int 49 50type collectorFixup func(d *search.DocumentMatch) error 51 52// TopNCollector collects the top N hits, optionally skipping some results 53type TopNCollector struct { 54 size int 55 skip int 56 total uint64 57 maxScore float64 58 took time.Duration 59 sort search.SortOrder 60 results search.DocumentMatchCollection 61 facetsBuilder *search.FacetsBuilder 62 63 store collectorStore 64 65 needDocIds bool 66 neededFields []string 67 cachedScoring []bool 68 cachedDesc []bool 69 70 lowestMatchOutsideResults *search.DocumentMatch 71 updateFieldVisitor index.DocValueVisitor 72 dvReader index.DocValueReader 73 searchAfter *search.DocumentMatch 74} 75 76// CheckDoneEvery controls how frequently we check the context deadline 77const CheckDoneEvery = uint64(1024) 78 79// NewTopNCollector builds a collector to find the top 'size' hits 80// skipping over the first 'skip' hits 81// ordering hits by the provided sort order 82func NewTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector { 83 return newTopNCollector(size, skip, sort) 84} 85 86// NewTopNCollectorAfter builds a collector to find the top 'size' hits 87// skipping over the first 'skip' hits 88// ordering hits by the provided sort order 89func NewTopNCollectorAfter(size int, sort search.SortOrder, after []string) *TopNCollector { 90 rv := newTopNCollector(size, 0, sort) 91 rv.searchAfter = &search.DocumentMatch{ 92 Sort: after, 93 } 94 95 for pos, ss := range sort { 96 if ss.RequiresDocID() { 97 rv.searchAfter.ID = after[pos] 98 } 99 if ss.RequiresScoring() { 100 if score, err := strconv.ParseFloat(after[pos], 64); err == nil { 101 rv.searchAfter.Score = score 102 } 103 } 104 } 105 106 return rv 107} 108 109func newTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector { 110 hc := &TopNCollector{size: size, skip: skip, sort: sort} 111 112 // pre-allocate space on the store to avoid reslicing 113 // unless the size + skip is too large, then cap it 114 // everything should still work, just reslices as necessary 115 backingSize := size + skip + 1 116 if size+skip > PreAllocSizeSkipCap { 117 backingSize = PreAllocSizeSkipCap + 1 118 } 119 120 if size+skip > 10 { 121 hc.store = newStoreHeap(backingSize, func(i, j *search.DocumentMatch) int { 122 return hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, i, j) 123 }) 124 } else { 125 hc.store = newStoreSlice(backingSize, func(i, j *search.DocumentMatch) int { 126 return hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, i, j) 127 }) 128 } 129 130 // these lookups traverse an interface, so do once up-front 131 if sort.RequiresDocID() { 132 hc.needDocIds = true 133 } 134 hc.neededFields = sort.RequiredFields() 135 hc.cachedScoring = sort.CacheIsScore() 136 hc.cachedDesc = sort.CacheDescending() 137 138 return hc 139} 140 141func (hc *TopNCollector) Size() int { 142 sizeInBytes := reflectStaticSizeTopNCollector + size.SizeOfPtr 143 144 if hc.facetsBuilder != nil { 145 sizeInBytes += hc.facetsBuilder.Size() 146 } 147 148 for _, entry := range hc.neededFields { 149 sizeInBytes += len(entry) + size.SizeOfString 150 } 151 152 sizeInBytes += len(hc.cachedScoring) + len(hc.cachedDesc) 153 154 return sizeInBytes 155} 156 157// Collect goes to the index to find the matching documents 158func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error { 159 startTime := time.Now() 160 var err error 161 var next *search.DocumentMatch 162 163 // pre-allocate enough space in the DocumentMatchPool 164 // unless the size + skip is too large, then cap it 165 // everything should still work, just allocates DocumentMatches on demand 166 backingSize := hc.size + hc.skip + 1 167 if hc.size+hc.skip > PreAllocSizeSkipCap { 168 backingSize = PreAllocSizeSkipCap + 1 169 } 170 searchContext := &search.SearchContext{ 171 DocumentMatchPool: search.NewDocumentMatchPool(backingSize+searcher.DocumentMatchPoolSize(), len(hc.sort)), 172 Collector: hc, 173 IndexReader: reader, 174 } 175 176 hc.dvReader, err = reader.DocValueReader(hc.neededFields) 177 if err != nil { 178 return err 179 } 180 181 hc.updateFieldVisitor = func(field string, term []byte) { 182 if hc.facetsBuilder != nil { 183 hc.facetsBuilder.UpdateVisitor(field, term) 184 } 185 hc.sort.UpdateVisitor(field, term) 186 } 187 188 dmHandlerMaker := MakeTopNDocumentMatchHandler 189 if cv := ctx.Value(search.MakeDocumentMatchHandlerKey); cv != nil { 190 dmHandlerMaker = cv.(search.MakeDocumentMatchHandler) 191 } 192 // use the application given builder for making the custom document match 193 // handler and perform callbacks/invocations on the newly made handler. 194 dmHandler, loadID, err := dmHandlerMaker(searchContext) 195 if err != nil { 196 return err 197 } 198 199 hc.needDocIds = hc.needDocIds || loadID 200 201 select { 202 case <-ctx.Done(): 203 return ctx.Err() 204 default: 205 next, err = searcher.Next(searchContext) 206 } 207 for err == nil && next != nil { 208 if hc.total%CheckDoneEvery == 0 { 209 select { 210 case <-ctx.Done(): 211 return ctx.Err() 212 default: 213 } 214 } 215 216 err = hc.prepareDocumentMatch(searchContext, reader, next) 217 if err != nil { 218 break 219 } 220 221 err = dmHandler(next) 222 if err != nil { 223 break 224 } 225 226 next, err = searcher.Next(searchContext) 227 } 228 229 // help finalize/flush the results in case 230 // of custom document match handlers. 231 err = dmHandler(nil) 232 if err != nil { 233 return err 234 } 235 236 // compute search duration 237 hc.took = time.Since(startTime) 238 239 // finalize actual results 240 err = hc.finalizeResults(reader) 241 if err != nil { 242 return err 243 } 244 return nil 245} 246 247var sortByScoreOpt = []string{"_score"} 248 249func (hc *TopNCollector) prepareDocumentMatch(ctx *search.SearchContext, 250 reader index.IndexReader, d *search.DocumentMatch) (err error) { 251 252 // visit field terms for features that require it (sort, facets) 253 if len(hc.neededFields) > 0 { 254 err = hc.visitFieldTerms(reader, d) 255 if err != nil { 256 return err 257 } 258 } 259 260 // increment total hits 261 hc.total++ 262 d.HitNumber = hc.total 263 264 // update max score 265 if d.Score > hc.maxScore { 266 hc.maxScore = d.Score 267 } 268 269 // see if we need to load ID (at this early stage, for example to sort on it) 270 if hc.needDocIds { 271 d.ID, err = reader.ExternalID(d.IndexInternalID) 272 if err != nil { 273 return err 274 } 275 } 276 277 // compute this hits sort value 278 if len(hc.sort) == 1 && hc.cachedScoring[0] { 279 d.Sort = sortByScoreOpt 280 } else { 281 hc.sort.Value(d) 282 } 283 284 return nil 285} 286 287func MakeTopNDocumentMatchHandler( 288 ctx *search.SearchContext) (search.DocumentMatchHandler, bool, error) { 289 var hc *TopNCollector 290 var ok bool 291 if hc, ok = ctx.Collector.(*TopNCollector); ok { 292 return func(d *search.DocumentMatch) error { 293 if d == nil { 294 return nil 295 } 296 297 // support search after based pagination, 298 // if this hit is <= the search after sort key 299 // we should skip it 300 if hc.searchAfter != nil { 301 // exact sort order matches use hit number to break tie 302 // but we want to allow for exact match, so we pretend 303 hc.searchAfter.HitNumber = d.HitNumber 304 if hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, hc.searchAfter) <= 0 { 305 return nil 306 } 307 } 308 309 // optimization, we track lowest sorting hit already removed from heap 310 // with this one comparison, we can avoid all heap operations if 311 // this hit would have been added and then immediately removed 312 if hc.lowestMatchOutsideResults != nil { 313 cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, 314 hc.lowestMatchOutsideResults) 315 if cmp >= 0 { 316 // this hit can't possibly be in the result set, so avoid heap ops 317 ctx.DocumentMatchPool.Put(d) 318 return nil 319 } 320 } 321 322 removed := hc.store.AddNotExceedingSize(d, hc.size+hc.skip) 323 if removed != nil { 324 if hc.lowestMatchOutsideResults == nil { 325 hc.lowestMatchOutsideResults = removed 326 } else { 327 cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, 328 removed, hc.lowestMatchOutsideResults) 329 if cmp < 0 { 330 tmp := hc.lowestMatchOutsideResults 331 hc.lowestMatchOutsideResults = removed 332 ctx.DocumentMatchPool.Put(tmp) 333 } 334 } 335 } 336 return nil 337 }, false, nil 338 } 339 return nil, false, nil 340} 341 342// visitFieldTerms is responsible for visiting the field terms of the 343// search hit, and passing visited terms to the sort and facet builder 344func (hc *TopNCollector) visitFieldTerms(reader index.IndexReader, d *search.DocumentMatch) error { 345 if hc.facetsBuilder != nil { 346 hc.facetsBuilder.StartDoc() 347 } 348 349 err := hc.dvReader.VisitDocValues(d.IndexInternalID, hc.updateFieldVisitor) 350 if hc.facetsBuilder != nil { 351 hc.facetsBuilder.EndDoc() 352 } 353 354 return err 355} 356 357// SetFacetsBuilder registers a facet builder for this collector 358func (hc *TopNCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) { 359 hc.facetsBuilder = facetsBuilder 360 hc.neededFields = append(hc.neededFields, hc.facetsBuilder.RequiredFields()...) 361} 362 363// finalizeResults starts with the heap containing the final top size+skip 364// it now throws away the results to be skipped 365// and does final doc id lookup (if necessary) 366func (hc *TopNCollector) finalizeResults(r index.IndexReader) error { 367 var err error 368 hc.results, err = hc.store.Final(hc.skip, func(doc *search.DocumentMatch) error { 369 if doc.ID == "" { 370 // look up the id since we need it for lookup 371 var err error 372 doc.ID, err = r.ExternalID(doc.IndexInternalID) 373 if err != nil { 374 return err 375 } 376 } 377 doc.Complete(nil) 378 return nil 379 }) 380 381 return err 382} 383 384// Results returns the collected hits 385func (hc *TopNCollector) Results() search.DocumentMatchCollection { 386 return hc.results 387} 388 389// Total returns the total number of hits 390func (hc *TopNCollector) Total() uint64 { 391 return hc.total 392} 393 394// MaxScore returns the maximum score seen across all the hits 395func (hc *TopNCollector) MaxScore() float64 { 396 return hc.maxScore 397} 398 399// Took returns the time spent collecting hits 400func (hc *TopNCollector) Took() time.Duration { 401 return hc.took 402} 403 404// FacetResults returns the computed facets results 405func (hc *TopNCollector) FacetResults() search.FacetResults { 406 if hc.facetsBuilder != nil { 407 return hc.facetsBuilder.Results() 408 } 409 return nil 410} 411