1/* 2 * Copyright 2017 Dgraph Labs, Inc. and Contributors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package y 18 19import ( 20 "bytes" 21 "container/heap" 22 "encoding/binary" 23 24 "github.com/ooni/psiphon/oopsi/github.com/pkg/errors" 25) 26 27// ValueStruct represents the value info that can be associated with a key, but also the internal 28// Meta field. 29type ValueStruct struct { 30 Meta byte 31 UserMeta byte 32 ExpiresAt uint64 33 Value []byte 34 35 Version uint64 // This field is not serialized. Only for internal usage. 36} 37 38func sizeVarint(x uint64) (n int) { 39 for { 40 n++ 41 x >>= 7 42 if x == 0 { 43 break 44 } 45 } 46 return n 47} 48 49// EncodedSize is the size of the ValueStruct when encoded 50func (v *ValueStruct) EncodedSize() uint16 { 51 sz := len(v.Value) + 2 // meta, usermeta. 52 if v.ExpiresAt == 0 { 53 return uint16(sz + 1) 54 } 55 56 enc := sizeVarint(v.ExpiresAt) 57 return uint16(sz + enc) 58} 59 60// Decode uses the length of the slice to infer the length of the Value field. 61func (v *ValueStruct) Decode(b []byte) { 62 v.Meta = b[0] 63 v.UserMeta = b[1] 64 var sz int 65 v.ExpiresAt, sz = binary.Uvarint(b[2:]) 66 v.Value = b[2+sz:] 67} 68 69// Encode expects a slice of length at least v.EncodedSize(). 70func (v *ValueStruct) Encode(b []byte) { 71 b[0] = v.Meta 72 b[1] = v.UserMeta 73 sz := binary.PutUvarint(b[2:], v.ExpiresAt) 74 copy(b[2+sz:], v.Value) 75} 76 77// EncodeTo should be kept in sync with the Encode function above. The reason 78// this function exists is to avoid creating byte arrays per key-value pair in 79// table/builder.go. 80func (v *ValueStruct) EncodeTo(buf *bytes.Buffer) { 81 buf.WriteByte(v.Meta) 82 buf.WriteByte(v.UserMeta) 83 var enc [binary.MaxVarintLen64]byte 84 sz := binary.PutUvarint(enc[:], v.ExpiresAt) 85 buf.Write(enc[:sz]) 86 buf.Write(v.Value) 87} 88 89// Iterator is an interface for a basic iterator. 90type Iterator interface { 91 Next() 92 Rewind() 93 Seek(key []byte) 94 Key() []byte 95 Value() ValueStruct 96 Valid() bool 97 98 // All iterators should be closed so that file garbage collection works. 99 Close() error 100} 101 102type elem struct { 103 itr Iterator 104 nice int 105 reversed bool 106} 107 108type elemHeap []*elem 109 110func (eh elemHeap) Len() int { return len(eh) } 111func (eh elemHeap) Swap(i, j int) { eh[i], eh[j] = eh[j], eh[i] } 112func (eh *elemHeap) Push(x interface{}) { *eh = append(*eh, x.(*elem)) } 113func (eh *elemHeap) Pop() interface{} { 114 // Remove the last element, because Go has already swapped 0th elem <-> last. 115 old := *eh 116 n := len(old) 117 x := old[n-1] 118 *eh = old[0 : n-1] 119 return x 120} 121func (eh elemHeap) Less(i, j int) bool { 122 cmp := CompareKeys(eh[i].itr.Key(), eh[j].itr.Key()) 123 if cmp < 0 { 124 return !eh[i].reversed 125 } 126 if cmp > 0 { 127 return eh[i].reversed 128 } 129 // The keys are equal. In this case, lower nice take precedence. This is important. 130 return eh[i].nice < eh[j].nice 131} 132 133// MergeIterator merges multiple iterators. 134// NOTE: MergeIterator owns the array of iterators and is responsible for closing them. 135type MergeIterator struct { 136 h elemHeap 137 curKey []byte 138 reversed bool 139 140 all []Iterator 141} 142 143// NewMergeIterator returns a new MergeIterator from a list of Iterators. 144func NewMergeIterator(iters []Iterator, reversed bool) *MergeIterator { 145 m := &MergeIterator{all: iters, reversed: reversed} 146 m.h = make(elemHeap, 0, len(iters)) 147 m.initHeap() 148 return m 149} 150 151func (s *MergeIterator) storeKey(smallest Iterator) { 152 if cap(s.curKey) < len(smallest.Key()) { 153 s.curKey = make([]byte, 2*len(smallest.Key())) 154 } 155 s.curKey = s.curKey[:len(smallest.Key())] 156 copy(s.curKey, smallest.Key()) 157} 158 159// initHeap checks all iterators and initializes our heap and array of keys. 160// Whenever we reverse direction, we need to run this. 161func (s *MergeIterator) initHeap() { 162 s.h = s.h[:0] 163 for idx, itr := range s.all { 164 if !itr.Valid() { 165 continue 166 } 167 e := &elem{itr: itr, nice: idx, reversed: s.reversed} 168 s.h = append(s.h, e) 169 } 170 heap.Init(&s.h) 171 for len(s.h) > 0 { 172 it := s.h[0].itr 173 if it == nil || !it.Valid() { 174 heap.Pop(&s.h) 175 continue 176 } 177 s.storeKey(s.h[0].itr) 178 break 179 } 180} 181 182// Valid returns whether the MergeIterator is at a valid element. 183func (s *MergeIterator) Valid() bool { 184 if s == nil { 185 return false 186 } 187 if len(s.h) == 0 { 188 return false 189 } 190 return s.h[0].itr.Valid() 191} 192 193// Key returns the key associated with the current iterator 194func (s *MergeIterator) Key() []byte { 195 if len(s.h) == 0 { 196 return nil 197 } 198 return s.h[0].itr.Key() 199} 200 201// Value returns the value associated with the iterator. 202func (s *MergeIterator) Value() ValueStruct { 203 if len(s.h) == 0 { 204 return ValueStruct{} 205 } 206 return s.h[0].itr.Value() 207} 208 209// Next returns the next element. If it is the same as the current key, ignore it. 210func (s *MergeIterator) Next() { 211 if len(s.h) == 0 { 212 return 213 } 214 215 smallest := s.h[0].itr 216 smallest.Next() 217 218 for len(s.h) > 0 { 219 smallest = s.h[0].itr 220 if !smallest.Valid() { 221 heap.Pop(&s.h) 222 continue 223 } 224 225 heap.Fix(&s.h, 0) 226 smallest = s.h[0].itr 227 if smallest.Valid() { 228 if !bytes.Equal(smallest.Key(), s.curKey) { 229 break 230 } 231 smallest.Next() 232 } 233 } 234 if !smallest.Valid() { 235 return 236 } 237 s.storeKey(smallest) 238} 239 240// Rewind seeks to first element (or last element for reverse iterator). 241func (s *MergeIterator) Rewind() { 242 for _, itr := range s.all { 243 itr.Rewind() 244 } 245 s.initHeap() 246} 247 248// Seek brings us to element with key >= given key. 249func (s *MergeIterator) Seek(key []byte) { 250 for _, itr := range s.all { 251 itr.Seek(key) 252 } 253 s.initHeap() 254} 255 256// Close implements y.Iterator 257func (s *MergeIterator) Close() error { 258 for _, itr := range s.all { 259 if err := itr.Close(); err != nil { 260 return errors.Wrap(err, "MergeIterator") 261 } 262 } 263 return nil 264} 265