//  Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 		http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zap

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"os"
	"sync"

	"github.com/RoaringBitmap/roaring"
	"github.com/Smerity/govarint"
	"github.com/blevesearch/bleve/index/scorch/segment"
	"github.com/couchbase/vellum"
	mmap "github.com/edsrzf/mmap-go"
	"github.com/golang/snappy"
)

// Open returns a zap implementation of a segment, backed by the
// mmap()'ed file at the given path.
func Open(path string) (segment.Segment, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	mm, err := mmap.Map(f, mmap.RDONLY, 0)
	if err != nil {
		// mmap failed, try to close the file
		_ = f.Close()
		return nil, err
	}

	rv := &Segment{
		SegmentBase: SegmentBase{
			mem:            mm[0 : len(mm)-FooterSize],
			fieldsMap:      make(map[string]uint16),
			fieldDvIterMap: make(map[uint16]*docValueIterator),
		},
		f:    f,
		mm:   mm,
		path: path,
		refs: 1,
	}

	err = rv.loadConfig()
	if err != nil {
		_ = rv.Close()
		return nil, err
	}

	err = rv.loadFields()
	if err != nil {
		_ = rv.Close()
		return nil, err
	}

	err = rv.loadDvIterators()
	if err != nil {
		_ = rv.Close()
		return nil, err
	}

	return rv, nil
}
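
// The sketch below is illustrative only (not part of the original zap
// API): it shows how a caller might open a segment file, inspect it,
// and release it. Error handling is kept minimal for brevity.
func exampleOpenAndInspect(path string) error {
	seg, err := Open(path)
	if err != nil {
		return err
	}
	// Close decrements the segment's reference count; the mmap is
	// unmapped and the file closed once the count reaches zero.
	defer func() { _ = seg.Close() }()

	fmt.Printf("%d documents, fields: %v\n", seg.Count(), seg.Fields())
	return nil
}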

// SegmentBase is a memory only, read-only implementation of the
// segment.Segment interface, using zap's data representation.
type SegmentBase struct {
	mem               []byte
	memCRC            uint32
	chunkFactor       uint32
	fieldsMap         map[string]uint16 // fieldName -> fieldID+1
	fieldsInv         []string          // fieldID -> fieldName
	numDocs           uint64
	storedIndexOffset uint64
	fieldsIndexOffset uint64
	docValueOffset    uint64
	dictLocs          []uint64
	fieldDvIterMap    map[uint16]*docValueIterator // naive chunk cache per field
}

// A SegmentBase is not reference counted and owns no file or mmap
// resources, so these are no-ops that satisfy the segment.Segment
// interface.
func (sb *SegmentBase) AddRef()             {}
func (sb *SegmentBase) DecRef() (err error) { return nil }
func (sb *SegmentBase) Close() (err error)  { return nil }

// Segment implements a persisted segment.Segment interface, by
// embedding an mmap()'ed SegmentBase.
type Segment struct {
	SegmentBase

	f       *os.File
	mm      mmap.MMap
	path    string
	version uint32
	crc     uint32

	m    sync.Mutex // Protects the fields that follow.
	refs int64
}

func (s *Segment) SizeInBytes() uint64 {
	// 8 /* size of file pointer */
	// 4 /* size of version -> uint32 */
	// 4 /* size of crc -> uint32 */
	sizeOfUints := 16

	sizeInBytes := (len(s.path) + int(segment.SizeOfString)) + sizeOfUints

	// mutex, refs -> int64
	sizeInBytes += 16

	// do not include the mmap'ed part
	return uint64(sizeInBytes) + s.SegmentBase.SizeInBytes() - uint64(len(s.mem))
}

func (s *SegmentBase) SizeInBytes() uint64 {
	// 4 /* size of memCRC -> uint32 */
	// 4 /* size of chunkFactor -> uint32 */
	// 8 /* size of numDocs -> uint64 */
	// 8 /* size of storedIndexOffset -> uint64 */
	// 8 /* size of fieldsIndexOffset -> uint64 */
	// 8 /* size of docValueOffset -> uint64 */
	sizeInBytes := 40

	sizeInBytes += len(s.mem) + int(segment.SizeOfSlice)

	// fieldsMap
	for k := range s.fieldsMap {
		sizeInBytes += (len(k) + int(segment.SizeOfString)) + 2 /* size of uint16 */
	}
	sizeInBytes += int(segment.SizeOfMap) /* overhead from map */

	// fieldsInv, dictLocs
	for _, entry := range s.fieldsInv {
		sizeInBytes += (len(entry) + int(segment.SizeOfString))
	}
	sizeInBytes += len(s.dictLocs) * 8          /* size of uint64 */
	sizeInBytes += int(segment.SizeOfSlice) * 3 /* overhead from slices */

	// fieldDvIterMap
	sizeInBytes += len(s.fieldDvIterMap) *
		int(segment.SizeOfPointer+2 /* size of uint16 */)
	for _, entry := range s.fieldDvIterMap {
		if entry != nil {
			sizeInBytes += int(entry.sizeInBytes())
		}
	}
	sizeInBytes += int(segment.SizeOfMap)

	return uint64(sizeInBytes)
}

func (s *Segment) AddRef() {
	s.m.Lock()
	s.refs++
	s.m.Unlock()
}

func (s *Segment) DecRef() (err error) {
	s.m.Lock()
	s.refs--
	if s.refs == 0 {
		err = s.closeActual()
	}
	s.m.Unlock()
	return err
}

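// loadConfig decodes the FooterSize bytes at the end of the mmap'ed
// file. Read back to front, the footer holds: crc (4 bytes), version
// (4), chunkFactor (4), docValueOffset (8), fieldsIndexOffset (8),
// storedIndexOffset (8) and numDocs (8), all big-endian.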
func (s *Segment) loadConfig() error {
	crcOffset := len(s.mm) - 4
	s.crc = binary.BigEndian.Uint32(s.mm[crcOffset : crcOffset+4])

	verOffset := crcOffset - 4
	s.version = binary.BigEndian.Uint32(s.mm[verOffset : verOffset+4])
	if s.version != version {
		return fmt.Errorf("unsupported version %d", s.version)
	}

	chunkOffset := verOffset - 4
	s.chunkFactor = binary.BigEndian.Uint32(s.mm[chunkOffset : chunkOffset+4])

	docValueOffset := chunkOffset - 8
	s.docValueOffset = binary.BigEndian.Uint64(s.mm[docValueOffset : docValueOffset+8])

	fieldsIndexOffset := docValueOffset - 8
	s.fieldsIndexOffset = binary.BigEndian.Uint64(s.mm[fieldsIndexOffset : fieldsIndexOffset+8])

	storedIndexOffset := fieldsIndexOffset - 8
	s.storedIndexOffset = binary.BigEndian.Uint64(s.mm[storedIndexOffset : storedIndexOffset+8])

	numDocsOffset := storedIndexOffset - 8
	s.numDocs = binary.BigEndian.Uint64(s.mm[numDocsOffset : numDocsOffset+8])
	return nil
}

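// loadFields decodes the fields index: an array of big-endian uint64
// addresses, one per field, each pointing at a uvarint dictionary
// location, a uvarint name length, and the field name bytes.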
func (s *SegmentBase) loadFields() error {
	// NOTE for now we assume the fields index immediately precedes
	// the footer, and if this changes, we need to adjust accordingly (or
	// store an explicit length), where s.mem was sliced from s.mm in Open().
	fieldsIndexEnd := uint64(len(s.mem))

	// iterate through fields index
	var fieldID uint64
	for s.fieldsIndexOffset+(8*fieldID) < fieldsIndexEnd {
		addr := binary.BigEndian.Uint64(s.mem[s.fieldsIndexOffset+(8*fieldID) : s.fieldsIndexOffset+(8*fieldID)+8])

		dictLoc, read := binary.Uvarint(s.mem[addr:fieldsIndexEnd])
		n := uint64(read)
		s.dictLocs = append(s.dictLocs, dictLoc)

		var nameLen uint64
		nameLen, read = binary.Uvarint(s.mem[addr+n : fieldsIndexEnd])
		n += uint64(read)

		name := string(s.mem[addr+n : addr+n+nameLen])
		s.fieldsInv = append(s.fieldsInv, name)
		s.fieldsMap[name] = uint16(fieldID + 1)

		fieldID++
	}
	return nil
}

236
237// Dictionary returns the term dictionary for the specified field
238func (s *SegmentBase) Dictionary(field string) (segment.TermDictionary, error) {
239	dict, err := s.dictionary(field)
240	if err == nil && dict == nil {
241		return &segment.EmptyDictionary{}, nil
242	}
243	return dict, err
244}
245
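// A non-zero entry in dictLocs points at a uvarint length followed by
// that many bytes of vellum FST data for the field's term dictionary;
// a zero entry means the field has no dictionary.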
func (sb *SegmentBase) dictionary(field string) (rv *Dictionary, err error) {
	fieldIDPlus1 := sb.fieldsMap[field]
	if fieldIDPlus1 > 0 {
		rv = &Dictionary{
			sb:      sb,
			field:   field,
			fieldID: fieldIDPlus1 - 1,
		}

		dictStart := sb.dictLocs[rv.fieldID]
		if dictStart > 0 {
			// read the length of the vellum data
			vellumLen, read := binary.Uvarint(sb.mem[dictStart : dictStart+binary.MaxVarintLen64])
			fstBytes := sb.mem[dictStart+uint64(read) : dictStart+uint64(read)+vellumLen]
			if fstBytes != nil {
				rv.fst, err = vellum.Load(fstBytes)
				if err != nil {
					return nil, fmt.Errorf("dictionary field %s vellum err: %v", field, err)
				}
			}
		}
	}

	return rv, nil
}

// VisitDocument invokes the DocumentFieldValueVisitor for each stored
// field of the specified doc number
func (s *SegmentBase) VisitDocument(num uint64, visitor segment.DocumentFieldValueVisitor) error {
	// first make sure this is a valid number in this segment
	if num < s.numDocs {
		meta, compressed := s.getDocStoredMetaAndCompressed(num)
		uncompressed, err := snappy.Decode(nil, compressed)
		if err != nil {
			return err
		}
		// now decode meta and process
		reader := bytes.NewReader(meta)
		decoder := govarint.NewU64Base128Decoder(reader)

		keepGoing := true
		for keepGoing {
			field, err := decoder.GetU64()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}
			typ, err := decoder.GetU64()
			if err != nil {
				return err
			}
			offset, err := decoder.GetU64()
			if err != nil {
				return err
			}
			l, err := decoder.GetU64()
			if err != nil {
				return err
			}
			numap, err := decoder.GetU64()
			if err != nil {
				return err
			}
			var arrayPos []uint64
			if numap > 0 {
				arrayPos = make([]uint64, numap)
				for i := 0; i < int(numap); i++ {
					ap, err := decoder.GetU64()
					if err != nil {
						return err
					}
					arrayPos[i] = ap
				}
			}

			value := uncompressed[offset : offset+l]
			keepGoing = visitor(s.fieldsInv[field], byte(typ), value, arrayPos)
		}
	}
	return nil
}
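
// The sketch below is illustrative only (not part of the original zap
// API): it prints every stored field of one document by passing a
// visitor callback to VisitDocument. Returning true from the visitor
// asks for the remaining stored fields as well.
func examplePrintStoredFields(s *SegmentBase, docNum uint64) error {
	return s.VisitDocument(docNum, func(field string, typ byte,
		value []byte, pos []uint64) bool {
		fmt.Printf("field=%s type=%c value=%s arrayPositions=%v\n",
			field, typ, value, pos)
		return true // keep visiting this document's stored fields
	})
}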

// Count returns the number of documents in this segment.
func (s *SegmentBase) Count() uint64 {
	return s.numDocs
}

// DocNumbers returns a bitmap of the doc numbers corresponding to the
// provided _id strings
func (s *SegmentBase) DocNumbers(ids []string) (*roaring.Bitmap, error) {
	rv := roaring.New()

	if len(s.fieldsMap) > 0 {
		idDict, err := s.dictionary("_id")
		if err != nil {
			return nil, err
		}

		var postings *PostingsList
		for _, id := range ids {
			postings, err = idDict.postingsList([]byte(id), nil, postings)
			if err != nil {
				return nil, err
			}
			if postings.postings != nil {
				rv.Or(postings.postings)
			}
		}
	}

	return rv, nil
}
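
// The sketch below is illustrative only (not part of the original zap
// API): it resolves external _id values to this segment's internal doc
// numbers via DocNumbers, then visits each matching document using the
// examplePrintStoredFields helper sketched above.
func exampleVisitByIDs(s *SegmentBase, ids []string) error {
	docNums, err := s.DocNumbers(ids)
	if err != nil {
		return err
	}
	itr := docNums.Iterator()
	for itr.HasNext() {
		if err := examplePrintStoredFields(s, uint64(itr.Next())); err != nil {
			return err
		}
	}
	return nil
}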

// Fields returns the field names used in this segment
func (s *SegmentBase) Fields() []string {
	return s.fieldsInv
}

// Path returns the path of this segment on disk
func (s *Segment) Path() string {
	return s.path
}

// Close releases all resources associated with this segment
func (s *Segment) Close() (err error) {
	return s.DecRef()
}

func (s *Segment) closeActual() (err error) {
	if s.mm != nil {
		err = s.mm.Unmap()
	}
	// try to close file even if unmap failed
	if s.f != nil {
		err2 := s.f.Close()
		if err == nil {
			// try to return first error
			err = err2
		}
	}
	return
}

// Helpers originally added for the zap command-line utility.

// Data returns the underlying mmap'ed data slice
func (s *Segment) Data() []byte {
	return s.mm
}

// CRC returns the CRC value stored in the file footer
func (s *Segment) CRC() uint32 {
	return s.crc
}

// Version returns the file version in the file footer
func (s *Segment) Version() uint32 {
	return s.version
}

// ChunkFactor returns the chunk factor in the file footer
func (s *Segment) ChunkFactor() uint32 {
	return s.chunkFactor
}

// FieldsIndexOffset returns the fields index offset in the file footer
func (s *Segment) FieldsIndexOffset() uint64 {
	return s.fieldsIndexOffset
}

// StoredIndexOffset returns the stored value index offset in the file footer
func (s *Segment) StoredIndexOffset() uint64 {
	return s.storedIndexOffset
}

// DocValueOffset returns the docValue offset in the file footer
func (s *Segment) DocValueOffset() uint64 {
	return s.docValueOffset
}

// NumDocs returns the number of documents in the file footer
func (s *Segment) NumDocs() uint64 {
	return s.numDocs
}

// DictAddr is a helper function to compute the file offset where the
// dictionary is stored for the specified field.
func (s *Segment) DictAddr(field string) (uint64, error) {
	fieldIDPlus1, ok := s.fieldsMap[field]
	if !ok {
		return 0, fmt.Errorf("no such field '%s'", field)
	}

	return s.dictLocs[fieldIDPlus1-1], nil
}

func (s *SegmentBase) loadDvIterators() error {
	if s.docValueOffset == fieldNotUninverted {
		return nil
	}

	var read uint64
	for fieldID, field := range s.fieldsInv {
		fieldLoc, n := binary.Uvarint(s.mem[s.docValueOffset+read : s.docValueOffset+read+binary.MaxVarintLen64])
		if n <= 0 {
			return fmt.Errorf("loadDvIterators: failed to read the docvalue offsets for field %d", fieldID)
		}
		s.fieldDvIterMap[uint16(fieldID)], _ = s.loadFieldDocValueIterator(field, fieldLoc)
		read += uint64(n)
	}
	return nil
}