1//  Copyright (c) 2017 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package zap
16
17import (
18	"bytes"
19	"encoding/binary"
20	"io"
21)
22
23type chunkedIntCoder struct {
24	final     []byte
25	chunkSize uint64
26	chunkBuf  bytes.Buffer
27	chunkLens []uint64
28	currChunk uint64
29
30	buf []byte
31}
32
33// newChunkedIntCoder returns a new chunk int coder which packs data into
34// chunks based on the provided chunkSize and supports up to the specified
35// maxDocNum
36func newChunkedIntCoder(chunkSize uint64, maxDocNum uint64) *chunkedIntCoder {
37	total := maxDocNum/chunkSize + 1
38	rv := &chunkedIntCoder{
39		chunkSize: chunkSize,
40		chunkLens: make([]uint64, total),
41		final:     make([]byte, 0, 64),
42	}
43
44	return rv
45}
46
47// Reset lets you reuse this chunked int coder.  buffers are reset and reused
48// from previous use.  you cannot change the chunk size or max doc num.
49func (c *chunkedIntCoder) Reset() {
50	c.final = c.final[:0]
51	c.chunkBuf.Reset()
52	c.currChunk = 0
53	for i := range c.chunkLens {
54		c.chunkLens[i] = 0
55	}
56}
57
58// Add encodes the provided integers into the correct chunk for the provided
59// doc num.  You MUST call Add() with increasing docNums.
60func (c *chunkedIntCoder) Add(docNum uint64, vals ...uint64) error {
61	chunk := docNum / c.chunkSize
62	if chunk != c.currChunk {
63		// starting a new chunk
64		c.Close()
65		c.chunkBuf.Reset()
66		c.currChunk = chunk
67	}
68
69	if len(c.buf) < binary.MaxVarintLen64 {
70		c.buf = make([]byte, binary.MaxVarintLen64)
71	}
72
73	for _, val := range vals {
74		wb := binary.PutUvarint(c.buf, val)
75		_, err := c.chunkBuf.Write(c.buf[:wb])
76		if err != nil {
77			return err
78		}
79	}
80
81	return nil
82}
83
84func (c *chunkedIntCoder) AddBytes(docNum uint64, buf []byte) error {
85	chunk := docNum / c.chunkSize
86	if chunk != c.currChunk {
87		// starting a new chunk
88		c.Close()
89		c.chunkBuf.Reset()
90		c.currChunk = chunk
91	}
92
93	_, err := c.chunkBuf.Write(buf)
94	return err
95}
96
97// Close indicates you are done calling Add() this allows the final chunk
98// to be encoded.
99func (c *chunkedIntCoder) Close() {
100	encodingBytes := c.chunkBuf.Bytes()
101	c.chunkLens[c.currChunk] = uint64(len(encodingBytes))
102	c.final = append(c.final, encodingBytes...)
103	c.currChunk = uint64(cap(c.chunkLens)) // sentinel to detect double close
104}
105
106// Write commits all the encoded chunked integers to the provided writer.
107func (c *chunkedIntCoder) Write(w io.Writer) (int, error) {
108	bufNeeded := binary.MaxVarintLen64 * (1 + len(c.chunkLens))
109	if len(c.buf) < bufNeeded {
110		c.buf = make([]byte, bufNeeded)
111	}
112	buf := c.buf
113
114	// convert the chunk lengths into chunk offsets
115	chunkOffsets := modifyLengthsToEndOffsets(c.chunkLens)
116
117	// write out the number of chunks & each chunk offsets
118	n := binary.PutUvarint(buf, uint64(len(chunkOffsets)))
119	for _, chunkOffset := range chunkOffsets {
120		n += binary.PutUvarint(buf[n:], chunkOffset)
121	}
122
123	tw, err := w.Write(buf[:n])
124	if err != nil {
125		return tw, err
126	}
127
128	// write out the data
129	nw, err := w.Write(c.final)
130	tw += nw
131	if err != nil {
132		return tw, err
133	}
134	return tw, nil
135}
136
137func (c *chunkedIntCoder) FinalSize() int {
138	return len(c.final)
139}
140
141// modifyLengthsToEndOffsets converts the chunk length array
142// to a chunk offset array. The readChunkBoundary
143// will figure out the start and end of every chunk from
144// these offsets. Starting offset of i'th index is stored
145// in i-1'th position except for 0'th index and ending offset
146// is stored at i'th index position.
147// For 0'th element, starting position is always zero.
148// eg:
149// Lens ->  5 5 5 5 => 5 10 15 20
150// Lens ->  0 5 0 5 => 0 5 5 10
151// Lens ->  0 0 0 5 => 0 0 0 5
152// Lens ->  5 0 0 0 => 5 5 5 5
153// Lens ->  0 5 0 0 => 0 5 5 5
154// Lens ->  0 0 5 0 => 0 0 5 5
155func modifyLengthsToEndOffsets(lengths []uint64) []uint64 {
156	var runningOffset uint64
157	var index, i int
158	for i = 1; i <= len(lengths); i++ {
159		runningOffset += lengths[i-1]
160		lengths[index] = runningOffset
161		index++
162	}
163	return lengths
164}
165
166func readChunkBoundary(chunk int, offsets []uint64) (uint64, uint64) {
167	var start uint64
168	if chunk > 0 {
169		start = offsets[chunk-1]
170	}
171	return start, offsets[chunk]
172}
173