1// Copyright (c) 2014 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bleve 16 17import ( 18 "encoding/json" 19 "fmt" 20 "os" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "golang.org/x/net/context" 26 27 "github.com/blevesearch/bleve/document" 28 "github.com/blevesearch/bleve/index" 29 "github.com/blevesearch/bleve/index/store" 30 "github.com/blevesearch/bleve/index/upsidedown" 31 "github.com/blevesearch/bleve/mapping" 32 "github.com/blevesearch/bleve/registry" 33 "github.com/blevesearch/bleve/search" 34 "github.com/blevesearch/bleve/search/collector" 35 "github.com/blevesearch/bleve/search/facet" 36 "github.com/blevesearch/bleve/search/highlight" 37) 38 39type indexImpl struct { 40 path string 41 name string 42 meta *indexMeta 43 i index.Index 44 m mapping.IndexMapping 45 mutex sync.RWMutex 46 open bool 47 stats *IndexStat 48} 49 50const storePath = "store" 51 52var mappingInternalKey = []byte("_mapping") 53 54func indexStorePath(path string) string { 55 return path + string(os.PathSeparator) + storePath 56} 57 58func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, kvstore string, kvconfig map[string]interface{}) (*indexImpl, error) { 59 // first validate the mapping 60 err := mapping.Validate() 61 if err != nil { 62 return nil, err 63 } 64 65 if kvconfig == nil { 66 kvconfig = map[string]interface{}{} 67 } 68 69 if kvstore == "" { 70 return nil, fmt.Errorf("bleve not configured for file based 
indexing") 71 } 72 73 rv := indexImpl{ 74 path: path, 75 name: path, 76 m: mapping, 77 meta: newIndexMeta(indexType, kvstore, kvconfig), 78 } 79 rv.stats = &IndexStat{i: &rv} 80 // at this point there is hope that we can be successful, so save index meta 81 if path != "" { 82 err = rv.meta.Save(path) 83 if err != nil { 84 return nil, err 85 } 86 kvconfig["create_if_missing"] = true 87 kvconfig["error_if_exists"] = true 88 kvconfig["path"] = indexStorePath(path) 89 } else { 90 kvconfig["path"] = "" 91 } 92 93 // open the index 94 indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType) 95 if indexTypeConstructor == nil { 96 return nil, ErrorUnknownIndexType 97 } 98 99 rv.i, err = indexTypeConstructor(rv.meta.Storage, kvconfig, Config.analysisQueue) 100 if err != nil { 101 return nil, err 102 } 103 err = rv.i.Open() 104 if err != nil { 105 if err == index.ErrorUnknownStorageType { 106 return nil, ErrorUnknownStorageType 107 } 108 return nil, err 109 } 110 111 // now persist the mapping 112 mappingBytes, err := json.Marshal(mapping) 113 if err != nil { 114 return nil, err 115 } 116 err = rv.i.SetInternal(mappingInternalKey, mappingBytes) 117 if err != nil { 118 return nil, err 119 } 120 121 // mark the index as open 122 rv.mutex.Lock() 123 defer rv.mutex.Unlock() 124 rv.open = true 125 indexStats.Register(&rv) 126 return &rv, nil 127} 128 129func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *indexImpl, err error) { 130 rv = &indexImpl{ 131 path: path, 132 name: path, 133 } 134 rv.stats = &IndexStat{i: rv} 135 136 rv.meta, err = openIndexMeta(path) 137 if err != nil { 138 return nil, err 139 } 140 141 // backwards compatibility if index type is missing 142 if rv.meta.IndexType == "" { 143 rv.meta.IndexType = upsidedown.Name 144 } 145 146 storeConfig := rv.meta.Config 147 if storeConfig == nil { 148 storeConfig = map[string]interface{}{} 149 } 150 151 storeConfig["path"] = indexStorePath(path) 152 
storeConfig["create_if_missing"] = false 153 storeConfig["error_if_exists"] = false 154 for rck, rcv := range runtimeConfig { 155 storeConfig[rck] = rcv 156 } 157 158 // open the index 159 indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType) 160 if indexTypeConstructor == nil { 161 return nil, ErrorUnknownIndexType 162 } 163 164 rv.i, err = indexTypeConstructor(rv.meta.Storage, storeConfig, Config.analysisQueue) 165 if err != nil { 166 return nil, err 167 } 168 err = rv.i.Open() 169 if err != nil { 170 if err == index.ErrorUnknownStorageType { 171 return nil, ErrorUnknownStorageType 172 } 173 return nil, err 174 } 175 176 // now load the mapping 177 indexReader, err := rv.i.Reader() 178 if err != nil { 179 return nil, err 180 } 181 defer func() { 182 if cerr := indexReader.Close(); cerr != nil && err == nil { 183 err = cerr 184 } 185 }() 186 187 mappingBytes, err := indexReader.GetInternal(mappingInternalKey) 188 if err != nil { 189 return nil, err 190 } 191 192 var im *mapping.IndexMappingImpl 193 err = json.Unmarshal(mappingBytes, &im) 194 if err != nil { 195 return nil, fmt.Errorf("error parsing mapping JSON: %v\nmapping contents:\n%s", err, string(mappingBytes)) 196 } 197 198 // mark the index as open 199 rv.mutex.Lock() 200 defer rv.mutex.Unlock() 201 rv.open = true 202 203 // validate the mapping 204 err = im.Validate() 205 if err != nil { 206 // note even if the mapping is invalid 207 // we still return an open usable index 208 return rv, err 209 } 210 211 rv.m = im 212 indexStats.Register(rv) 213 return rv, err 214} 215 216// Advanced returns implementation internals 217// necessary ONLY for advanced usage. 218func (i *indexImpl) Advanced() (index.Index, store.KVStore, error) { 219 s, err := i.i.Advanced() 220 if err != nil { 221 return nil, nil, err 222 } 223 return i.i, s, nil 224} 225 226// Mapping returns the IndexMapping in use by this 227// Index. 
228func (i *indexImpl) Mapping() mapping.IndexMapping { 229 return i.m 230} 231 232// Index the object with the specified identifier. 233// The IndexMapping for this index will determine 234// how the object is indexed. 235func (i *indexImpl) Index(id string, data interface{}) (err error) { 236 if id == "" { 237 return ErrorEmptyID 238 } 239 240 i.mutex.RLock() 241 defer i.mutex.RUnlock() 242 243 if !i.open { 244 return ErrorIndexClosed 245 } 246 247 doc := document.NewDocument(id) 248 err = i.m.MapDocument(doc, data) 249 if err != nil { 250 return 251 } 252 err = i.i.Update(doc) 253 return 254} 255 256// IndexAdvanced takes a document.Document object 257// skips the mapping and indexes it. 258func (i *indexImpl) IndexAdvanced(doc *document.Document) (err error) { 259 if doc.ID == "" { 260 return ErrorEmptyID 261 } 262 263 i.mutex.RLock() 264 defer i.mutex.RUnlock() 265 266 if !i.open { 267 return ErrorIndexClosed 268 } 269 270 err = i.i.Update(doc) 271 return 272} 273 274// Delete entries for the specified identifier from 275// the index. 276func (i *indexImpl) Delete(id string) (err error) { 277 if id == "" { 278 return ErrorEmptyID 279 } 280 281 i.mutex.RLock() 282 defer i.mutex.RUnlock() 283 284 if !i.open { 285 return ErrorIndexClosed 286 } 287 288 err = i.i.Delete(id) 289 return 290} 291 292// Batch executes multiple Index and Delete 293// operations at the same time. There are often 294// significant performance benefits when performing 295// operations in a batch. 296func (i *indexImpl) Batch(b *Batch) error { 297 i.mutex.RLock() 298 defer i.mutex.RUnlock() 299 300 if !i.open { 301 return ErrorIndexClosed 302 } 303 304 return i.i.Batch(b.internal) 305} 306 307// Document is used to find the values of all the 308// stored fields for a document in the index. These 309// stored fields are put back into a Document object 310// and returned. 
func (i *indexImpl) Document(id string) (doc *document.Document, err error) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()
	if !i.open {
		return nil, ErrorIndexClosed
	}

	r, err := i.i.Reader()
	if err != nil {
		return nil, err
	}
	// a failure closing the reader is reported only if nothing else failed
	defer func() {
		closeErr := r.Close()
		if closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	if doc, err = r.Document(id); err != nil {
		return nil, err
	}
	return doc, nil
}

