1//  Copyright (c) 2014 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bleve
16
17import (
18	"encoding/json"
19	"fmt"
20	"os"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"golang.org/x/net/context"
26
27	"github.com/blevesearch/bleve/document"
28	"github.com/blevesearch/bleve/index"
29	"github.com/blevesearch/bleve/index/store"
30	"github.com/blevesearch/bleve/index/upsidedown"
31	"github.com/blevesearch/bleve/mapping"
32	"github.com/blevesearch/bleve/registry"
33	"github.com/blevesearch/bleve/search"
34	"github.com/blevesearch/bleve/search/collector"
35	"github.com/blevesearch/bleve/search/facet"
36	"github.com/blevesearch/bleve/search/highlight"
37)
38
39type indexImpl struct {
40	path  string
41	name  string
42	meta  *indexMeta
43	i     index.Index
44	m     mapping.IndexMapping
45	mutex sync.RWMutex
46	open  bool
47	stats *IndexStat
48}
49
// storePath is the name of the entry, directly below an index's path,
// that is handed to the storage layer as its own path.
const storePath = "store"

// mappingInternalKey is the internal key under which the JSON encoded
// index mapping is persisted.
var mappingInternalKey = []byte("_mapping")

// indexStorePath returns the storage location for the index rooted at
// path: path, the OS path separator, then storePath. The pieces are
// concatenated as-is; no cleaning or normalisation is applied.
func indexStorePath(path string) string {
	separator := string(os.PathSeparator)
	return path + separator + storePath
}
57
58func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, kvstore string, kvconfig map[string]interface{}) (*indexImpl, error) {
59	// first validate the mapping
60	err := mapping.Validate()
61	if err != nil {
62		return nil, err
63	}
64
65	if kvconfig == nil {
66		kvconfig = map[string]interface{}{}
67	}
68
69	if kvstore == "" {
70		return nil, fmt.Errorf("bleve not configured for file based indexing")
71	}
72
73	rv := indexImpl{
74		path: path,
75		name: path,
76		m:    mapping,
77		meta: newIndexMeta(indexType, kvstore, kvconfig),
78	}
79	rv.stats = &IndexStat{i: &rv}
80	// at this point there is hope that we can be successful, so save index meta
81	if path != "" {
82		err = rv.meta.Save(path)
83		if err != nil {
84			return nil, err
85		}
86		kvconfig["create_if_missing"] = true
87		kvconfig["error_if_exists"] = true
88		kvconfig["path"] = indexStorePath(path)
89	} else {
90		kvconfig["path"] = ""
91	}
92
93	// open the index
94	indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType)
95	if indexTypeConstructor == nil {
96		return nil, ErrorUnknownIndexType
97	}
98
99	rv.i, err = indexTypeConstructor(rv.meta.Storage, kvconfig, Config.analysisQueue)
100	if err != nil {
101		return nil, err
102	}
103	err = rv.i.Open()
104	if err != nil {
105		if err == index.ErrorUnknownStorageType {
106			return nil, ErrorUnknownStorageType
107		}
108		return nil, err
109	}
110
111	// now persist the mapping
112	mappingBytes, err := json.Marshal(mapping)
113	if err != nil {
114		return nil, err
115	}
116	err = rv.i.SetInternal(mappingInternalKey, mappingBytes)
117	if err != nil {
118		return nil, err
119	}
120
121	// mark the index as open
122	rv.mutex.Lock()
123	defer rv.mutex.Unlock()
124	rv.open = true
125	indexStats.Register(&rv)
126	return &rv, nil
127}
128
129func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *indexImpl, err error) {
130	rv = &indexImpl{
131		path: path,
132		name: path,
133	}
134	rv.stats = &IndexStat{i: rv}
135
136	rv.meta, err = openIndexMeta(path)
137	if err != nil {
138		return nil, err
139	}
140
141	// backwards compatibility if index type is missing
142	if rv.meta.IndexType == "" {
143		rv.meta.IndexType = upsidedown.Name
144	}
145
146	storeConfig := rv.meta.Config
147	if storeConfig == nil {
148		storeConfig = map[string]interface{}{}
149	}
150
151	storeConfig["path"] = indexStorePath(path)
152	storeConfig["create_if_missing"] = false
153	storeConfig["error_if_exists"] = false
154	for rck, rcv := range runtimeConfig {
155		storeConfig[rck] = rcv
156	}
157
158	// open the index
159	indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType)
160	if indexTypeConstructor == nil {
161		return nil, ErrorUnknownIndexType
162	}
163
164	rv.i, err = indexTypeConstructor(rv.meta.Storage, storeConfig, Config.analysisQueue)
165	if err != nil {
166		return nil, err
167	}
168	err = rv.i.Open()
169	if err != nil {
170		if err == index.ErrorUnknownStorageType {
171			return nil, ErrorUnknownStorageType
172		}
173		return nil, err
174	}
175
176	// now load the mapping
177	indexReader, err := rv.i.Reader()
178	if err != nil {
179		return nil, err
180	}
181	defer func() {
182		if cerr := indexReader.Close(); cerr != nil && err == nil {
183			err = cerr
184		}
185	}()
186
187	mappingBytes, err := indexReader.GetInternal(mappingInternalKey)
188	if err != nil {
189		return nil, err
190	}
191
192	var im *mapping.IndexMappingImpl
193	err = json.Unmarshal(mappingBytes, &im)
194	if err != nil {
195		return nil, fmt.Errorf("error parsing mapping JSON: %v\nmapping contents:\n%s", err, string(mappingBytes))
196	}
197
198	// mark the index as open
199	rv.mutex.Lock()
200	defer rv.mutex.Unlock()
201	rv.open = true
202
203	// validate the mapping
204	err = im.Validate()
205	if err != nil {
206		// note even if the mapping is invalid
207		// we still return an open usable index
208		return rv, err
209	}
210
211	rv.m = im
212	indexStats.Register(rv)
213	return rv, err
214}
215
216// Advanced returns implementation internals
217// necessary ONLY for advanced usage.
218func (i *indexImpl) Advanced() (index.Index, store.KVStore, error) {
219	s, err := i.i.Advanced()
220	if err != nil {
221		return nil, nil, err
222	}
223	return i.i, s, nil
224}
225
226// Mapping returns the IndexMapping in use by this
227// Index.
228func (i *indexImpl) Mapping() mapping.IndexMapping {
229	return i.m
230}
231
232// Index the object with the specified identifier.
233// The IndexMapping for this index will determine
234// how the object is indexed.
235func (i *indexImpl) Index(id string, data interface{}) (err error) {
236	if id == "" {
237		return ErrorEmptyID
238	}
239
240	i.mutex.RLock()
241	defer i.mutex.RUnlock()
242
243	if !i.open {
244		return ErrorIndexClosed
245	}
246
247	doc := document.NewDocument(id)
248	err = i.m.MapDocument(doc, data)
249	if err != nil {
250		return
251	}
252	err = i.i.Update(doc)
253	return
254}
255
256// IndexAdvanced takes a document.Document object
257// skips the mapping and indexes it.
258func (i *indexImpl) IndexAdvanced(doc *document.Document) (err error) {
259	if doc.ID == "" {
260		return ErrorEmptyID
261	}
262
263	i.mutex.RLock()
264	defer i.mutex.RUnlock()
265
266	if !i.open {
267		return ErrorIndexClosed
268	}
269
270	err = i.i.Update(doc)
271	return
272}
273
274// Delete entries for the specified identifier from
275// the index.
276func (i *indexImpl) Delete(id string) (err error) {
277	if id == "" {
278		return ErrorEmptyID
279	}
280
281	i.mutex.RLock()
282	defer i.mutex.RUnlock()
283
284	if !i.open {
285		return ErrorIndexClosed
286	}
287
288	err = i.i.Delete(id)
289	return
290}
291
292// Batch executes multiple Index and Delete
293// operations at the same time.  There are often
294// significant performance benefits when performing
295// operations in a batch.
296func (i *indexImpl) Batch(b *Batch) error {
297	i.mutex.RLock()
298	defer i.mutex.RUnlock()
299
300	if !i.open {
301		return ErrorIndexClosed
302	}
303
304	return i.i.Batch(b.internal)
305}
306
307// Document is used to find the values of all the
308// stored fields for a document in the index.  These
309// stored fields are put back into a Document object
310// and returned.
311func (i *indexImpl) Document(id string) (doc *document.Document, err error) {
312	i.mutex.RLock()
313	defer i.mutex.RUnlock()
314
315	if !i.open {
316		return nil, ErrorIndexClosed
317	}
318	indexReader, err := i.i.Reader()
319	if err != nil {
320		return nil, err
321	}
322	defer func() {
323		if cerr := indexReader.Close(); err == nil && cerr != nil {
324			err = cerr
325		}
326	}()
327
328	doc, err = indexReader.Document(id)
329	if err != nil {
330		return nil, err
331	}
332	return doc, nil
333}
334
335// DocCount returns the number of documents in the
336// index.
337func (i *indexImpl) DocCount() (count uint64, err error) {
338	i.mutex.RLock()
339	defer i.mutex.RUnlock()
340
341	if !i.open {
342		return 0, ErrorIndexClosed
343	}
344
345	// open a reader for this search
346	indexReader, err := i.i.Reader()
347	if err != nil {
348		return 0, fmt.Errorf("error opening index reader %v", err)
349	}
350	defer func() {
351		if cerr := indexReader.Close(); err == nil && cerr != nil {
352			err = cerr
353		}
354	}()
355
356	count, err = indexReader.DocCount()
357	return
358}
359
360// Search executes a search request operation.
361// Returns a SearchResult object or an error.
362func (i *indexImpl) Search(req *SearchRequest) (sr *SearchResult, err error) {
363	return i.SearchInContext(context.Background(), req)
364}
365
366// SearchInContext executes a search request operation within the provided
367// Context.  Returns a SearchResult object or an error.
368func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr *SearchResult, err error) {
369	i.mutex.RLock()
370	defer i.mutex.RUnlock()
371
372	searchStart := time.Now()
373
374	if !i.open {
375		return nil, ErrorIndexClosed
376	}
377
378	collector := collector.NewTopNCollector(req.Size, req.From, req.Sort)
379
380	// open a reader for this search
381	indexReader, err := i.i.Reader()
382	if err != nil {
383		return nil, fmt.Errorf("error opening index reader %v", err)
384	}
385	defer func() {
386		if cerr := indexReader.Close(); err == nil && cerr != nil {
387			err = cerr
388		}
389	}()
390
391	searcher, err := req.Query.Searcher(indexReader, i.m, search.SearcherOptions{
392		Explain:            req.Explain,
393		IncludeTermVectors: req.IncludeLocations || req.Highlight != nil,
394	})
395	if err != nil {
396		return nil, err
397	}
398	defer func() {
399		if serr := searcher.Close(); err == nil && serr != nil {
400			err = serr
401		}
402	}()
403
404	if req.Facets != nil {
405		facetsBuilder := search.NewFacetsBuilder(indexReader)
406		for facetName, facetRequest := range req.Facets {
407			if facetRequest.NumericRanges != nil {
408				// build numeric range facet
409				facetBuilder := facet.NewNumericFacetBuilder(facetRequest.Field, facetRequest.Size)
410				for _, nr := range facetRequest.NumericRanges {
411					facetBuilder.AddRange(nr.Name, nr.Min, nr.Max)
412				}
413				facetsBuilder.Add(facetName, facetBuilder)
414			} else if facetRequest.DateTimeRanges != nil {
415				// build date range facet
416				facetBuilder := facet.NewDateTimeFacetBuilder(facetRequest.Field, facetRequest.Size)
417				dateTimeParser := i.m.DateTimeParserNamed("")
418				for _, dr := range facetRequest.DateTimeRanges {
419					start, end := dr.ParseDates(dateTimeParser)
420					facetBuilder.AddRange(dr.Name, start, end)
421				}
422				facetsBuilder.Add(facetName, facetBuilder)
423			} else {
424				// build terms facet
425				facetBuilder := facet.NewTermsFacetBuilder(facetRequest.Field, facetRequest.Size)
426				facetsBuilder.Add(facetName, facetBuilder)
427			}
428		}
429		collector.SetFacetsBuilder(facetsBuilder)
430	}
431
432	err = collector.Collect(ctx, searcher, indexReader)
433	if err != nil {
434		return nil, err
435	}
436
437	hits := collector.Results()
438
439	var highlighter highlight.Highlighter
440
441	if req.Highlight != nil {
442		// get the right highlighter
443		highlighter, err = Config.Cache.HighlighterNamed(Config.DefaultHighlighter)
444		if err != nil {
445			return nil, err
446		}
447		if req.Highlight.Style != nil {
448			highlighter, err = Config.Cache.HighlighterNamed(*req.Highlight.Style)
449			if err != nil {
450				return nil, err
451			}
452		}
453		if highlighter == nil {
454			return nil, fmt.Errorf("no highlighter named `%s` registered", *req.Highlight.Style)
455		}
456	}
457
458	for _, hit := range hits {
459		if len(req.Fields) > 0 || highlighter != nil {
460			doc, err := indexReader.Document(hit.ID)
461			if err == nil && doc != nil {
462				if len(req.Fields) > 0 {
463					for _, f := range req.Fields {
464						for _, docF := range doc.Fields {
465							if f == "*" || docF.Name() == f {
466								var value interface{}
467								switch docF := docF.(type) {
468								case *document.TextField:
469									value = string(docF.Value())
470								case *document.NumericField:
471									num, err := docF.Number()
472									if err == nil {
473										value = num
474									}
475								case *document.DateTimeField:
476									datetime, err := docF.DateTime()
477									if err == nil {
478										value = datetime.Format(time.RFC3339)
479									}
480								case *document.BooleanField:
481									boolean, err := docF.Boolean()
482									if err == nil {
483										value = boolean
484									}
485								case *document.GeoPointField:
486									lon, err := docF.Lon()
487									if err == nil {
488										lat, err := docF.Lat()
489										if err == nil {
490											value = []float64{lon, lat}
491										}
492									}
493								}
494								if value != nil {
495									hit.AddFieldValue(docF.Name(), value)
496								}
497							}
498						}
499					}
500				}
501				if highlighter != nil {
502					highlightFields := req.Highlight.Fields
503					if highlightFields == nil {
504						// add all fields with matches
505						highlightFields = make([]string, 0, len(hit.Locations))
506						for k := range hit.Locations {
507							highlightFields = append(highlightFields, k)
508						}
509					}
510					for _, hf := range highlightFields {
511						highlighter.BestFragmentsInField(hit, doc, hf, 1)
512					}
513				}
514			} else if doc == nil {
515				// unexpected case, a doc ID that was found as a search hit
516				// was unable to be found during document lookup
517				return nil, ErrorIndexReadInconsistency
518			}
519		}
520		if i.name != "" {
521			hit.Index = i.name
522		}
523	}
524
525	atomic.AddUint64(&i.stats.searches, 1)
526	searchDuration := time.Since(searchStart)
527	atomic.AddUint64(&i.stats.searchTime, uint64(searchDuration))
528
529	if Config.SlowSearchLogThreshold > 0 &&
530		searchDuration > Config.SlowSearchLogThreshold {
531		logger.Printf("slow search took %s - %v", searchDuration, req)
532	}
533
534	return &SearchResult{
535		Status: &SearchStatus{
536			Total:      1,
537			Failed:     0,
538			Successful: 1,
539			Errors:     make(map[string]error),
540		},
541		Request:  req,
542		Hits:     hits,
543		Total:    collector.Total(),
544		MaxScore: collector.MaxScore(),
545		Took:     searchDuration,
546		Facets:   collector.FacetResults(),
547	}, nil
548}
549
550// Fields returns the name of all the fields this
551// Index has operated on.
552func (i *indexImpl) Fields() (fields []string, err error) {
553	i.mutex.RLock()
554	defer i.mutex.RUnlock()
555
556	if !i.open {
557		return nil, ErrorIndexClosed
558	}
559
560	indexReader, err := i.i.Reader()
561	if err != nil {
562		return nil, err
563	}
564	defer func() {
565		if cerr := indexReader.Close(); err == nil && cerr != nil {
566			err = cerr
567		}
568	}()
569
570	fields, err = indexReader.Fields()
571	if err != nil {
572		return nil, err
573	}
574	return fields, nil
575}
576
577func (i *indexImpl) FieldDict(field string) (index.FieldDict, error) {
578	i.mutex.RLock()
579
580	if !i.open {
581		i.mutex.RUnlock()
582		return nil, ErrorIndexClosed
583	}
584
585	indexReader, err := i.i.Reader()
586	if err != nil {
587		i.mutex.RUnlock()
588		return nil, err
589	}
590
591	fieldDict, err := indexReader.FieldDict(field)
592	if err != nil {
593		i.mutex.RUnlock()
594		return nil, err
595	}
596
597	return &indexImplFieldDict{
598		index:       i,
599		indexReader: indexReader,
600		fieldDict:   fieldDict,
601	}, nil
602}
603
604func (i *indexImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) {
605	i.mutex.RLock()
606
607	if !i.open {
608		i.mutex.RUnlock()
609		return nil, ErrorIndexClosed
610	}
611
612	indexReader, err := i.i.Reader()
613	if err != nil {
614		i.mutex.RUnlock()
615		return nil, err
616	}
617
618	fieldDict, err := indexReader.FieldDictRange(field, startTerm, endTerm)
619	if err != nil {
620		i.mutex.RUnlock()
621		return nil, err
622	}
623
624	return &indexImplFieldDict{
625		index:       i,
626		indexReader: indexReader,
627		fieldDict:   fieldDict,
628	}, nil
629}
630
631func (i *indexImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) {
632	i.mutex.RLock()
633
634	if !i.open {
635		i.mutex.RUnlock()
636		return nil, ErrorIndexClosed
637	}
638
639	indexReader, err := i.i.Reader()
640	if err != nil {
641		i.mutex.RUnlock()
642		return nil, err
643	}
644
645	fieldDict, err := indexReader.FieldDictPrefix(field, termPrefix)
646	if err != nil {
647		i.mutex.RUnlock()
648		return nil, err
649	}
650
651	return &indexImplFieldDict{
652		index:       i,
653		indexReader: indexReader,
654		fieldDict:   fieldDict,
655	}, nil
656}
657
658func (i *indexImpl) Close() error {
659	i.mutex.Lock()
660	defer i.mutex.Unlock()
661
662	indexStats.UnRegister(i)
663
664	i.open = false
665	return i.i.Close()
666}
667
668func (i *indexImpl) Stats() *IndexStat {
669	return i.stats
670}
671
672func (i *indexImpl) StatsMap() map[string]interface{} {
673	return i.stats.statsMap()
674}
675
676func (i *indexImpl) GetInternal(key []byte) (val []byte, err error) {
677	i.mutex.RLock()
678	defer i.mutex.RUnlock()
679
680	if !i.open {
681		return nil, ErrorIndexClosed
682	}
683
684	reader, err := i.i.Reader()
685	if err != nil {
686		return nil, err
687	}
688	defer func() {
689		if cerr := reader.Close(); err == nil && cerr != nil {
690			err = cerr
691		}
692	}()
693
694	val, err = reader.GetInternal(key)
695	if err != nil {
696		return nil, err
697	}
698	return val, nil
699}
700
701func (i *indexImpl) SetInternal(key, val []byte) error {
702	i.mutex.RLock()
703	defer i.mutex.RUnlock()
704
705	if !i.open {
706		return ErrorIndexClosed
707	}
708
709	return i.i.SetInternal(key, val)
710}
711
712func (i *indexImpl) DeleteInternal(key []byte) error {
713	i.mutex.RLock()
714	defer i.mutex.RUnlock()
715
716	if !i.open {
717		return ErrorIndexClosed
718	}
719
720	return i.i.DeleteInternal(key)
721}
722
723// NewBatch creates a new empty batch.
724func (i *indexImpl) NewBatch() *Batch {
725	return &Batch{
726		index:    i,
727		internal: index.NewBatch(),
728	}
729}
730
731func (i *indexImpl) Name() string {
732	return i.name
733}
734
735func (i *indexImpl) SetName(name string) {
736	indexStats.UnRegister(i)
737	i.name = name
738	indexStats.Register(i)
739}
740
741type indexImplFieldDict struct {
742	index       *indexImpl
743	indexReader index.IndexReader
744	fieldDict   index.FieldDict
745}
746
747func (f *indexImplFieldDict) Next() (*index.DictEntry, error) {
748	return f.fieldDict.Next()
749}
750
751func (f *indexImplFieldDict) Close() error {
752	defer f.index.mutex.RUnlock()
753	err := f.fieldDict.Close()
754	if err != nil {
755		return err
756	}
757	return f.indexReader.Close()
758}
759