1/*
2 * Copyright 2017 Dgraph Labs, Inc. and Contributors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package y
18
19import (
20	"bytes"
21	"container/heap"
22	"encoding/binary"
23
24	"github.com/ooni/psiphon/oopsi/github.com/pkg/errors"
25)
26
// ValueStruct bundles a value with the metadata kept next to it: the internal
// Meta byte, the UserMeta byte and an expiry value.
type ValueStruct struct {
	Meta      byte   // Internal meta byte; first byte of the encoded form.
	UserMeta  byte   // Second byte of the encoded form.
	ExpiresAt uint64 // Written as a uvarint right after the two meta bytes.
	Value     []byte // Raw value bytes; they fill the rest of the encoded form.

	// Version is never serialized; it exists for internal usage only.
	Version uint64
}
37
// sizeVarint reports how many 7-bit groups are needed to hold x, i.e. the
// number of bytes binary.PutUvarint emits for it. Zero still takes one byte.
func sizeVarint(x uint64) (n int) {
	n = 1
	for x >= 0x80 {
		x >>= 7
		n++
	}
	return n
}
48
49// EncodedSize is the size of the ValueStruct when encoded
50func (v *ValueStruct) EncodedSize() uint16 {
51	sz := len(v.Value) + 2 // meta, usermeta.
52	if v.ExpiresAt == 0 {
53		return uint16(sz + 1)
54	}
55
56	enc := sizeVarint(v.ExpiresAt)
57	return uint16(sz + enc)
58}
59
60// Decode uses the length of the slice to infer the length of the Value field.
61func (v *ValueStruct) Decode(b []byte) {
62	v.Meta = b[0]
63	v.UserMeta = b[1]
64	var sz int
65	v.ExpiresAt, sz = binary.Uvarint(b[2:])
66	v.Value = b[2+sz:]
67}
68
69// Encode expects a slice of length at least v.EncodedSize().
70func (v *ValueStruct) Encode(b []byte) {
71	b[0] = v.Meta
72	b[1] = v.UserMeta
73	sz := binary.PutUvarint(b[2:], v.ExpiresAt)
74	copy(b[2+sz:], v.Value)
75}
76
77// EncodeTo should be kept in sync with the Encode function above. The reason
78// this function exists is to avoid creating byte arrays per key-value pair in
79// table/builder.go.
80func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) {
81	buf.WriteByte(v.Meta)
82	buf.WriteByte(v.UserMeta)
83	var enc [binary.MaxVarintLen64]byte
84	sz := binary.PutUvarint(enc[:], v.ExpiresAt)
85	buf.Write(enc[:sz])
86	buf.Write(v.Value)
87}
88
89// Iterator is an interface for a basic iterator.
90type Iterator interface {
91	Next()
92	Rewind()
93	Seek(key []byte)
94	Key() []byte
95	Value() ValueStruct
96	Valid() bool
97
98	// All iterators should be closed so that file garbage collection works.
99	Close() error
100}
101
102type elem struct {
103	itr      Iterator
104	nice     int
105	reversed bool
106}
107
108type elemHeap []*elem
109
110func (eh elemHeap) Len() int            { return len(eh) }
111func (eh elemHeap) Swap(i, j int)       { eh[i], eh[j] = eh[j], eh[i] }
112func (eh *elemHeap) Push(x interface{}) { *eh = append(*eh, x.(*elem)) }
113func (eh *elemHeap) Pop() interface{} {
114	// Remove the last element, because Go has already swapped 0th elem <-> last.
115	old := *eh
116	n := len(old)
117	x := old[n-1]
118	*eh = old[0 : n-1]
119	return x
120}
121func (eh elemHeap) Less(i, j int) bool {
122	cmp := CompareKeys(eh[i].itr.Key(), eh[j].itr.Key())
123	if cmp < 0 {
124		return !eh[i].reversed
125	}
126	if cmp > 0 {
127		return eh[i].reversed
128	}
129	// The keys are equal. In this case, lower nice take precedence. This is important.
130	return eh[i].nice < eh[j].nice
131}
132
133// MergeIterator merges multiple iterators.
134// NOTE: MergeIterator owns the array of iterators and is responsible for closing them.
135type MergeIterator struct {
136	h        elemHeap
137	curKey   []byte
138	reversed bool
139
140	all []Iterator
141}
142
143// NewMergeIterator returns a new MergeIterator from a list of Iterators.
144func NewMergeIterator(iters []Iterator, reversed bool) *MergeIterator {
145	m := &MergeIterator{all: iters, reversed: reversed}
146	m.h = make(elemHeap, 0, len(iters))
147	m.initHeap()
148	return m
149}
150
151func (s *MergeIterator) storeKey(smallest Iterator) {
152	if cap(s.curKey) < len(smallest.Key()) {
153		s.curKey = make([]byte, 2*len(smallest.Key()))
154	}
155	s.curKey = s.curKey[:len(smallest.Key())]
156	copy(s.curKey, smallest.Key())
157}
158
159// initHeap checks all iterators and initializes our heap and array of keys.
160// Whenever we reverse direction, we need to run this.
161func (s *MergeIterator) initHeap() {
162	s.h = s.h[:0]
163	for idx, itr := range s.all {
164		if !itr.Valid() {
165			continue
166		}
167		e := &elem{itr: itr, nice: idx, reversed: s.reversed}
168		s.h = append(s.h, e)
169	}
170	heap.Init(&s.h)
171	for len(s.h) > 0 {
172		it := s.h[0].itr
173		if it == nil || !it.Valid() {
174			heap.Pop(&s.h)
175			continue
176		}
177		s.storeKey(s.h[0].itr)
178		break
179	}
180}
181
182// Valid returns whether the MergeIterator is at a valid element.
183func (s *MergeIterator) Valid() bool {
184	if s == nil {
185		return false
186	}
187	if len(s.h) == 0 {
188		return false
189	}
190	return s.h[0].itr.Valid()
191}
192
193// Key returns the key associated with the current iterator
194func (s *MergeIterator) Key() []byte {
195	if len(s.h) == 0 {
196		return nil
197	}
198	return s.h[0].itr.Key()
199}
200
201// Value returns the value associated with the iterator.
202func (s *MergeIterator) Value() ValueStruct {
203	if len(s.h) == 0 {
204		return ValueStruct{}
205	}
206	return s.h[0].itr.Value()
207}
208
209// Next returns the next element. If it is the same as the current key, ignore it.
210func (s *MergeIterator) Next() {
211	if len(s.h) == 0 {
212		return
213	}
214
215	smallest := s.h[0].itr
216	smallest.Next()
217
218	for len(s.h) > 0 {
219		smallest = s.h[0].itr
220		if !smallest.Valid() {
221			heap.Pop(&s.h)
222			continue
223		}
224
225		heap.Fix(&s.h, 0)
226		smallest = s.h[0].itr
227		if smallest.Valid() {
228			if !bytes.Equal(smallest.Key(), s.curKey) {
229				break
230			}
231			smallest.Next()
232		}
233	}
234	if !smallest.Valid() {
235		return
236	}
237	s.storeKey(smallest)
238}
239
240// Rewind seeks to first element (or last element for reverse iterator).
241func (s *MergeIterator) Rewind() {
242	for _, itr := range s.all {
243		itr.Rewind()
244	}
245	s.initHeap()
246}
247
248// Seek brings us to element with key >= given key.
249func (s *MergeIterator) Seek(key []byte) {
250	for _, itr := range s.all {
251		itr.Seek(key)
252	}
253	s.initHeap()
254}
255
256// Close implements y.Iterator
257func (s *MergeIterator) Close() error {
258	for _, itr := range s.all {
259		if err := itr.Close(); err != nil {
260			return errors.Wrap(err, "MergeIterator")
261		}
262	}
263	return nil
264}
265