1// Copyright (c) 2014 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bleve 16 17import ( 18 "sort" 19 "sync" 20 "time" 21 22 "golang.org/x/net/context" 23 24 "github.com/blevesearch/bleve/document" 25 "github.com/blevesearch/bleve/index" 26 "github.com/blevesearch/bleve/index/store" 27 "github.com/blevesearch/bleve/mapping" 28 "github.com/blevesearch/bleve/search" 29) 30 31type indexAliasImpl struct { 32 name string 33 indexes []Index 34 mutex sync.RWMutex 35 open bool 36} 37 38// NewIndexAlias creates a new IndexAlias over the provided 39// Index objects. 40func NewIndexAlias(indexes ...Index) *indexAliasImpl { 41 return &indexAliasImpl{ 42 name: "alias", 43 indexes: indexes, 44 open: true, 45 } 46} 47 48func (i *indexAliasImpl) isAliasToSingleIndex() error { 49 if len(i.indexes) < 1 { 50 return ErrorAliasEmpty 51 } else if len(i.indexes) > 1 { 52 return ErrorAliasMulti 53 } 54 return nil 55} 56 57func (i *indexAliasImpl) Index(id string, data interface{}) error { 58 i.mutex.RLock() 59 defer i.mutex.RUnlock() 60 61 if !i.open { 62 return ErrorIndexClosed 63 } 64 65 err := i.isAliasToSingleIndex() 66 if err != nil { 67 return err 68 } 69 70 return i.indexes[0].Index(id, data) 71} 72 73func (i *indexAliasImpl) Delete(id string) error { 74 i.mutex.RLock() 75 defer i.mutex.RUnlock() 76 77 if !i.open { 78 return ErrorIndexClosed 79 } 80 81 err := i.isAliasToSingleIndex() 82 if err != nil { 83 return err 84 } 85 86 return i.indexes[0].Delete(id) 87} 88 89func (i *indexAliasImpl) Batch(b *Batch) error { 90 i.mutex.RLock() 91 defer i.mutex.RUnlock() 92 93 if !i.open { 94 return ErrorIndexClosed 95 } 96 97 err := i.isAliasToSingleIndex() 98 if err != nil { 99 return err 100 } 101 102 return i.indexes[0].Batch(b) 103} 104 105func (i *indexAliasImpl) Document(id string) (*document.Document, error) { 106 i.mutex.RLock() 107 defer i.mutex.RUnlock() 108 109 if !i.open { 110 return nil, ErrorIndexClosed 111 } 112 113 err := i.isAliasToSingleIndex() 114 if err != nil { 115 return nil, err 116 } 117 118 return i.indexes[0].Document(id) 119} 120 121func (i *indexAliasImpl) DocCount() (uint64, error) { 122 i.mutex.RLock() 123 defer i.mutex.RUnlock() 124 125 rv := uint64(0) 126 127 if !i.open { 128 return 0, ErrorIndexClosed 129 } 130 131 for _, index := range i.indexes { 132 otherCount, err := index.DocCount() 133 if err == nil { 134 rv += otherCount 135 } 136 // tolerate errors to produce partial counts 137 } 138 139 return rv, nil 140} 141 142func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) { 143 return i.SearchInContext(context.Background(), req) 144} 145 146func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest) (*SearchResult, error) { 147 i.mutex.RLock() 148 defer i.mutex.RUnlock() 149 150 if !i.open { 151 return nil, ErrorIndexClosed 152 } 153 154 if len(i.indexes) < 1 { 155 return nil, ErrorAliasEmpty 156 } 157 158 // short circuit the simple case 159 if len(i.indexes) == 1 { 160 return i.indexes[0].SearchInContext(ctx, req) 161 } 162 163 return MultiSearch(ctx, req, i.indexes...) 164} 165 166func (i *indexAliasImpl) Fields() ([]string, error) { 167 i.mutex.RLock() 168 defer i.mutex.RUnlock() 169 170 if !i.open { 171 return nil, ErrorIndexClosed 172 } 173 174 err := i.isAliasToSingleIndex() 175 if err != nil { 176 return nil, err 177 } 178 179 return i.indexes[0].Fields() 180} 181 182func (i *indexAliasImpl) FieldDict(field string) (index.FieldDict, error) { 183 i.mutex.RLock() 184 185 if !i.open { 186 i.mutex.RUnlock() 187 return nil, ErrorIndexClosed 188 } 189 190 err := i.isAliasToSingleIndex() 191 if err != nil { 192 i.mutex.RUnlock() 193 return nil, err 194 } 195 196 fieldDict, err := i.indexes[0].FieldDict(field) 197 if err != nil { 198 i.mutex.RUnlock() 199 return nil, err 200 } 201 202 return &indexAliasImplFieldDict{ 203 index: i, 204 fieldDict: fieldDict, 205 }, nil 206} 207 208func (i *indexAliasImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) { 209 i.mutex.RLock() 210 211 if !i.open { 212 i.mutex.RUnlock() 213 return nil, ErrorIndexClosed 214 } 215 216 err := i.isAliasToSingleIndex() 217 if err != nil { 218 i.mutex.RUnlock() 219 return nil, err 220 } 221 222 fieldDict, err := i.indexes[0].FieldDictRange(field, startTerm, endTerm) 223 if err != nil { 224 i.mutex.RUnlock() 225 return nil, err 226 } 227 228 return &indexAliasImplFieldDict{ 229 index: i, 230 fieldDict: fieldDict, 231 }, nil 232} 233 234func (i *indexAliasImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) { 235 i.mutex.RLock() 236 237 if !i.open { 238 i.mutex.RUnlock() 239 return nil, ErrorIndexClosed 240 } 241 242 err := i.isAliasToSingleIndex() 243 if err != nil { 244 i.mutex.RUnlock() 245 return nil, err 246 } 247 248 fieldDict, err := i.indexes[0].FieldDictPrefix(field, termPrefix) 249 if err != nil { 250 i.mutex.RUnlock() 251 return nil, err 252 } 253 254 return &indexAliasImplFieldDict{ 255 index: i, 256 fieldDict: fieldDict, 257 }, nil 258} 259 260func (i *indexAliasImpl) Close() error { 261 i.mutex.Lock() 262 defer i.mutex.Unlock() 263 264 i.open = false 265 return nil 266} 267 268func (i *indexAliasImpl) Mapping() mapping.IndexMapping { 269 i.mutex.RLock() 270 defer i.mutex.RUnlock() 271 272 if !i.open { 273 return nil 274 } 275 276 err := i.isAliasToSingleIndex() 277 if err != nil { 278 return nil 279 } 280 281 return i.indexes[0].Mapping() 282} 283 284func (i *indexAliasImpl) Stats() *IndexStat { 285 i.mutex.RLock() 286 defer i.mutex.RUnlock() 287 288 if !i.open { 289 return nil 290 } 291 292 err := i.isAliasToSingleIndex() 293 if err != nil { 294 return nil 295 } 296 297 return i.indexes[0].Stats() 298} 299 300func (i *indexAliasImpl) StatsMap() map[string]interface{} { 301 i.mutex.RLock() 302 defer i.mutex.RUnlock() 303 304 if !i.open { 305 return nil 306 } 307 308 err := i.isAliasToSingleIndex() 309 if err != nil { 310 return nil 311 } 312 313 return i.indexes[0].StatsMap() 314} 315 316func (i *indexAliasImpl) GetInternal(key []byte) ([]byte, error) { 317 i.mutex.RLock() 318 defer i.mutex.RUnlock() 319 320 if !i.open { 321 return nil, ErrorIndexClosed 322 } 323 324 err := i.isAliasToSingleIndex() 325 if err != nil { 326 return nil, err 327 } 328 329 return i.indexes[0].GetInternal(key) 330} 331 332func (i *indexAliasImpl) SetInternal(key, val []byte) error { 333 i.mutex.RLock() 334 defer i.mutex.RUnlock() 335 336 if !i.open { 337 return ErrorIndexClosed 338 } 339 340 err := i.isAliasToSingleIndex() 341 if err != nil { 342 return err 343 } 344 345 return i.indexes[0].SetInternal(key, val) 346} 347 348func (i *indexAliasImpl) DeleteInternal(key []byte) error { 349 i.mutex.RLock() 350 defer i.mutex.RUnlock() 351 352 if !i.open { 353 return ErrorIndexClosed 354 } 355 356 err := i.isAliasToSingleIndex() 357 if err != nil { 358 return err 359 } 360 361 return i.indexes[0].DeleteInternal(key) 362} 363 364func (i *indexAliasImpl) Advanced() (index.Index, store.KVStore, error) { 365 i.mutex.RLock() 366 defer i.mutex.RUnlock() 367 368 if !i.open { 369 return nil, nil, ErrorIndexClosed 370 } 371 372 err := i.isAliasToSingleIndex() 373 if err != nil { 374 return nil, nil, err 375 } 376 377 return i.indexes[0].Advanced() 378} 379 380func (i *indexAliasImpl) Add(indexes ...Index) { 381 i.mutex.Lock() 382 defer i.mutex.Unlock() 383 384 i.indexes = append(i.indexes, indexes...) 385} 386 387func (i *indexAliasImpl) removeSingle(index Index) { 388 for pos, in := range i.indexes { 389 if in == index { 390 i.indexes = append(i.indexes[:pos], i.indexes[pos+1:]...) 391 break 392 } 393 } 394} 395 396func (i *indexAliasImpl) Remove(indexes ...Index) { 397 i.mutex.Lock() 398 defer i.mutex.Unlock() 399 400 for _, in := range indexes { 401 i.removeSingle(in) 402 } 403} 404 405func (i *indexAliasImpl) Swap(in, out []Index) { 406 i.mutex.Lock() 407 defer i.mutex.Unlock() 408 409 // add 410 i.indexes = append(i.indexes, in...) 411 412 // delete 413 for _, ind := range out { 414 i.removeSingle(ind) 415 } 416} 417 418// createChildSearchRequest creates a separate 419// request from the original 420// For now, avoid data race on req structure. 421// TODO disable highlight/field load on child 422// requests, and add code to do this only on 423// the actual final results. 424// Perhaps that part needs to be optional, 425// could be slower in remote usages. 426func createChildSearchRequest(req *SearchRequest) *SearchRequest { 427 rv := SearchRequest{ 428 Query: req.Query, 429 Size: req.Size + req.From, 430 From: 0, 431 Highlight: req.Highlight, 432 Fields: req.Fields, 433 Facets: req.Facets, 434 Explain: req.Explain, 435 Sort: req.Sort.Copy(), 436 IncludeLocations: req.IncludeLocations, 437 } 438 return &rv 439} 440 441type asyncSearchResult struct { 442 Name string 443 Result *SearchResult 444 Err error 445} 446 447// MultiSearch executes a SearchRequest across multiple Index objects, 448// then merges the results. The indexes must honor any ctx deadline. 449func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) { 450 451 searchStart := time.Now() 452 asyncResults := make(chan *asyncSearchResult, len(indexes)) 453 454 // run search on each index in separate go routine 455 var waitGroup sync.WaitGroup 456 457 var searchChildIndex = func(in Index, childReq *SearchRequest) { 458 rv := asyncSearchResult{Name: in.Name()} 459 rv.Result, rv.Err = in.SearchInContext(ctx, childReq) 460 asyncResults <- &rv 461 waitGroup.Done() 462 } 463 464 waitGroup.Add(len(indexes)) 465 for _, in := range indexes { 466 go searchChildIndex(in, createChildSearchRequest(req)) 467 } 468 469 // on another go routine, close after finished 470 go func() { 471 waitGroup.Wait() 472 close(asyncResults) 473 }() 474 475 var sr *SearchResult 476 indexErrors := make(map[string]error) 477 478 for asr := range asyncResults { 479 if asr.Err == nil { 480 if sr == nil { 481 // first result 482 sr = asr.Result 483 } else { 484 // merge with previous 485 sr.Merge(asr.Result) 486 } 487 } else { 488 indexErrors[asr.Name] = asr.Err 489 } 490 } 491 492 // merge just concatenated all the hits 493 // now lets clean it up 494 495 // handle case where no results were successful 496 if sr == nil { 497 sr = &SearchResult{ 498 Status: &SearchStatus{ 499 Errors: make(map[string]error), 500 }, 501 } 502 } 503 504 // sort all hits with the requested order 505 if len(req.Sort) > 0 { 506 sorter := newMultiSearchHitSorter(req.Sort, sr.Hits) 507 sort.Sort(sorter) 508 } 509 510 // now skip over the correct From 511 if req.From > 0 && len(sr.Hits) > req.From { 512 sr.Hits = sr.Hits[req.From:] 513 } else if req.From > 0 { 514 sr.Hits = search.DocumentMatchCollection{} 515 } 516 517 // now trim to the correct size 518 if req.Size > 0 && len(sr.Hits) > req.Size { 519 sr.Hits = sr.Hits[0:req.Size] 520 } 521 522 // fix up facets 523 for name, fr := range req.Facets { 524 sr.Facets.Fixup(name, fr.Size) 525 } 526 527 // fix up original request 528 sr.Request = req 529 searchDuration := time.Since(searchStart) 530 sr.Took = searchDuration 531 532 // fix up errors 533 if len(indexErrors) > 0 { 534 if sr.Status.Errors == nil { 535 sr.Status.Errors = make(map[string]error) 536 } 537 for indexName, indexErr := range indexErrors { 538 sr.Status.Errors[indexName] = indexErr 539 sr.Status.Total++ 540 sr.Status.Failed++ 541 } 542 } 543 544 return sr, nil 545} 546 547func (i *indexAliasImpl) NewBatch() *Batch { 548 i.mutex.RLock() 549 defer i.mutex.RUnlock() 550 551 if !i.open { 552 return nil 553 } 554 555 err := i.isAliasToSingleIndex() 556 if err != nil { 557 return nil 558 } 559 560 return i.indexes[0].NewBatch() 561} 562 563func (i *indexAliasImpl) Name() string { 564 return i.name 565} 566 567func (i *indexAliasImpl) SetName(name string) { 568 i.name = name 569} 570 571type indexAliasImplFieldDict struct { 572 index *indexAliasImpl 573 fieldDict index.FieldDict 574} 575 576func (f *indexAliasImplFieldDict) Next() (*index.DictEntry, error) { 577 return f.fieldDict.Next() 578} 579 580func (f *indexAliasImplFieldDict) Close() error { 581 defer f.index.mutex.RUnlock() 582 return f.fieldDict.Close() 583} 584 585type multiSearchHitSorter struct { 586 hits search.DocumentMatchCollection 587 sort search.SortOrder 588 cachedScoring []bool 589 cachedDesc []bool 590} 591 592func newMultiSearchHitSorter(sort search.SortOrder, hits search.DocumentMatchCollection) *multiSearchHitSorter { 593 return &multiSearchHitSorter{ 594 sort: sort, 595 hits: hits, 596 cachedScoring: sort.CacheIsScore(), 597 cachedDesc: sort.CacheDescending(), 598 } 599} 600 601func (m *multiSearchHitSorter) Len() int { return len(m.hits) } 602func (m *multiSearchHitSorter) Swap(i, j int) { m.hits[i], m.hits[j] = m.hits[j], m.hits[i] } 603func (m *multiSearchHitSorter) Less(i, j int) bool { 604 c := m.sort.Compare(m.cachedScoring, m.cachedDesc, m.hits[i], m.hits[j]) 605 return c < 0 606} 607