1// Copyright (c) 2014 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package collector 16 17import ( 18 "context" 19 "reflect" 20 "strconv" 21 "time" 22 23 "github.com/blevesearch/bleve/index" 24 "github.com/blevesearch/bleve/search" 25 "github.com/blevesearch/bleve/size" 26) 27 28var reflectStaticSizeTopNCollector int 29 30func init() { 31 var coll TopNCollector 32 reflectStaticSizeTopNCollector = int(reflect.TypeOf(coll).Size()) 33} 34 35type collectorStore interface { 36 // Add the document, and if the new store size exceeds the provided size 37 // the last element is removed and returned. If the size has not been 38 // exceeded, nil is returned. 39 AddNotExceedingSize(doc *search.DocumentMatch, size int) *search.DocumentMatch 40 41 Final(skip int, fixup collectorFixup) (search.DocumentMatchCollection, error) 42} 43 44// PreAllocSizeSkipCap will cap preallocation to this amount when 45// size+skip exceeds this value 46var PreAllocSizeSkipCap = 1000 47 48type collectorCompare func(i, j *search.DocumentMatch) int 49 50type collectorFixup func(d *search.DocumentMatch) error 51 52// TopNCollector collects the top N hits, optionally skipping some results 53type TopNCollector struct { 54 size int 55 skip int 56 total uint64 57 maxScore float64 58 took time.Duration 59 sort search.SortOrder 60 results search.DocumentMatchCollection 61 facetsBuilder *search.FacetsBuilder 62 63 store collectorStore 64 65 needDocIds bool 66 neededFields []string 67 cachedScoring []bool 68 cachedDesc []bool 69 70 lowestMatchOutsideResults *search.DocumentMatch 71 updateFieldVisitor index.DocumentFieldTermVisitor 72 dvReader index.DocValueReader 73 searchAfter *search.DocumentMatch 74} 75 76// CheckDoneEvery controls how frequently we check the context deadline 77const CheckDoneEvery = uint64(1024) 78 79// NewTopNCollector builds a collector to find the top 'size' hits 80// skipping over the first 'skip' hits 81// ordering hits by the provided sort order 82func NewTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector { 83 return newTopNCollector(size, skip, sort) 84} 85 86// NewTopNCollector builds a collector to find the top 'size' hits 87// skipping over the first 'skip' hits 88// ordering hits by the provided sort order 89func NewTopNCollectorAfter(size int, sort search.SortOrder, after []string) *TopNCollector { 90 rv := newTopNCollector(size, 0, sort) 91 rv.searchAfter = &search.DocumentMatch{ 92 Sort: after, 93 } 94 95 for pos, ss := range sort { 96 if ss.RequiresDocID() { 97 rv.searchAfter.ID = after[pos] 98 } 99 if ss.RequiresScoring() { 100 if score, err := strconv.ParseFloat(after[pos], 64); err == nil { 101 rv.searchAfter.Score = score 102 } 103 } 104 } 105 106 return rv 107} 108 109func newTopNCollector(size int, skip int, sort search.SortOrder) *TopNCollector { 110 hc := &TopNCollector{size: size, skip: skip, sort: sort} 111 112 // pre-allocate space on the store to avoid reslicing 113 // unless the size + skip is too large, then cap it 114 // everything should still work, just reslices as necessary 115 backingSize := size + skip + 1 116 if size+skip > PreAllocSizeSkipCap { 117 backingSize = PreAllocSizeSkipCap + 1 118 } 119 120 if size+skip > 10 { 121 hc.store = newStoreHeap(backingSize, func(i, j *search.DocumentMatch) int { 122 return hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, i, j) 123 }) 124 } else { 125 hc.store = newStoreSlice(backingSize, func(i, j *search.DocumentMatch) int { 126 return hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, i, j) 127 }) 128 } 129 130 // these lookups traverse an interface, so do once up-front 131 if sort.RequiresDocID() { 132 hc.needDocIds = true 133 } 134 hc.neededFields = sort.RequiredFields() 135 hc.cachedScoring = sort.CacheIsScore() 136 hc.cachedDesc = sort.CacheDescending() 137 138 return hc 139} 140 141func (hc *TopNCollector) Size() int { 142 sizeInBytes := reflectStaticSizeTopNCollector + size.SizeOfPtr 143 144 if hc.facetsBuilder != nil { 145 sizeInBytes += hc.facetsBuilder.Size() 146 } 147 148 for _, entry := range hc.neededFields { 149 sizeInBytes += len(entry) + size.SizeOfString 150 } 151 152 sizeInBytes += len(hc.cachedScoring) + len(hc.cachedDesc) 153 154 return sizeInBytes 155} 156 157// Collect goes to the index to find the matching documents 158func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher, reader index.IndexReader) error { 159 startTime := time.Now() 160 var err error 161 var next *search.DocumentMatch 162 163 // pre-allocate enough space in the DocumentMatchPool 164 // unless the size + skip is too large, then cap it 165 // everything should still work, just allocates DocumentMatches on demand 166 backingSize := hc.size + hc.skip + 1 167 if hc.size+hc.skip > PreAllocSizeSkipCap { 168 backingSize = PreAllocSizeSkipCap + 1 169 } 170 searchContext := &search.SearchContext{ 171 DocumentMatchPool: search.NewDocumentMatchPool(backingSize+searcher.DocumentMatchPoolSize(), len(hc.sort)), 172 Collector: hc, 173 IndexReader: reader, 174 } 175 176 hc.dvReader, err = reader.DocValueReader(hc.neededFields) 177 if err != nil { 178 return err 179 } 180 181 hc.updateFieldVisitor = func(field string, term []byte) { 182 if hc.facetsBuilder != nil { 183 hc.facetsBuilder.UpdateVisitor(field, term) 184 } 185 hc.sort.UpdateVisitor(field, term) 186 } 187 188 dmHandlerMaker := MakeTopNDocumentMatchHandler 189 if cv := ctx.Value(search.MakeDocumentMatchHandlerKey); cv != nil { 190 dmHandlerMaker = cv.(search.MakeDocumentMatchHandler) 191 } 192 // use the application given builder for making the custom document match 193 // handler and perform callbacks/invocations on the newly made handler. 194 dmHandler, loadID, err := dmHandlerMaker(searchContext) 195 if err != nil { 196 return err 197 } 198 199 hc.needDocIds = hc.needDocIds || loadID 200 201 select { 202 case <-ctx.Done(): 203 return ctx.Err() 204 default: 205 next, err = searcher.Next(searchContext) 206 } 207 for err == nil && next != nil { 208 if hc.total%CheckDoneEvery == 0 { 209 select { 210 case <-ctx.Done(): 211 return ctx.Err() 212 default: 213 } 214 } 215 216 err = hc.prepareDocumentMatch(searchContext, reader, next) 217 if err != nil { 218 break 219 } 220 221 err = dmHandler(next) 222 if err != nil { 223 break 224 } 225 226 next, err = searcher.Next(searchContext) 227 } 228 229 // help finalize/flush the results in case 230 // of custom document match handlers. 231 err = dmHandler(nil) 232 if err != nil { 233 return err 234 } 235 236 // compute search duration 237 hc.took = time.Since(startTime) 238 if err != nil { 239 return err 240 } 241 // finalize actual results 242 err = hc.finalizeResults(reader) 243 if err != nil { 244 return err 245 } 246 return nil 247} 248 249var sortByScoreOpt = []string{"_score"} 250 251func (hc *TopNCollector) prepareDocumentMatch(ctx *search.SearchContext, 252 reader index.IndexReader, d *search.DocumentMatch) (err error) { 253 254 // visit field terms for features that require it (sort, facets) 255 if len(hc.neededFields) > 0 { 256 err = hc.visitFieldTerms(reader, d) 257 if err != nil { 258 return err 259 } 260 } 261 262 // increment total hits 263 hc.total++ 264 d.HitNumber = hc.total 265 266 // update max score 267 if d.Score > hc.maxScore { 268 hc.maxScore = d.Score 269 } 270 271 // see if we need to load ID (at this early stage, for example to sort on it) 272 if hc.needDocIds { 273 d.ID, err = reader.ExternalID(d.IndexInternalID) 274 if err != nil { 275 return err 276 } 277 } 278 279 // compute this hits sort value 280 if len(hc.sort) == 1 && hc.cachedScoring[0] { 281 d.Sort = sortByScoreOpt 282 } else { 283 hc.sort.Value(d) 284 } 285 286 return nil 287} 288 289func MakeTopNDocumentMatchHandler( 290 ctx *search.SearchContext) (search.DocumentMatchHandler, bool, error) { 291 var hc *TopNCollector 292 var ok bool 293 if hc, ok = ctx.Collector.(*TopNCollector); ok { 294 return func(d *search.DocumentMatch) error { 295 if d == nil { 296 return nil 297 } 298 299 // support search after based pagination, 300 // if this hit is <= the search after sort key 301 // we should skip it 302 if hc.searchAfter != nil { 303 // exact sort order matches use hit number to break tie 304 // but we want to allow for exact match, so we pretend 305 hc.searchAfter.HitNumber = d.HitNumber 306 if hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, hc.searchAfter) <= 0 { 307 return nil 308 } 309 } 310 311 // optimization, we track lowest sorting hit already removed from heap 312 // with this one comparison, we can avoid all heap operations if 313 // this hit would have been added and then immediately removed 314 if hc.lowestMatchOutsideResults != nil { 315 cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, d, 316 hc.lowestMatchOutsideResults) 317 if cmp >= 0 { 318 // this hit can't possibly be in the result set, so avoid heap ops 319 ctx.DocumentMatchPool.Put(d) 320 return nil 321 } 322 } 323 324 removed := hc.store.AddNotExceedingSize(d, hc.size+hc.skip) 325 if removed != nil { 326 if hc.lowestMatchOutsideResults == nil { 327 hc.lowestMatchOutsideResults = removed 328 } else { 329 cmp := hc.sort.Compare(hc.cachedScoring, hc.cachedDesc, 330 removed, hc.lowestMatchOutsideResults) 331 if cmp < 0 { 332 tmp := hc.lowestMatchOutsideResults 333 hc.lowestMatchOutsideResults = removed 334 ctx.DocumentMatchPool.Put(tmp) 335 } 336 } 337 } 338 return nil 339 }, false, nil 340 } 341 return nil, false, nil 342} 343 344// visitFieldTerms is responsible for visiting the field terms of the 345// search hit, and passing visited terms to the sort and facet builder 346func (hc *TopNCollector) visitFieldTerms(reader index.IndexReader, d *search.DocumentMatch) error { 347 if hc.facetsBuilder != nil { 348 hc.facetsBuilder.StartDoc() 349 } 350 351 err := hc.dvReader.VisitDocValues(d.IndexInternalID, hc.updateFieldVisitor) 352 if hc.facetsBuilder != nil { 353 hc.facetsBuilder.EndDoc() 354 } 355 356 return err 357} 358 359// SetFacetsBuilder registers a facet builder for this collector 360func (hc *TopNCollector) SetFacetsBuilder(facetsBuilder *search.FacetsBuilder) { 361 hc.facetsBuilder = facetsBuilder 362 hc.neededFields = append(hc.neededFields, hc.facetsBuilder.RequiredFields()...) 363} 364 365// finalizeResults starts with the heap containing the final top size+skip 366// it now throws away the results to be skipped 367// and does final doc id lookup (if necessary) 368func (hc *TopNCollector) finalizeResults(r index.IndexReader) error { 369 var err error 370 hc.results, err = hc.store.Final(hc.skip, func(doc *search.DocumentMatch) error { 371 if doc.ID == "" { 372 // look up the id since we need it for lookup 373 var err error 374 doc.ID, err = r.ExternalID(doc.IndexInternalID) 375 if err != nil { 376 return err 377 } 378 } 379 doc.Complete(nil) 380 return nil 381 }) 382 383 return err 384} 385 386// Results returns the collected hits 387func (hc *TopNCollector) Results() search.DocumentMatchCollection { 388 return hc.results 389} 390 391// Total returns the total number of hits 392func (hc *TopNCollector) Total() uint64 { 393 return hc.total 394} 395 396// MaxScore returns the maximum score seen across all the hits 397func (hc *TopNCollector) MaxScore() float64 { 398 return hc.maxScore 399} 400 401// Took returns the time spent collecting hits 402func (hc *TopNCollector) Took() time.Duration { 403 return hc.took 404} 405 406// FacetResults returns the computed facets results 407func (hc *TopNCollector) FacetResults() search.FacetResults { 408 if hc.facetsBuilder != nil { 409 return hc.facetsBuilder.Results() 410 } 411 return nil 412} 413