// DocCount reports how many documents the index currently holds.
func (i *indexImpl) DocCount() (count uint64, err error) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()
	if !i.open {
		return 0, ErrorIndexClosed
	}

	// the count is taken from a dedicated reader
	r, err := i.i.Reader()
	if err != nil {
		return 0, fmt.Errorf("error opening index reader %v", err)
	}
	defer func() {
		closeErr := r.Close()
		if closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	return r.DocCount()
}

// Search runs req against the index with no deadline or cancellation.
// It is equivalent to SearchInContext with context.Background().
func (i *indexImpl) Search(req *SearchRequest) (sr *SearchResult, err error) {
	ctx := context.Background()
	return i.SearchInContext(ctx, req)
}

// SearchInContext runs the search described by req, passing ctx to the hit
// collector. It returns the resulting SearchResult or an error.
//
// Stored fields named in req.Fields ("*" selects all) and highlight
// fragments are loaded for each hit. A hit whose document cannot be loaded
// fails the whole search.
func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr *SearchResult, err error) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	searchStart := time.Now()

	if !i.open {
		return nil, ErrorIndexClosed
	}

	// named coll so it does not shadow the collector package
	coll := collector.NewTopNCollector(req.Size, req.From, req.Sort)

	// open a reader for this search
	indexReader, err := i.i.Reader()
	if err != nil {
		return nil, fmt.Errorf("error opening index reader %v", err)
	}
	defer func() {
		if cerr := indexReader.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	searcher, err := req.Query.Searcher(indexReader, i.m, search.SearcherOptions{
		Explain:            req.Explain,
		IncludeTermVectors: req.IncludeLocations || req.Highlight != nil,
	})
	if err != nil {
		return nil, err
	}
	defer func() {
		if serr := searcher.Close(); err == nil && serr != nil {
			err = serr
		}
	}()

	if req.Facets != nil {
		coll.SetFacetsBuilder(i.searchFacetsBuilder(indexReader, req))
	}

	err = coll.Collect(ctx, searcher, indexReader)
	if err != nil {
		return nil, err
	}

	hits := coll.Results()

	var highlighter highlight.Highlighter
	if req.Highlight != nil {
		highlighter, err = highlighterForRequest(req)
		if err != nil {
			return nil, err
		}
	}

	for _, hit := range hits {
		if len(req.Fields) > 0 || highlighter != nil {
			doc, derr := indexReader.Document(hit.ID)
			if derr != nil {
				// report the real lookup failure instead of masking it
				return nil, derr
			}
			if doc == nil {
				// unexpected case, a doc ID that was found as a search hit
				// was unable to be found during document lookup
				return nil, ErrorIndexReadInconsistency
			}

			for _, f := range req.Fields {
				for _, docF := range doc.Fields {
					if f == "*" || docF.Name() == f {
						if value := storedFieldValue(docF); value != nil {
							hit.AddFieldValue(docF.Name(), value)
						}
					}
				}
			}

			if highlighter != nil {
				highlightFields := req.Highlight.Fields
				if highlightFields == nil {
					// add all fields with matches
					highlightFields = make([]string, 0, len(hit.Locations))
					for k := range hit.Locations {
						highlightFields = append(highlightFields, k)
					}
				}
				for _, hf := range highlightFields {
					highlighter.BestFragmentsInField(hit, doc, hf, 1)
				}
			}
		}
		if i.name != "" {
			hit.Index = i.name
		}
	}

	atomic.AddUint64(&i.stats.searches, 1)
	searchDuration := time.Since(searchStart)
	atomic.AddUint64(&i.stats.searchTime, uint64(searchDuration))

	if Config.SlowSearchLogThreshold > 0 &&
		searchDuration > Config.SlowSearchLogThreshold {
		logger.Printf("slow search took %s - %v", searchDuration, req)
	}

	return &SearchResult{
		Status: &SearchStatus{
			Total:      1,
			Failed:     0,
			Successful: 1,
			Errors:     make(map[string]error),
		},
		Request:  req,
		Hits:     hits,
		Total:    coll.Total(),
		MaxScore: coll.MaxScore(),
		Took:     searchDuration,
		Facets:   coll.FacetResults(),
	}, nil
}

