//  Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 		http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zap

import (
	"bytes"
	"encoding/binary"
	"io"
	"reflect"

	"github.com/golang/snappy"
)

// reflectStaticSizeMetaData is the in-memory size of a MetaData struct,
// computed once at package init time
var reflectStaticSizeMetaData int

func init() {
	var md MetaData
	reflectStaticSizeMetaData = int(reflect.TypeOf(md).Size())
}

// termSeparator delimits adjacent terms within serialized content
var termSeparator byte = 0xff
var termSeparatorSplitSlice = []byte{termSeparator}

type chunkedContentCoder struct {
	final     []byte   // accumulated encoded chunks (when not writing progressively)
	chunkSize uint64   // number of docNums mapped to each chunk
	currChunk uint64   // index of the chunk currently being built
	chunkLens []uint64 // encoded byte length of each chunk

	w                io.Writer
	progressiveWrite bool // flush each chunk to w as soon as it completes

	chunkMetaBuf bytes.Buffer // serialized metadata for the current chunk
	chunkBuf     bytes.Buffer // uncompressed content for the current chunk

	chunkMeta []MetaData // per-doc metadata for the current chunk

	compressed []byte // temp buf for snappy compression
}

// MetaData represents the metadata for one document's data
// inside a chunk.
type MetaData struct {
	DocNum      uint64 // docNum of the data inside the chunk
	DocDvOffset uint64 // end offset of the data inside the chunk for the given docNum
}

// newChunkedContentCoder returns a new chunked content coder which
// packs data into chunks based on the provided chunkSize
func newChunkedContentCoder(chunkSize uint64, maxDocNum uint64,
	w io.Writer, progressiveWrite bool) *chunkedContentCoder {
	// worst case is one chunk per chunkSize docNums, plus one partial chunk
	total := maxDocNum/chunkSize + 1
	rv := &chunkedContentCoder{
		chunkSize:        chunkSize,
		chunkLens:        make([]uint64, total),
		chunkMeta:        make([]MetaData, 0, total),
		w:                w,
		progressiveWrite: progressiveWrite,
	}

	return rv
}
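
// A minimal usage sketch (editorial, not from the original source; the
// chunk size, doc numbers, and values below are hypothetical):
//
//	var buf bytes.Buffer
//	cc := newChunkedContentCoder(1024, 5000, &buf, false)
//	_ = cc.Add(0, []byte("doc 0 values")) // docNums must be increasing
//	_ = cc.Add(1, []byte("doc 1 values"))
//	_ = cc.Close()    // encodes the final chunk
//	_, _ = cc.Write() // commits chunks, offsets, and footer to buf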

// Reset lets you reuse this chunked content coder. Buffers are reset
// and reused. Reset does not change the chunk size; use SetChunkSize
// for that.
func (c *chunkedContentCoder) Reset() {
	c.currChunk = 0
	c.final = c.final[:0]
	c.chunkBuf.Reset()
	c.chunkMetaBuf.Reset()
	for i := range c.chunkLens {
		c.chunkLens[i] = 0
	}
	c.chunkMeta = c.chunkMeta[:0]
}

// SetChunkSize reconfigures the coder for a new chunkSize and maxDocNum,
// growing the bookkeeping slices only when needed
func (c *chunkedContentCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
	total := int(maxDocNum/chunkSize + 1)
	c.chunkSize = chunkSize
	if cap(c.chunkLens) < total {
		c.chunkLens = make([]uint64, total)
	} else {
		c.chunkLens = c.chunkLens[:total]
	}
	if cap(c.chunkMeta) < total {
		c.chunkMeta = make([]MetaData, 0, total)
	}
}

// Close indicates you are done calling Add(); this allows
// the final chunk to be encoded.
func (c *chunkedContentCoder) Close() error {
	return c.flushContents()
}

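// flushContents emits one chunk with the following layout (summarized
// from the code below; this comment is editorial):
//
//	| count of MetaData entries (uvarint) |
//	| for each entry: docNum (uvarint), end offset (uvarint) |
//	| snappy-compressed chunk data |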
func (c *chunkedContentCoder) flushContents() error {
	// flush the contents, writing the meta information first
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, uint64(len(c.chunkMeta)))
	_, err := c.chunkMetaBuf.Write(buf[:n])
	if err != nil {
		return err
	}

	// write out the metaData slice
	for _, meta := range c.chunkMeta {
		_, err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvOffset)
		if err != nil {
			return err
		}
	}

	// append the metadata to the final data
	metaData := c.chunkMetaBuf.Bytes()
	c.final = append(c.final, metaData...)
	// append the compressed data to the final data
	c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
	c.final = append(c.final, c.compressed...)

	c.chunkLens[c.currChunk] = uint64(len(c.compressed) + len(metaData))

	if c.progressiveWrite {
		_, err := c.w.Write(c.final)
		if err != nil {
			return err
		}
		c.final = c.final[:0]
	}

	return nil
}

// Add encodes the provided byte slice into the correct chunk for the provided
// doc num.  You MUST call Add() with increasing docNums.
func (c *chunkedContentCoder) Add(docNum uint64, vals []byte) error {
	chunk := docNum / c.chunkSize
	if chunk != c.currChunk {
		// flush out the previous chunk details
		err := c.flushContents()
		if err != nil {
			return err
		}
		// clear the chunk-specific meta for the next chunk
		c.chunkBuf.Reset()
		c.chunkMetaBuf.Reset()
		c.chunkMeta = c.chunkMeta[:0]
		c.currChunk = chunk
	}

	// get the starting offset for this doc
	dvOffset := c.chunkBuf.Len()
	dvSize, err := c.chunkBuf.Write(vals)
	if err != nil {
		return err
	}

	// record the end offset, so consecutive entries bound each doc's data
	c.chunkMeta = append(c.chunkMeta, MetaData{
		DocNum:      docNum,
		DocDvOffset: uint64(dvOffset + dvSize),
	})
	return nil
}
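
// For example (an editorial illustration): after Add(0, vals) with 5
// bytes and Add(1, vals) with 3 bytes in the same chunk, chunkMeta holds
// {DocNum: 0, DocDvOffset: 5} and {DocNum: 1, DocDvOffset: 8}; doc 0
// occupies bytes [0, 5) and doc 1 bytes [5, 8) of the uncompressed chunk.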

// Write commits all the encoded chunked contents to the provided writer.
//
// | ..... data ..... | chunk offsets (varints) |
// | length of chunk offsets (uint64) | number of chunks (uint64) |
//
func (c *chunkedContentCoder) Write() (int, error) {
	var tw int

	if c.final != nil {
		// write out the data section first
		nw, err := c.w.Write(c.final)
		tw += nw
		if err != nil {
			return tw, err
		}
	}

	chunkOffsetsStart := uint64(tw)

	if cap(c.final) < binary.MaxVarintLen64 {
		c.final = make([]byte, binary.MaxVarintLen64)
	} else {
		c.final = c.final[0:binary.MaxVarintLen64]
	}
	chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens)
	// write out the chunk offsets
	for _, chunkOffset := range chunkOffsets {
		n := binary.PutUvarint(c.final, chunkOffset)
		nw, err := c.w.Write(c.final[:n])
		tw += nw
		if err != nil {
			return tw, err
		}
	}

	chunkOffsetsLen := uint64(tw) - chunkOffsetsStart

	c.final = c.final[0:8]
	// write out the length of chunk offsets
	binary.BigEndian.PutUint64(c.final, chunkOffsetsLen)
	nw, err := c.w.Write(c.final)
	tw += nw
	if err != nil {
		return tw, err
	}

	// write out the number of chunks
	binary.BigEndian.PutUint64(c.final, uint64(len(c.chunkLens)))
	nw, err = c.w.Write(c.final)
	tw += nw
	if err != nil {
		return tw, err
	}

	c.final = c.final[:0]

	return tw, nil
}
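
// A reader can recover the sections by working backwards from the end of
// the output (an editorial sketch; data is a hypothetical []byte holding
// everything written above):
//
//	numChunks := binary.BigEndian.Uint64(data[len(data)-8:])
//	offsetsLen := binary.BigEndian.Uint64(data[len(data)-16 : len(data)-8])
//	offsetsStart := uint64(len(data)) - 16 - offsetsLen
//	// data[offsetsStart : len(data)-16] holds numChunks uvarint end offsets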

// ReadDocValueBoundary returns the start and end offsets of the data for
// the given entry in a metaData header slice
func ReadDocValueBoundary(chunk int, metaHeaders []MetaData) (uint64, uint64) {
	var start uint64
	if chunk > 0 {
		start = metaHeaders[chunk-1].DocDvOffset
	}
	return start, metaHeaders[chunk].DocDvOffset
}
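
// For example (editorial): if metaHeaders holds entries with DocDvOffsets
// [5, 8], ReadDocValueBoundary(0, metaHeaders) returns (0, 5) and
// ReadDocValueBoundary(1, metaHeaders) returns (5, 8)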