1//  Copyright (c) 2014 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package bleve
16
17import (
18	"sort"
19	"sync"
20	"time"
21
22	"golang.org/x/net/context"
23
24	"github.com/blevesearch/bleve/document"
25	"github.com/blevesearch/bleve/index"
26	"github.com/blevesearch/bleve/index/store"
27	"github.com/blevesearch/bleve/mapping"
28	"github.com/blevesearch/bleve/search"
29)
30
31type indexAliasImpl struct {
32	name    string
33	indexes []Index
34	mutex   sync.RWMutex
35	open    bool
36}
37
38// NewIndexAlias creates a new IndexAlias over the provided
39// Index objects.
40func NewIndexAlias(indexes ...Index) *indexAliasImpl {
41	return &indexAliasImpl{
42		name:    "alias",
43		indexes: indexes,
44		open:    true,
45	}
46}
47
48func (i *indexAliasImpl) isAliasToSingleIndex() error {
49	if len(i.indexes) < 1 {
50		return ErrorAliasEmpty
51	} else if len(i.indexes) > 1 {
52		return ErrorAliasMulti
53	}
54	return nil
55}
56
57func (i *indexAliasImpl) Index(id string, data interface{}) error {
58	i.mutex.RLock()
59	defer i.mutex.RUnlock()
60
61	if !i.open {
62		return ErrorIndexClosed
63	}
64
65	err := i.isAliasToSingleIndex()
66	if err != nil {
67		return err
68	}
69
70	return i.indexes[0].Index(id, data)
71}
72
73func (i *indexAliasImpl) Delete(id string) error {
74	i.mutex.RLock()
75	defer i.mutex.RUnlock()
76
77	if !i.open {
78		return ErrorIndexClosed
79	}
80
81	err := i.isAliasToSingleIndex()
82	if err != nil {
83		return err
84	}
85
86	return i.indexes[0].Delete(id)
87}
88
89func (i *indexAliasImpl) Batch(b *Batch) error {
90	i.mutex.RLock()
91	defer i.mutex.RUnlock()
92
93	if !i.open {
94		return ErrorIndexClosed
95	}
96
97	err := i.isAliasToSingleIndex()
98	if err != nil {
99		return err
100	}
101
102	return i.indexes[0].Batch(b)
103}
104
105func (i *indexAliasImpl) Document(id string) (*document.Document, error) {
106	i.mutex.RLock()
107	defer i.mutex.RUnlock()
108
109	if !i.open {
110		return nil, ErrorIndexClosed
111	}
112
113	err := i.isAliasToSingleIndex()
114	if err != nil {
115		return nil, err
116	}
117
118	return i.indexes[0].Document(id)
119}
120
121func (i *indexAliasImpl) DocCount() (uint64, error) {
122	i.mutex.RLock()
123	defer i.mutex.RUnlock()
124
125	rv := uint64(0)
126
127	if !i.open {
128		return 0, ErrorIndexClosed
129	}
130
131	for _, index := range i.indexes {
132		otherCount, err := index.DocCount()
133		if err == nil {
134			rv += otherCount
135		}
136		// tolerate errors to produce partial counts
137	}
138
139	return rv, nil
140}
141
142func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) {
143	return i.SearchInContext(context.Background(), req)
144}
145
146func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
147	i.mutex.RLock()
148	defer i.mutex.RUnlock()
149
150	if !i.open {
151		return nil, ErrorIndexClosed
152	}
153
154	if len(i.indexes) < 1 {
155		return nil, ErrorAliasEmpty
156	}
157
158	// short circuit the simple case
159	if len(i.indexes) == 1 {
160		return i.indexes[0].SearchInContext(ctx, req)
161	}
162
163	return MultiSearch(ctx, req, i.indexes...)
164}
165
166func (i *indexAliasImpl) Fields() ([]string, error) {
167	i.mutex.RLock()
168	defer i.mutex.RUnlock()
169
170	if !i.open {
171		return nil, ErrorIndexClosed
172	}
173
174	err := i.isAliasToSingleIndex()
175	if err != nil {
176		return nil, err
177	}
178
179	return i.indexes[0].Fields()
180}
181
182func (i *indexAliasImpl) FieldDict(field string) (index.FieldDict, error) {
183	i.mutex.RLock()
184
185	if !i.open {
186		i.mutex.RUnlock()
187		return nil, ErrorIndexClosed
188	}
189
190	err := i.isAliasToSingleIndex()
191	if err != nil {
192		i.mutex.RUnlock()
193		return nil, err
194	}
195
196	fieldDict, err := i.indexes[0].FieldDict(field)
197	if err != nil {
198		i.mutex.RUnlock()
199		return nil, err
200	}
201
202	return &indexAliasImplFieldDict{
203		index:     i,
204		fieldDict: fieldDict,
205	}, nil
206}
207
208func (i *indexAliasImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) {
209	i.mutex.RLock()
210
211	if !i.open {
212		i.mutex.RUnlock()
213		return nil, ErrorIndexClosed
214	}
215
216	err := i.isAliasToSingleIndex()
217	if err != nil {
218		i.mutex.RUnlock()
219		return nil, err
220	}
221
222	fieldDict, err := i.indexes[0].FieldDictRange(field, startTerm, endTerm)
223	if err != nil {
224		i.mutex.RUnlock()
225		return nil, err
226	}
227
228	return &indexAliasImplFieldDict{
229		index:     i,
230		fieldDict: fieldDict,
231	}, nil
232}
233
234func (i *indexAliasImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) {
235	i.mutex.RLock()
236
237	if !i.open {
238		i.mutex.RUnlock()
239		return nil, ErrorIndexClosed
240	}
241
242	err := i.isAliasToSingleIndex()
243	if err != nil {
244		i.mutex.RUnlock()
245		return nil, err
246	}
247
248	fieldDict, err := i.indexes[0].FieldDictPrefix(field, termPrefix)
249	if err != nil {
250		i.mutex.RUnlock()
251		return nil, err
252	}
253
254	return &indexAliasImplFieldDict{
255		index:     i,
256		fieldDict: fieldDict,
257	}, nil
258}
259
260func (i *indexAliasImpl) Close() error {
261	i.mutex.Lock()
262	defer i.mutex.Unlock()
263
264	i.open = false
265	return nil
266}
267
268func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
269	i.mutex.RLock()
270	defer i.mutex.RUnlock()
271
272	if !i.open {
273		return nil
274	}
275
276	err := i.isAliasToSingleIndex()
277	if err != nil {
278		return nil
279	}
280
281	return i.indexes[0].Mapping()
282}
283
284func (i *indexAliasImpl) Stats() *IndexStat {
285	i.mutex.RLock()
286	defer i.mutex.RUnlock()
287
288	if !i.open {
289		return nil
290	}
291
292	err := i.isAliasToSingleIndex()
293	if err != nil {
294		return nil
295	}
296
297	return i.indexes[0].Stats()
298}
299
300func (i *indexAliasImpl) StatsMap() map[string]interface{} {
301	i.mutex.RLock()
302	defer i.mutex.RUnlock()
303
304	if !i.open {
305		return nil
306	}
307
308	err := i.isAliasToSingleIndex()
309	if err != nil {
310		return nil
311	}
312
313	return i.indexes[0].StatsMap()
314}
315
316func (i *indexAliasImpl) GetInternal(key []byte) ([]byte, error) {
317	i.mutex.RLock()
318	defer i.mutex.RUnlock()
319
320	if !i.open {
321		return nil, ErrorIndexClosed
322	}
323
324	err := i.isAliasToSingleIndex()
325	if err != nil {
326		return nil, err
327	}
328
329	return i.indexes[0].GetInternal(key)
330}
331
332func (i *indexAliasImpl) SetInternal(key, val []byte) error {
333	i.mutex.RLock()
334	defer i.mutex.RUnlock()
335
336	if !i.open {
337		return ErrorIndexClosed
338	}
339
340	err := i.isAliasToSingleIndex()
341	if err != nil {
342		return err
343	}
344
345	return i.indexes[0].SetInternal(key, val)
346}
347
348func (i *indexAliasImpl) DeleteInternal(key []byte) error {
349	i.mutex.RLock()
350	defer i.mutex.RUnlock()
351
352	if !i.open {
353		return ErrorIndexClosed
354	}
355
356	err := i.isAliasToSingleIndex()
357	if err != nil {
358		return err
359	}
360
361	return i.indexes[0].DeleteInternal(key)
362}
363
364func (i *indexAliasImpl) Advanced() (index.Index, store.KVStore, error) {
365	i.mutex.RLock()
366	defer i.mutex.RUnlock()
367
368	if !i.open {
369		return nil, nil, ErrorIndexClosed
370	}
371
372	err := i.isAliasToSingleIndex()
373	if err != nil {
374		return nil, nil, err
375	}
376
377	return i.indexes[0].Advanced()
378}
379
380func (i *indexAliasImpl) Add(indexes ...Index) {
381	i.mutex.Lock()
382	defer i.mutex.Unlock()
383
384	i.indexes = append(i.indexes, indexes...)
385}
386
387func (i *indexAliasImpl) removeSingle(index Index) {
388	for pos, in := range i.indexes {
389		if in == index {
390			i.indexes = append(i.indexes[:pos], i.indexes[pos+1:]...)
391			break
392		}
393	}
394}
395
396func (i *indexAliasImpl) Remove(indexes ...Index) {
397	i.mutex.Lock()
398	defer i.mutex.Unlock()
399
400	for _, in := range indexes {
401		i.removeSingle(in)
402	}
403}
404
405func (i *indexAliasImpl) Swap(in, out []Index) {
406	i.mutex.Lock()
407	defer i.mutex.Unlock()
408
409	// add
410	i.indexes = append(i.indexes, in...)
411
412	// delete
413	for _, ind := range out {
414		i.removeSingle(ind)
415	}
416}
417
418// createChildSearchRequest creates a separate
419// request from the original
420// For now, avoid data race on req structure.
421// TODO disable highlight/field load on child
422// requests, and add code to do this only on
423// the actual final results.
424// Perhaps that part needs to be optional,
425// could be slower in remote usages.
426func createChildSearchRequest(req *SearchRequest) *SearchRequest {
427	rv := SearchRequest{
428		Query:            req.Query,
429		Size:             req.Size + req.From,
430		From:             0,
431		Highlight:        req.Highlight,
432		Fields:           req.Fields,
433		Facets:           req.Facets,
434		Explain:          req.Explain,
435		Sort:             req.Sort.Copy(),
436		IncludeLocations: req.IncludeLocations,
437	}
438	return &rv
439}
440
441type asyncSearchResult struct {
442	Name   string
443	Result *SearchResult
444	Err    error
445}
446
447// MultiSearch executes a SearchRequest across multiple Index objects,
448// then merges the results.  The indexes must honor any ctx deadline.
449func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
450
451	searchStart := time.Now()
452	asyncResults := make(chan *asyncSearchResult, len(indexes))
453
454	// run search on each index in separate go routine
455	var waitGroup sync.WaitGroup
456
457	var searchChildIndex = func(in Index, childReq *SearchRequest) {
458		rv := asyncSearchResult{Name: in.Name()}
459		rv.Result, rv.Err = in.SearchInContext(ctx, childReq)
460		asyncResults <- &rv
461		waitGroup.Done()
462	}
463
464	waitGroup.Add(len(indexes))
465	for _, in := range indexes {
466		go searchChildIndex(in, createChildSearchRequest(req))
467	}
468
469	// on another go routine, close after finished
470	go func() {
471		waitGroup.Wait()
472		close(asyncResults)
473	}()
474
475	var sr *SearchResult
476	indexErrors := make(map[string]error)
477
478	for asr := range asyncResults {
479		if asr.Err == nil {
480			if sr == nil {
481				// first result
482				sr = asr.Result
483			} else {
484				// merge with previous
485				sr.Merge(asr.Result)
486			}
487		} else {
488			indexErrors[asr.Name] = asr.Err
489		}
490	}
491
492	// merge just concatenated all the hits
493	// now lets clean it up
494
495	// handle case where no results were successful
496	if sr == nil {
497		sr = &SearchResult{
498			Status: &SearchStatus{
499				Errors: make(map[string]error),
500			},
501		}
502	}
503
504	// sort all hits with the requested order
505	if len(req.Sort) > 0 {
506		sorter := newMultiSearchHitSorter(req.Sort, sr.Hits)
507		sort.Sort(sorter)
508	}
509
510	// now skip over the correct From
511	if req.From > 0 && len(sr.Hits) > req.From {
512		sr.Hits = sr.Hits[req.From:]
513	} else if req.From > 0 {
514		sr.Hits = search.DocumentMatchCollection{}
515	}
516
517	// now trim to the correct size
518	if req.Size > 0 && len(sr.Hits) > req.Size {
519		sr.Hits = sr.Hits[0:req.Size]
520	}
521
522	// fix up facets
523	for name, fr := range req.Facets {
524		sr.Facets.Fixup(name, fr.Size)
525	}
526
527	// fix up original request
528	sr.Request = req
529	searchDuration := time.Since(searchStart)
530	sr.Took = searchDuration
531
532	// fix up errors
533	if len(indexErrors) > 0 {
534		if sr.Status.Errors == nil {
535			sr.Status.Errors = make(map[string]error)
536		}
537		for indexName, indexErr := range indexErrors {
538			sr.Status.Errors[indexName] = indexErr
539			sr.Status.Total++
540			sr.Status.Failed++
541		}
542	}
543
544	return sr, nil
545}
546
547func (i *indexAliasImpl) NewBatch() *Batch {
548	i.mutex.RLock()
549	defer i.mutex.RUnlock()
550
551	if !i.open {
552		return nil
553	}
554
555	err := i.isAliasToSingleIndex()
556	if err != nil {
557		return nil
558	}
559
560	return i.indexes[0].NewBatch()
561}
562
563func (i *indexAliasImpl) Name() string {
564	return i.name
565}
566
567func (i *indexAliasImpl) SetName(name string) {
568	i.name = name
569}
570
571type indexAliasImplFieldDict struct {
572	index     *indexAliasImpl
573	fieldDict index.FieldDict
574}
575
576func (f *indexAliasImplFieldDict) Next() (*index.DictEntry, error) {
577	return f.fieldDict.Next()
578}
579
580func (f *indexAliasImplFieldDict) Close() error {
581	defer f.index.mutex.RUnlock()
582	return f.fieldDict.Close()
583}
584
585type multiSearchHitSorter struct {
586	hits          search.DocumentMatchCollection
587	sort          search.SortOrder
588	cachedScoring []bool
589	cachedDesc    []bool
590}
591
592func newMultiSearchHitSorter(sort search.SortOrder, hits search.DocumentMatchCollection) *multiSearchHitSorter {
593	return &multiSearchHitSorter{
594		sort:          sort,
595		hits:          hits,
596		cachedScoring: sort.CacheIsScore(),
597		cachedDesc:    sort.CacheDescending(),
598	}
599}
600
601func (m *multiSearchHitSorter) Len() int      { return len(m.hits) }
602func (m *multiSearchHitSorter) Swap(i, j int) { m.hits[i], m.hits[j] = m.hits[j], m.hits[i] }
603func (m *multiSearchHitSorter) Less(i, j int) bool {
604	c := m.sort.Compare(m.cachedScoring, m.cachedDesc, m.hits[i], m.hits[j])
605	return c < 0
606}
607