// searchFacetsBuilder builds a facets builder over indexReader covering
// every facet in req.Facets:
//   - a numeric range facet when NumericRanges is set,
//   - a date range facet when DateTimeRanges is set, with dates parsed by
//     the parser the mapping returns for the empty name (presumably its
//     default),
//   - a terms facet otherwise.
func (i *indexImpl) searchFacetsBuilder(indexReader index.IndexReader, req *SearchRequest) *search.FacetsBuilder {
	facetsBuilder := search.NewFacetsBuilder(indexReader)
	for facetName, facetRequest := range req.Facets {
		switch {
		case facetRequest.NumericRanges != nil:
			// build numeric range facet
			facetBuilder := facet.NewNumericFacetBuilder(facetRequest.Field, facetRequest.Size)
			for _, nr := range facetRequest.NumericRanges {
				facetBuilder.AddRange(nr.Name, nr.Min, nr.Max)
			}
			facetsBuilder.Add(facetName, facetBuilder)
		case facetRequest.DateTimeRanges != nil:
			// build date range facet
			facetBuilder := facet.NewDateTimeFacetBuilder(facetRequest.Field, facetRequest.Size)
			dateTimeParser := i.m.DateTimeParserNamed("")
			for _, dr := range facetRequest.DateTimeRanges {
				start, end := dr.ParseDates(dateTimeParser)
				facetBuilder.AddRange(dr.Name, start, end)
			}
			facetsBuilder.Add(facetName, facetBuilder)
		default:
			// build terms facet
			facetBuilder := facet.NewTermsFacetBuilder(facetRequest.Field, facetRequest.Size)
			facetsBuilder.Add(facetName, facetBuilder)
		}
	}
	return facetsBuilder
}

