1//  Copyright (c) 2017 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package zap
16
17import (
18	"bytes"
19	"encoding/binary"
20	"fmt"
21	"math"
22	"reflect"
23	"sort"
24
25	"github.com/blevesearch/bleve/index"
26	"github.com/blevesearch/bleve/index/scorch/segment"
27	"github.com/blevesearch/bleve/size"
28	"github.com/golang/snappy"
29)
30
31var reflectStaticSizedocValueReader int
32
33func init() {
34	var dvi docValueReader
35	reflectStaticSizedocValueReader = int(reflect.TypeOf(dvi).Size())
36}
37
// docNumTermsVisitor is the callback invoked by iterateAllDocValues once per
// document, receiving the doc number and that doc's raw terms bytes.
type docNumTermsVisitor func(docNum uint64, terms []byte) error
39
// docVisitState carries reusable state across successive
// VisitDocumentFieldTerms calls, caching one docValueReader per field ID so
// already-loaded chunks can be reused; it is only valid for one segment.
type docVisitState struct {
	dvrs    map[uint16]*docValueReader // per-fieldID readers (fieldID = fieldsMap value - 1)
	segment *SegmentBase               // segment these readers were built for
}
44
// docValueReader reads the persisted doc values of a single field,
// one chunk at a time.
type docValueReader struct {
	field          string
	curChunkNum    uint64   // currently loaded chunk; math.MaxUint64 means none
	chunkOffsets   []uint64 // per-chunk boundary offsets, relative to dvDataLoc
	dvDataLoc      uint64   // start of this field's doc-value data in the segment
	curChunkHeader []MetaData
	curChunkData   []byte // compressed data cache
	uncompressed   []byte // temp buf for snappy decompression
}
54
55func (di *docValueReader) size() int {
56	return reflectStaticSizedocValueReader + size.SizeOfPtr +
57		len(di.field) +
58		len(di.chunkOffsets)*size.SizeOfUint64 +
59		len(di.curChunkHeader)*reflectStaticSizeMetaData +
60		len(di.curChunkData)
61}
62
63func (di *docValueReader) cloneInto(rv *docValueReader) *docValueReader {
64	if rv == nil {
65		rv = &docValueReader{}
66	}
67
68	rv.field = di.field
69	rv.curChunkNum = math.MaxUint64
70	rv.chunkOffsets = di.chunkOffsets // immutable, so it's sharable
71	rv.dvDataLoc = di.dvDataLoc
72	rv.curChunkHeader = rv.curChunkHeader[:0]
73	rv.curChunkData = nil
74	rv.uncompressed = rv.uncompressed[:0]
75
76	return rv
77}
78
// curChunkNumber returns the chunk number most recently loaded via
// loadDvChunk, or math.MaxUint64 when no chunk has been loaded yet.
func (di *docValueReader) curChunkNumber() uint64 {
	return di.curChunkNum
}
82
83func (s *SegmentBase) loadFieldDocValueReader(field string,
84	fieldDvLocStart, fieldDvLocEnd uint64) (*docValueReader, error) {
85	// get the docValue offset for the given fields
86	if fieldDvLocStart == fieldNotUninverted {
87		// no docValues found, nothing to do
88		return nil, nil
89	}
90
91	// read the number of chunks, and chunk offsets position
92	var numChunks, chunkOffsetsPosition uint64
93
94	if fieldDvLocEnd-fieldDvLocStart > 16 {
95		numChunks = binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-8 : fieldDvLocEnd])
96		// read the length of chunk offsets
97		chunkOffsetsLen := binary.BigEndian.Uint64(s.mem[fieldDvLocEnd-16 : fieldDvLocEnd-8])
98		// acquire position of chunk offsets
99		chunkOffsetsPosition = (fieldDvLocEnd - 16) - chunkOffsetsLen
100	} else {
101		return nil, fmt.Errorf("loadFieldDocValueReader: fieldDvLoc too small: %d-%d", fieldDvLocEnd, fieldDvLocStart)
102	}
103
104	fdvIter := &docValueReader{
105		curChunkNum:  math.MaxUint64,
106		field:        field,
107		chunkOffsets: make([]uint64, int(numChunks)),
108	}
109
110	// read the chunk offsets
111	var offset uint64
112	for i := 0; i < int(numChunks); i++ {
113		loc, read := binary.Uvarint(s.mem[chunkOffsetsPosition+offset : chunkOffsetsPosition+offset+binary.MaxVarintLen64])
114		if read <= 0 {
115			return nil, fmt.Errorf("corrupted chunk offset during segment load")
116		}
117		fdvIter.chunkOffsets[i] = loc
118		offset += uint64(read)
119	}
120
121	// set the data offset
122	fdvIter.dvDataLoc = fieldDvLocStart
123
124	return fdvIter, nil
125}
126
// loadDvChunk loads the chunk identified by chunkNumber from the segment's
// memory into this reader's per-chunk state: the per-doc metadata header
// (curChunkHeader) and the still-compressed doc values (curChunkData).
// Any previously cached decompression is invalidated.
//
// Chunk layout: uvarint doc count, then (docNum, docDvOffset) uvarint
// pairs per doc, then the snappy-compressed doc-value bytes.
func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error {
	// advance to the chunk where the docValues
	// reside for the given docNum
	destChunkDataLoc, curChunkEnd := di.dvDataLoc, di.dvDataLoc
	start, end := readChunkBoundary(int(chunkNumber), di.chunkOffsets)
	if start >= end {
		// empty chunk: clear state so lookups find nothing, but still
		// record chunkNumber so callers don't re-load it
		di.curChunkHeader = di.curChunkHeader[:0]
		di.curChunkData = nil
		di.curChunkNum = chunkNumber
		di.uncompressed = di.uncompressed[:0]
		return nil
	}

	destChunkDataLoc += start
	curChunkEnd += end

	// read the number of docs reside in the chunk
	numDocs, read := binary.Uvarint(s.mem[destChunkDataLoc : destChunkDataLoc+binary.MaxVarintLen64])
	if read <= 0 {
		return fmt.Errorf("failed to read the chunk")
	}
	chunkMetaLoc := destChunkDataLoc + uint64(read)

	offset := uint64(0)
	// reuse the existing header slice's backing array when it is big enough
	if cap(di.curChunkHeader) < int(numDocs) {
		di.curChunkHeader = make([]MetaData, int(numDocs))
	} else {
		di.curChunkHeader = di.curChunkHeader[:int(numDocs)]
	}
	// decode the (docNum, docDvOffset) uvarint pair for each doc
	for i := 0; i < int(numDocs); i++ {
		di.curChunkHeader[i].DocNum, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
		offset += uint64(read)
		di.curChunkHeader[i].DocDvOffset, read = binary.Uvarint(s.mem[chunkMetaLoc+offset : chunkMetaLoc+offset+binary.MaxVarintLen64])
		offset += uint64(read)
	}

	// the remainder of the chunk is the compressed doc-value data; keep it
	// as a sub-slice of the segment's memory (no copy)
	compressedDataLoc := chunkMetaLoc + offset
	dataLength := curChunkEnd - compressedDataLoc
	di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength]
	di.curChunkNum = chunkNumber
	di.uncompressed = di.uncompressed[:0] // drop stale decompressed cache
	return nil
}
170
171func (di *docValueReader) iterateAllDocValues(s *SegmentBase, visitor docNumTermsVisitor) error {
172	for i := 0; i < len(di.chunkOffsets); i++ {
173		err := di.loadDvChunk(uint64(i), s)
174		if err != nil {
175			return err
176		}
177		if di.curChunkData == nil || len(di.curChunkHeader) == 0 {
178			continue
179		}
180
181		// uncompress the already loaded data
182		uncompressed, err := snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData)
183		if err != nil {
184			return err
185		}
186		di.uncompressed = uncompressed
187
188		start := uint64(0)
189		for _, entry := range di.curChunkHeader {
190			err = visitor(entry.DocNum, uncompressed[start:entry.DocDvOffset])
191			if err != nil {
192				return err
193			}
194
195			start = entry.DocDvOffset
196		}
197	}
198
199	return nil
200}
201
202func (di *docValueReader) visitDocValues(docNum uint64,
203	visitor index.DocumentFieldTermVisitor) error {
204	// binary search the term locations for the docNum
205	start, end := di.getDocValueLocs(docNum)
206	if start == math.MaxUint64 || end == math.MaxUint64 || start == end {
207		return nil
208	}
209
210	var uncompressed []byte
211	var err error
212	// use the uncompressed copy if available
213	if len(di.uncompressed) > 0 {
214		uncompressed = di.uncompressed
215	} else {
216		// uncompress the already loaded data
217		uncompressed, err = snappy.Decode(di.uncompressed[:cap(di.uncompressed)], di.curChunkData)
218		if err != nil {
219			return err
220		}
221		di.uncompressed = uncompressed
222	}
223
224	// pick the terms for the given docNum
225	uncompressed = uncompressed[start:end]
226	for {
227		i := bytes.Index(uncompressed, termSeparatorSplitSlice)
228		if i < 0 {
229			break
230		}
231
232		visitor(di.field, uncompressed[0:i])
233		uncompressed = uncompressed[i+1:]
234	}
235
236	return nil
237}
238
239func (di *docValueReader) getDocValueLocs(docNum uint64) (uint64, uint64) {
240	i := sort.Search(len(di.curChunkHeader), func(i int) bool {
241		return di.curChunkHeader[i].DocNum >= docNum
242	})
243	if i < len(di.curChunkHeader) && di.curChunkHeader[i].DocNum == docNum {
244		return ReadDocValueBoundary(i, di.curChunkHeader)
245	}
246	return math.MaxUint64, math.MaxUint64
247}
248
// VisitDocumentFieldTerms is an implementation of the
// DocumentFieldTermVisitable interface. It invokes visitor for every
// (field, term) persisted for localDocNum across the requested fields.
// The returned DocVisitState caches per-field readers and their loaded
// chunks; callers should pass it back in on subsequent calls to amortize
// that work. A state built for a different segment is discarded.
func (s *SegmentBase) VisitDocumentFieldTerms(localDocNum uint64, fields []string,
	visitor index.DocumentFieldTermVisitor, dvsIn segment.DocVisitState) (
	segment.DocVisitState, error) {
	dvs, ok := dvsIn.(*docVisitState)
	if !ok || dvs == nil {
		dvs = &docVisitState{}
	} else {
		if dvs.segment != s {
			// state belongs to another segment; its readers are invalid here
			dvs.segment = s
			dvs.dvrs = nil
		}
	}

	var fieldIDPlus1 uint16
	if dvs.dvrs == nil {
		// (re)build one reader per requested field that has doc values
		dvs.dvrs = make(map[uint16]*docValueReader, len(fields))
		for _, field := range fields {
			// fieldsMap values are fieldID+1; missing field -> skip
			if fieldIDPlus1, ok = s.fieldsMap[field]; !ok {
				continue
			}
			fieldID := fieldIDPlus1 - 1
			if dvIter, exists := s.fieldDvReaders[fieldID]; exists &&
				dvIter != nil {
				dvs.dvrs[fieldID] = dvIter.cloneInto(dvs.dvrs[fieldID])
			}
		}
	}

	// find the chunkNumber where the docValues are stored
	docInChunk := localDocNum / uint64(s.chunkFactor)
	var dvr *docValueReader
	for _, field := range fields {
		if fieldIDPlus1, ok = s.fieldsMap[field]; !ok {
			continue
		}
		fieldID := fieldIDPlus1 - 1
		if dvr, ok = dvs.dvrs[fieldID]; ok && dvr != nil {
			// check if the chunk is already loaded
			if docInChunk != dvr.curChunkNumber() {
				err := dvr.loadDvChunk(docInChunk, s)
				if err != nil {
					return dvs, err
				}
			}

			// NOTE(review): the visit error is ignored, so a failure on one
			// field does not abort the remaining fields — confirm intentional
			_ = dvr.visitDocValues(localDocNum, visitor)
		}
	}
	return dvs, nil
}
301
// VisitableDocValueFields returns the list of fields with
// persisted doc value terms ready to be visitable using the
// VisitDocumentFieldTerms method. The returned error is always nil.
func (s *SegmentBase) VisitableDocValueFields() ([]string, error) {
	return s.fieldDvNames, nil
}
308