1//  Copyright (c) 2017 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package zap
16
17import (
18	"bytes"
19	"encoding/binary"
20	"io"
21	"math"
22)
23
// termNotEncoded is a sentinel offset meaning "nothing was encoded for
// this term"; math.MaxUint64 can never be a real writer offset.
const termNotEncoded = math.MaxUint64
25
// chunkedIntCoder varint-encodes integers into chunks keyed by
// docNum/chunkSize, concatenating sealed chunks into final and
// recording each chunk's encoded byte length in chunkLens.
type chunkedIntCoder struct {
	final     []byte // concatenated encoded bytes of all sealed chunks
	chunkSize uint64 // number of doc nums covered by one chunk
	chunkBuf  bytes.Buffer // encoding buffer for the chunk in progress
	chunkLens []uint64 // encoded byte length per chunk, indexed by chunk number
	currChunk uint64 // chunk number currently receiving Add()s

	buf []byte // scratch space for varint encoding and the Write header
}
35
36// newChunkedIntCoder returns a new chunk int coder which packs data into
37// chunks based on the provided chunkSize and supports up to the specified
38// maxDocNum
39func newChunkedIntCoder(chunkSize uint64, maxDocNum uint64) *chunkedIntCoder {
40	total := maxDocNum/chunkSize + 1
41	rv := &chunkedIntCoder{
42		chunkSize: chunkSize,
43		chunkLens: make([]uint64, total),
44		final:     make([]byte, 0, 64),
45	}
46
47	return rv
48}
49
50// Reset lets you reuse this chunked int coder.  buffers are reset and reused
51// from previous use.  you cannot change the chunk size or max doc num.
52func (c *chunkedIntCoder) Reset() {
53	c.final = c.final[:0]
54	c.chunkBuf.Reset()
55	c.currChunk = 0
56	for i := range c.chunkLens {
57		c.chunkLens[i] = 0
58	}
59}
60
61// SetChunkSize changes the chunk size.  It is only valid to do so
62// with a new chunkedIntCoder, or immediately after calling Reset()
63func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
64	total := int(maxDocNum/chunkSize + 1)
65	c.chunkSize = chunkSize
66	if cap(c.chunkLens) < total {
67		c.chunkLens = make([]uint64, total)
68	} else {
69		c.chunkLens = c.chunkLens[:total]
70	}
71}
72
73// Add encodes the provided integers into the correct chunk for the provided
74// doc num.  You MUST call Add() with increasing docNums.
75func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
76	chunk := docNum / c.chunkSize
77	if chunk != c.currChunk {
78		// starting a new chunk
79		c.Close()
80		c.chunkBuf.Reset()
81		c.currChunk = chunk
82	}
83
84	if len(c.buf) < binary.MaxVarintLen64 {
85		c.buf = make([]byte, binary.MaxVarintLen64)
86	}
87
88	for _, val := range vals {
89		wb := binary.PutUvarint(c.buf, val)
90		_, err := c.chunkBuf.Write(c.buf[:wb])
91		if err != nil {
92			return err
93		}
94	}
95
96	return nil
97}
98
99func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error {
100	chunk := docNum / c.chunkSize
101	if chunk != c.currChunk {
102		// starting a new chunk
103		c.Close()
104		c.chunkBuf.Reset()
105		c.currChunk = chunk
106	}
107
108	_, err := c.chunkBuf.Write(buf)
109	return err
110}
111
112// Close indicates you are done calling Add() this allows the final chunk
113// to be encoded.
114func (c *chunkedIntCoder) Close() {
115	encodingBytes := c.chunkBuf.Bytes()
116	c.chunkLens[c.currChunk] = uint64(len(encodingBytes))
117	c.final = append(c.final, encodingBytes...)
118	c.currChunk = uint64(cap(c.chunkLens)) // sentinel to detect double close
119}
120
121// Write commits all the encoded chunked integers to the provided writer.
122func (c *chunkedIntCoder) Write(w io.Writer) (int, error) {
123	bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens))
124	if len(c.buf) < bufNeeded {
125		c.buf = make([]byte, bufNeeded)
126	}
127	buf := c.buf
128
129	// convert the chunk lengths into chunk offsets
130	chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens)
131
132	// write out the number of chunks & each chunk offsets
133	n := binary.PutUvarint(buf, uint64(len(chunkOffsets)))
134	for _, chunkOffset := range chunkOffsets {
135		n += binary.PutUvarint(buf[n:], chunkOffset)
136	}
137
138	tw, err := w.Write(buf[:n])
139	if err != nil {
140		return tw, err
141	}
142
143	// write out the data
144	nw, err := w.Write(c.final)
145	tw += nw
146	if err != nil {
147		return tw, err
148	}
149	return tw, nil
150}
151
152// writeAt commits all the encoded chunked integers to the provided writer
153// and returns the starting offset, total bytes written and an error
154func (c *chunkedIntCoder) writeAt(w io.Writer) (uint64, int, error) {
155	startOffset := uint64(termNotEncoded)
156	if len(c.final) <= 0 {
157		return startOffset, 0, nil
158	}
159
160	if chw := w.(*CountHashWriter); chw != nil {
161		startOffset = uint64(chw.Count())
162	}
163
164	tw, err := c.Write(w)
165	return startOffset, tw, err
166}
167
168func (c *chunkedIntCoder) FinalSize() int {
169	return len(c.final)
170}
171
// modifyLengthsToEndOffsets converts, in place, an array of chunk
// lengths into an array of cumulative end offsets.  readChunkBoundary
// derives chunk i's span from these: it ends at offsets[i] and starts
// at offsets[i-1] (zero for the first chunk).
// eg:
// Lens ->  5 5 5 5 => 5 10 15 20
// Lens ->  0 5 0 5 => 0 5 5 10
// Lens ->  0 0 0 5 => 0 0 0 5
// Lens ->  5 0 0 0 => 5 5 5 5
// Lens ->  0 5 0 0 => 0 5 5 5
// Lens ->  0 0 5 0 => 0 0 5 5
func modifyLengthsToEndOffsets(lengths []uint64) []uint64 {
	var end uint64
	for i, l := range lengths {
		end += l
		lengths[i] = end
	}
	return lengths
}
196
// readChunkBoundary returns the [start, end) byte positions of the
// given chunk within the encoded data, using the cumulative end-offset
// array produced by modifyLengthsToEndOffsets.
func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) {
	if chunk == 0 {
		return 0, offsets[0]
	}
	return offsets[chunk-1], offsets[chunk]
}
204