// highlighterForRequest resolves the highlighter for req.Highlight (which
// must be non-nil). The configured default highlighter is always looked up
// first, and its lookup error is returned. When req.Highlight.Style is set,
// the highlighter of that name replaces it. An error naming the requested
// highlighter is returned when no highlighter is registered under it.
func highlighterForRequest(req *SearchRequest) (highlight.Highlighter, error) {
	highlighterName := Config.DefaultHighlighter
	highlighter, err := Config.Cache.HighlighterNamed(highlighterName)
	if err != nil {
		return nil, err
	}
	if req.Highlight.Style != nil {
		highlighterName = *req.Highlight.Style
		highlighter, err = Config.Cache.HighlighterNamed(highlighterName)
		if err != nil {
			return nil, err
		}
	}
	if highlighter == nil {
		// use the name actually looked up; Style may be nil here
		return nil, fmt.Errorf("no highlighter named `%s` registered", highlighterName)
	}
	return highlighter, nil
}

// storedFieldValue converts a stored document field into the value reported
// on a search hit:
//   - text fields as a string,
//   - numeric fields as their decoded number,
//   - date-time fields formatted as RFC 3339,
//   - boolean fields as a bool,
//   - geo points as []float64{lon, lat}.
//
// It returns nil for other field types or when decoding fails, in which case
// the field is left off the hit.
func storedFieldValue(field document.Field) interface{} {
	switch field := field.(type) {
	case *document.TextField:
		return string(field.Value())
	case *document.NumericField:
		if num, err := field.Number(); err == nil {
			return num
		}
	case *document.DateTimeField:
		if datetime, err := field.DateTime(); err == nil {
			return datetime.Format(time.RFC3339)
		}
	case *document.BooleanField:
		if boolean, err := field.Boolean(); err == nil {
			return boolean
		}
	case *document.GeoPointField:
		lon, err := field.Lon()
		if err != nil {
			return nil
		}
		lat, err := field.Lat()
		if err != nil {
			return nil
		}
		return []float64{lon, lat}
	}
	return nil
}

