1//  Copyright (c) 2017 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package zap
16
17import (
18	"bytes"
19	"encoding/binary"
20	"io"
21)
22
// termNotEncoded is the sentinel offset meaning "no term location
// information was encoded" for a term.
// We can safely use 0 to represent termNotEncoded since 0
// could never be a valid address for term location information.
// (stored field index is always non-empty and earlier in the
// file)
const termNotEncoded = 0
28
// chunkedIntCoder encodes a stream of unsigned integers, grouped into
// chunks of chunkSize doc numbers, as uvarint-encoded bytes.
type chunkedIntCoder struct {
	final     []byte       // accumulated encoded bytes of all closed chunks
	chunkSize uint64       // number of doc numbers per chunk
	chunkBuf  bytes.Buffer // encoding buffer for the chunk in progress
	chunkLens []uint64     // byte length of each chunk, indexed by chunk number
	currChunk uint64       // chunk number currently being filled

	buf []byte // reusable scratch space for uvarint encoding and Write headers
}
38
39// newChunkedIntCoder returns a new chunk int coder which packs data into
40// chunks based on the provided chunkSize and supports up to the specified
41// maxDocNum
42func newChunkedIntCoder(chunkSize uint64, maxDocNum uint64) *chunkedIntCoder {
43	total := maxDocNum/chunkSize + 1
44	rv := &chunkedIntCoder{
45		chunkSize: chunkSize,
46		chunkLens: make([]uint64, total),
47		final:     make([]byte, 0, 64),
48	}
49
50	return rv
51}
52
53// Reset lets you reuse this chunked int coder.  buffers are reset and reused
54// from previous use.  you cannot change the chunk size or max doc num.
55func (c *chunkedIntCoder) Reset() {
56	c.final = c.final[:0]
57	c.chunkBuf.Reset()
58	c.currChunk = 0
59	for i := range c.chunkLens {
60		c.chunkLens[i] = 0
61	}
62}
63
64// SetChunkSize changes the chunk size.  It is only valid to do so
65// with a new chunkedIntCoder, or immediately after calling Reset()
66func (c *chunkedIntCoder) SetChunkSize(chunkSize uint64, maxDocNum uint64) {
67	total := int(maxDocNum/chunkSize + 1)
68	c.chunkSize = chunkSize
69	if cap(c.chunkLens) < total {
70		c.chunkLens = make([]uint64, total)
71	} else {
72		c.chunkLens = c.chunkLens[:total]
73	}
74}
75
76// Add encodes the provided integers into the correct chunk for the provided
77// doc num.  You MUST call Add() with increasing docNums.
78func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
79	chunk := docNum / c.chunkSize
80	if chunk != c.currChunk {
81		// starting a new chunk
82		c.Close()
83		c.chunkBuf.Reset()
84		c.currChunk = chunk
85	}
86
87	if len(c.buf) < binary.MaxVarintLen64 {
88		c.buf = make([]byte, binary.MaxVarintLen64)
89	}
90
91	for _, val := range vals {
92		wb := binary.PutUvarint(c.buf, val)
93		_, err := c.chunkBuf.Write(c.buf[:wb])
94		if err != nil {
95			return err
96		}
97	}
98
99	return nil
100}
101
102func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error {
103	chunk := docNum / c.chunkSize
104	if chunk != c.currChunk {
105		// starting a new chunk
106		c.Close()
107		c.chunkBuf.Reset()
108		c.currChunk = chunk
109	}
110
111	_, err := c.chunkBuf.Write(buf)
112	return err
113}
114
115// Close indicates you are done calling Add() this allows the final chunk
116// to be encoded.
117func (c *chunkedIntCoder) Close() {
118	encodingBytes := c.chunkBuf.Bytes()
119	c.chunkLens[c.currChunk] = uint64(len(encodingBytes))
120	c.final = append(c.final, encodingBytes...)
121	c.currChunk = uint64(cap(c.chunkLens)) // sentinel to detect double close
122}
123
124// Write commits all the encoded chunked integers to the provided writer.
125func (c *chunkedIntCoder) Write(w io.Writer) (int, error) {
126	bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens))
127	if len(c.buf) < bufNeeded {
128		c.buf = make([]byte, bufNeeded)
129	}
130	buf := c.buf
131
132	// convert the chunk lengths into chunk offsets
133	chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens)
134
135	// write out the number of chunks & each chunk offsets
136	n := binary.PutUvarint(buf, uint64(len(chunkOffsets)))
137	for _, chunkOffset := range chunkOffsets {
138		n += binary.PutUvarint(buf[n:], chunkOffset)
139	}
140
141	tw, err := w.Write(buf[:n])
142	if err != nil {
143		return tw, err
144	}
145
146	// write out the data
147	nw, err := w.Write(c.final)
148	tw += nw
149	if err != nil {
150		return tw, err
151	}
152	return tw, nil
153}
154
155// writeAt commits all the encoded chunked integers to the provided writer
156// and returns the starting offset, total bytes written and an error
157func (c *chunkedIntCoder) writeAt(w io.Writer) (uint64, int, error) {
158	startOffset := uint64(termNotEncoded)
159	if len(c.final) <= 0 {
160		return startOffset, 0, nil
161	}
162
163	if chw := w.(*CountHashWriter); chw != nil {
164		startOffset = uint64(chw.Count())
165	}
166
167	tw, err := c.Write(w)
168	return startOffset, tw, err
169}
170
171func (c *chunkedIntCoder) FinalSize() int {
172	return len(c.final)
173}
174
175// modifyLengthsToEndOffsets converts the chunk length array
176// to a chunk offset array. The readChunkBoundary
177// will figure out the start and end of every chunk from
178// these offsets. Starting offset of i'th index is stored
179// in i-1'th position except for 0'th index and ending offset
180// is stored at i'th index position.
181// For 0'th element, starting position is always zero.
182// eg:
183// Lens ->  5 5 5 5 => 5 10 15 20
184// Lens ->  0 5 0 5 => 0 5 5 10
185// Lens ->  0 0 0 5 => 0 0 0 5
186// Lens ->  5 0 0 0 => 5 5 5 5
187// Lens ->  0 5 0 0 => 0 5 5 5
188// Lens ->  0 0 5 0 => 0 0 5 5
// modifyLengthsToEndOffsets converts the chunk length array
// to a chunk offset array. The readChunkBoundary
// will figure out the start and end of every chunk from
// these offsets. Starting offset of i'th index is stored
// in i-1'th position except for 0'th index and ending offset
// is stored at i'th index position.
// For 0'th element, starting position is always zero.
// eg:
// Lens ->  5 5 5 5 => 5 10 15 20
// Lens ->  0 5 0 5 => 0 5 5 10
// Lens ->  0 0 0 5 => 0 0 0 5
// Lens ->  5 0 0 0 => 5 5 5 5
// Lens ->  0 5 0 0 => 0 5 5 5
// Lens ->  0 0 5 0 => 0 0 5 5
func modifyLengthsToEndOffsets(lengths []uint64) []uint64 {
	// running prefix sum, written back in place over the lengths
	var total uint64
	for i, length := range lengths {
		total += length
		lengths[i] = total
	}
	return lengths
}
199
// readChunkBoundary returns the [start, end) byte positions of the given
// chunk number, derived from the cumulative end-offsets array produced by
// modifyLengthsToEndOffsets.
func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) {
	if chunk <= 0 {
		// first chunk always starts at byte zero
		return 0, offsets[chunk]
	}
	return offsets[chunk-1], offsets[chunk]
}
207