1//  Copyright (c) 2017 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package vellum
16
17import (
18	"bytes"
19)
20
// MergeFunc picks the single value to report for a key that a MergeIterator
// finds in more than one of its underlying iterators.  The slice holds one
// value per iterator positioned on that key, listed in the same order as the
// iterators were given to NewMergeIterator, so an implementation can favor
// particular iterators by position.  MergeIterator only calls it when at
// least two iterators share the key.
type MergeFunc func(vals []uint64) uint64
27
28// MergeIterator implements the Iterator interface by traversing a slice
29// of iterators and merging the contents of them.  If the same key exists
30// in mulitipe underlying iterators, a user-provided MergeFunc will be
31// invoked to choose the new value.
32type MergeIterator struct {
33	itrs   []Iterator
34	f      MergeFunc
35	currKs [][]byte
36	currVs []uint64
37
38	lowK    []byte
39	lowV    uint64
40	lowIdxs []int
41
42	mergeV []uint64
43}
44
45// NewMergeIterator creates a new MergeIterator over the provided slice of
46// Iterators and with the specified MergeFunc to resolve duplicate keys.
47func NewMergeIterator(itrs []Iterator, f MergeFunc) (*MergeIterator, error) {
48	rv := &MergeIterator{
49		itrs:    itrs,
50		f:       f,
51		currKs:  make([][]byte, len(itrs)),
52		currVs:  make([]uint64, len(itrs)),
53		lowIdxs: make([]int, 0, len(itrs)),
54		mergeV:  make([]uint64, 0, len(itrs)),
55	}
56	rv.init()
57	if rv.lowK == nil {
58		return rv, ErrIteratorDone
59	}
60	return rv, nil
61}
62
63func (m *MergeIterator) init() {
64	for i, itr := range m.itrs {
65		m.currKs[i], m.currVs[i] = itr.Current()
66	}
67	m.updateMatches()
68}
69
70func (m *MergeIterator) updateMatches() {
71	if len(m.itrs) < 1 {
72		return
73	}
74	m.lowK = m.currKs[0]
75	m.lowIdxs = m.lowIdxs[:0]
76	m.lowIdxs = append(m.lowIdxs, 0)
77	for i := 1; i < len(m.itrs); i++ {
78		if m.currKs[i] == nil {
79			continue
80		}
81		cmp := bytes.Compare(m.currKs[i], m.lowK)
82		if m.lowK == nil || cmp < 0 {
83			// reached a new low
84			m.lowK = m.currKs[i]
85			m.lowIdxs = m.lowIdxs[:0]
86			m.lowIdxs = append(m.lowIdxs, i)
87		} else if cmp == 0 {
88			m.lowIdxs = append(m.lowIdxs, i)
89		}
90	}
91	if len(m.lowIdxs) > 1 {
92		// merge multiple values
93		m.mergeV = m.mergeV[:0]
94		for _, vi := range m.lowIdxs {
95			m.mergeV = append(m.mergeV, m.currVs[vi])
96		}
97		m.lowV = m.f(m.mergeV)
98	} else if len(m.lowIdxs) == 1 {
99		m.lowV = m.currVs[m.lowIdxs[0]]
100	}
101}
102
103// Current returns the key and value currently pointed to by this iterator.
104// If the iterator is not pointing at a valid value (because Iterator/Next/Seek)
105// returned an error previously, it may return nil,0.
106func (m *MergeIterator) Current() ([]byte, uint64) {
107	return m.lowK, m.lowV
108}
109
110// Next advances this iterator to the next key/value pair.  If there is none,
111// then ErrIteratorDone is returned.
112func (m *MergeIterator) Next() error {
113	// move all the current low iterators to next
114	for _, vi := range m.lowIdxs {
115		err := m.itrs[vi].Next()
116		if err != nil && err != ErrIteratorDone {
117			return err
118		}
119		m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current()
120	}
121	m.updateMatches()
122	if m.lowK == nil {
123		return ErrIteratorDone
124	}
125	return nil
126}
127
128// Seek advances this iterator to the specified key/value pair.  If this key
129// is not in the FST, Current() will return the next largest key.  If this
130// seek operation would go past the last key, then ErrIteratorDone is returned.
131func (m *MergeIterator) Seek(key []byte) error {
132	for i := range m.itrs {
133		err := m.itrs[i].Seek(key)
134		if err != nil && err != ErrIteratorDone {
135			return err
136		}
137	}
138	m.updateMatches()
139	if m.lowK == nil {
140		return ErrIteratorDone
141	}
142	return nil
143}
144
145// Close will attempt to close all the underlying Iterators.  If any errors
146// are encountered, the first will be returned.
147func (m *MergeIterator) Close() error {
148	var rv error
149	for i := range m.itrs {
150		// close all iterators, return first error if any
151		err := m.itrs[i].Close()
152		if rv == nil {
153			rv = err
154		}
155	}
156	return rv
157}
158
// MergeMin is a MergeFunc that keeps the smallest of the values.  It panics
// if vals is empty; MergeIterator always passes at least two values.
func MergeMin(vals []uint64) uint64 {
	lowest := vals[0]
	for i := 1; i < len(vals); i++ {
		if vals[i] < lowest {
			lowest = vals[i]
		}
	}
	return lowest
}
169
// MergeMax is a MergeFunc that keeps the largest of the values.  It panics
// if vals is empty; MergeIterator always passes at least two values.
func MergeMax(vals []uint64) uint64 {
	highest := vals[0]
	for i := 1; i < len(vals); i++ {
		if vals[i] > highest {
			highest = vals[i]
		}
	}
	return highest
}
180
// MergeSum is a MergeFunc that adds the values together.  An empty slice
// sums to 0.  The addition is plain uint64 arithmetic, so it silently wraps
// around on overflow; no error is reported.
func MergeSum(vals []uint64) uint64 {
	var total uint64
	for _, v := range vals {
		total += v
	}
	return total
}
189