// Fields returns the names of all the fields this
// Index has operated on.
552func (i *indexImpl) Fields() (fields []string, err error) { 553 i.mutex.RLock() 554 defer i.mutex.RUnlock() 555 556 if !i.open { 557 return nil, ErrorIndexClosed 558 } 559 560 indexReader, err := i.i.Reader() 561 if err != nil { 562 return nil, err 563 } 564 defer func() { 565 if cerr := indexReader.Close(); err == nil && cerr != nil { 566 err = cerr 567 } 568 }() 569 570 fields, err = indexReader.Fields() 571 if err != nil { 572 return nil, err 573 } 574 return fields, nil 575} 576 577func (i *indexImpl) FieldDict(field string) (index.FieldDict, error) { 578 i.mutex.RLock() 579 580 if !i.open { 581 i.mutex.RUnlock() 582 return nil, ErrorIndexClosed 583 } 584 585 indexReader, err := i.i.Reader() 586 if err != nil { 587 i.mutex.RUnlock() 588 return nil, err 589 } 590 591 fieldDict, err := indexReader.FieldDict(field) 592 if err != nil { 593 i.mutex.RUnlock() 594 return nil, err 595 } 596 597 return &indexImplFieldDict{ 598 index: i, 599 indexReader: indexReader, 600 fieldDict: fieldDict, 601 }, nil 602} 603 604func (i *indexImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) { 605 i.mutex.RLock() 606 607 if !i.open { 608 i.mutex.RUnlock() 609 return nil, ErrorIndexClosed 610 } 611 612 indexReader, err := i.i.Reader() 613 if err != nil { 614 i.mutex.RUnlock() 615 return nil, err 616 } 617 618 fieldDict, err := indexReader.FieldDictRange(field, startTerm, endTerm) 619 if err != nil { 620 i.mutex.RUnlock() 621 return nil, err 622 } 623 624 return &indexImplFieldDict{ 625 index: i, 626 indexReader: indexReader, 627 fieldDict: fieldDict, 628 }, nil 629} 630 631func (i *indexImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) { 632 i.mutex.RLock() 633 634 if !i.open { 635 i.mutex.RUnlock() 636 return nil, ErrorIndexClosed 637 } 638 639 indexReader, err := i.i.Reader() 640 if err != nil { 641 i.mutex.RUnlock() 642 return nil, err 643 } 644 645 fieldDict, err := indexReader.FieldDictPrefix(field, 
termPrefix) 646 if err != nil { 647 i.mutex.RUnlock() 648 return nil, err 649 } 650 651 return &indexImplFieldDict{ 652 index: i, 653 indexReader: indexReader, 654 fieldDict: fieldDict, 655 }, nil 656} 657 658func (i *indexImpl) Close() error { 659 i.mutex.Lock() 660 defer i.mutex.Unlock() 661 662 indexStats.UnRegister(i) 663 664 i.open = false 665 return i.i.Close() 666} 667 668func (i *indexImpl) Stats() *IndexStat { 669 return i.stats 670} 671 672func (i *indexImpl) StatsMap() map[string]interface{} { 673 return i.stats.statsMap() 674} 675 676func (i *indexImpl) GetInternal(key []byte) (val []byte, err error) { 677 i.mutex.RLock() 678 defer i.mutex.RUnlock() 679 680 if !i.open { 681 return nil, ErrorIndexClosed 682 } 683 684 reader, err := i.i.Reader() 685 if err != nil { 686 return nil, err 687 } 688 defer func() { 689 if cerr := reader.Close(); err == nil && cerr != nil { 690 err = cerr 691 } 692 }() 693 694 val, err = reader.GetInternal(key) 695 if err != nil { 696 return nil, err 697 } 698 return val, nil 699} 700 701func (i *indexImpl) SetInternal(key, val []byte) error { 702 i.mutex.RLock() 703 defer i.mutex.RUnlock() 704 705 if !i.open { 706 return ErrorIndexClosed 707 } 708 709 return i.i.SetInternal(key, val) 710} 711 712func (i *indexImpl) DeleteInternal(key []byte) error { 713 i.mutex.RLock() 714 defer i.mutex.RUnlock() 715 716 if !i.open { 717 return ErrorIndexClosed 718 } 719 720 return i.i.DeleteInternal(key) 721} 722 723// NewBatch creates a new empty batch. 
724func (i *indexImpl) NewBatch() *Batch { 725 return &Batch{ 726 index: i, 727 internal: index.NewBatch(), 728 } 729} 730 731func (i *indexImpl) Name() string { 732 return i.name 733} 734 735func (i *indexImpl) SetName(name string) { 736 indexStats.UnRegister(i) 737 i.name = name 738 indexStats.Register(i) 739} 740 741type indexImplFieldDict struct { 742 index *indexImpl 743 indexReader index.IndexReader 744 fieldDict index.FieldDict 745} 746 747func (f *indexImplFieldDict) Next() (*index.DictEntry, error) { 748 return f.fieldDict.Next() 749} 750 751func (f *indexImplFieldDict) Close() error { 752 defer f.index.mutex.RUnlock() 753 err := f.fieldDict.Close() 754 if err != nil { 755 return err 756 } 757 return f.indexReader.Close() 758} 759