// Copyright (c) 2017 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vellum

import (
	"bytes"
)

// MergeFunc is used to choose the new value for a key when merging a slice
// of iterators, and the same key is observed with multiple values.
// Values presented to the MergeFunc will be in the same order as the
// original slice creating the MergeIterator. This allows some MergeFunc
// implementations to prioritize one iterator over another.
//
// The slice handed to a MergeFunc by MergeIterator is a scratch buffer that
// is reused on later calls, so implementations should not retain it.
type MergeFunc func([]uint64) uint64

// MergeIterator implements the Iterator interface by traversing a slice
// of iterators and merging the contents of them. If the same key exists
// in multiple underlying iterators, a user-provided MergeFunc will be
// invoked to choose the new value.
type MergeIterator struct {
	// itrs holds the underlying iterators being merged; the slice passed to
	// NewMergeIterator is stored as-is (not copied).
	itrs []Iterator
	// f resolves the value when more than one iterator is positioned on the
	// lowest key.
	f MergeFunc
	// currKs and currVs cache the Current() key and value of each underlying
	// iterator, indexed in parallel with itrs. A nil key is skipped when
	// looking for the lowest key.
	currKs [][]byte
	currVs []uint64

	// lowK is the smallest cached key; nil means no key is available.
	lowK []byte
	// lowV is the value reported for lowK (the MergeFunc result when several
	// iterators share the key).
	lowV uint64
	// lowIdxs lists, in ascending order, the indexes into itrs of the
	// iterators currently positioned on lowK.
	lowIdxs []int

	// mergeV is a reusable buffer used to collect the values passed to f.
	mergeV []uint64
}

// NewMergeIterator creates a new MergeIterator over the provided slice of
// Iterators and with the specified MergeFunc to resolve duplicate keys.
// If no key is available after the initial positioning, the iterator is
// still returned (non-nil) together with ErrIteratorDone.
47func NewMergeIterator(itrs []Iterator, f MergeFunc) (*MergeIterator, error) { 48 rv := &MergeIterator{ 49 itrs: itrs, 50 f: f, 51 currKs: make([][]byte, len(itrs)), 52 currVs: make([]uint64, len(itrs)), 53 lowIdxs: make([]int, 0, len(itrs)), 54 mergeV: make([]uint64, 0, len(itrs)), 55 } 56 rv.init() 57 if rv.lowK == nil { 58 return rv, ErrIteratorDone 59 } 60 return rv, nil 61} 62 63func (m *MergeIterator) init() { 64 for i, itr := range m.itrs { 65 m.currKs[i], m.currVs[i] = itr.Current() 66 } 67 m.updateMatches() 68} 69 70func (m *MergeIterator) updateMatches() { 71 if len(m.itrs) < 1 { 72 return 73 } 74 m.lowK = m.currKs[0] 75 m.lowIdxs = m.lowIdxs[:0] 76 m.lowIdxs = append(m.lowIdxs, 0) 77 for i := 1; i < len(m.itrs); i++ { 78 if m.currKs[i] == nil { 79 continue 80 } 81 cmp := bytes.Compare(m.currKs[i], m.lowK) 82 if m.lowK == nil || cmp < 0 { 83 // reached a new low 84 m.lowK = m.currKs[i] 85 m.lowIdxs = m.lowIdxs[:0] 86 m.lowIdxs = append(m.lowIdxs, i) 87 } else if cmp == 0 { 88 m.lowIdxs = append(m.lowIdxs, i) 89 } 90 } 91 if len(m.lowIdxs) > 1 { 92 // merge multiple values 93 m.mergeV = m.mergeV[:0] 94 for _, vi := range m.lowIdxs { 95 m.mergeV = append(m.mergeV, m.currVs[vi]) 96 } 97 m.lowV = m.f(m.mergeV) 98 } else if len(m.lowIdxs) == 1 { 99 m.lowV = m.currVs[m.lowIdxs[0]] 100 } 101} 102 103// Current returns the key and value currently pointed to by this iterator. 104// If the iterator is not pointing at a valid value (because Iterator/Next/Seek) 105// returned an error previously, it may return nil,0. 106func (m *MergeIterator) Current() ([]byte, uint64) { 107 return m.lowK, m.lowV 108} 109 110// Next advances this iterator to the next key/value pair. If there is none, 111// then ErrIteratorDone is returned. 
112func (m *MergeIterator) Next() error { 113 // move all the current low iterators to next 114 for _, vi := range m.lowIdxs { 115 err := m.itrs[vi].Next() 116 if err != nil && err != ErrIteratorDone { 117 return err 118 } 119 m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current() 120 } 121 m.updateMatches() 122 if m.lowK == nil { 123 return ErrIteratorDone 124 } 125 return nil 126} 127 128// Seek advances this iterator to the specified key/value pair. If this key 129// is not in the FST, Current() will return the next largest key. If this 130// seek operation would go past the last key, then ErrIteratorDone is returned. 131func (m *MergeIterator) Seek(key []byte) error { 132 for i := range m.itrs { 133 err := m.itrs[i].Seek(key) 134 if err != nil && err != ErrIteratorDone { 135 return err 136 } 137 } 138 m.updateMatches() 139 if m.lowK == nil { 140 return ErrIteratorDone 141 } 142 return nil 143} 144 145// Close will attempt to close all the underlying Iterators. If any errors 146// are encountered, the first will be returned. 147func (m *MergeIterator) Close() error { 148 var rv error 149 for i := range m.itrs { 150 // close all iterators, return first error if any 151 err := m.itrs[i].Close() 152 if rv == nil { 153 rv = err 154 } 155 } 156 return rv 157} 158 159// MergeMin chooses the minimum value 160func MergeMin(vals []uint64) uint64 { 161 rv := vals[0] 162 for _, v := range vals[1:] { 163 if v < rv { 164 rv = v 165 } 166 } 167 return rv 168} 169 170// MergeMax chooses the maximum value 171func MergeMax(vals []uint64) uint64 { 172 rv := vals[0] 173 for _, v := range vals[1:] { 174 if v > rv { 175 rv = v 176 } 177 } 178 return rv 179} 180 181// MergeSum sums the values 182func MergeSum(vals []uint64) uint64 { 183 rv := vals[0] 184 for _, v := range vals[1:] { 185 rv += v 186 } 187 return rv 188} 189