1//  Copyright (c) 2018 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package zap
16
17import (
18	"bytes"
19
20	"github.com/blevesearch/vellum"
21)
22
23// enumerator provides an ordered traversal of multiple vellum
24// iterators.  Like JOIN of iterators, the enumerator produces a
25// sequence of (key, iteratorIndex, value) tuples, sorted by key ASC,
26// then iteratorIndex ASC, where the same key might be seen or
27// repeated across multiple child iterators.
28type enumerator struct {
29	itrs   []vellum.Iterator
30	currKs [][]byte
31	currVs []uint64
32
33	lowK    []byte
34	lowIdxs []int
35	lowCurr int
36}
37
38// newEnumerator returns a new enumerator over the vellum Iterators
39func newEnumerator(itrs []vellum.Iterator) (*enumerator, error) {
40	rv := &enumerator{
41		itrs:    itrs,
42		currKs:  make([][]byte, len(itrs)),
43		currVs:  make([]uint64, len(itrs)),
44		lowIdxs: make([]int, 0, len(itrs)),
45	}
46	for i, itr := range rv.itrs {
47		rv.currKs[i], rv.currVs[i] = itr.Current()
48	}
49	rv.updateMatches(false)
50	if rv.lowK == nil && len(rv.lowIdxs) == 0 {
51		return rv, vellum.ErrIteratorDone
52	}
53	return rv, nil
54}
55
56// updateMatches maintains the low key matches based on the currKs
57func (m *enumerator) updateMatches(skipEmptyKey bool) {
58	m.lowK = nil
59	m.lowIdxs = m.lowIdxs[:0]
60	m.lowCurr = 0
61
62	for i, key := range m.currKs {
63		if (key == nil && m.currVs[i] == 0) || // in case of empty iterator
64			(len(key) == 0 && skipEmptyKey) { // skip empty keys
65			continue
66		}
67
68		cmp := bytes.Compare(key, m.lowK)
69		if cmp < 0 || len(m.lowIdxs) == 0 {
70			// reached a new low
71			m.lowK = key
72			m.lowIdxs = m.lowIdxs[:0]
73			m.lowIdxs = append(m.lowIdxs, i)
74		} else if cmp == 0 {
75			m.lowIdxs = append(m.lowIdxs, i)
76		}
77	}
78}
79
80// Current returns the enumerator's current key, iterator-index, and
81// value.  If the enumerator is not pointing at a valid value (because
82// Next returned an error previously), Current will return nil,0,0.
83func (m *enumerator) Current() ([]byte, int, uint64) {
84	var i int
85	var v uint64
86	if m.lowCurr < len(m.lowIdxs) {
87		i = m.lowIdxs[m.lowCurr]
88		v = m.currVs[i]
89	}
90	return m.lowK, i, v
91}
92
93// GetLowIdxsAndValues will return all of the iterator indices
94// which point to the current key, and their corresponding
95// values.  This can be used by advanced caller which may need
96// to peek into these other sets of data before processing.
97func (m *enumerator) GetLowIdxsAndValues() ([]int, []uint64) {
98	values := make([]uint64, 0, len(m.lowIdxs))
99	for _, idx := range m.lowIdxs {
100		values = append(values, m.currVs[idx])
101	}
102	return m.lowIdxs, values
103}
104
105// Next advances the enumerator to the next key/iterator/value result,
106// else vellum.ErrIteratorDone is returned.
107func (m *enumerator) Next() error {
108	m.lowCurr += 1
109	if m.lowCurr >= len(m.lowIdxs) {
110		// move all the current low iterators forwards
111		for _, vi := range m.lowIdxs {
112			err := m.itrs[vi].Next()
113			if err != nil && err != vellum.ErrIteratorDone {
114				return err
115			}
116			m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current()
117		}
118		// can skip any empty keys encountered at this point
119		m.updateMatches(true)
120	}
121	if m.lowK == nil && len(m.lowIdxs) == 0 {
122		return vellum.ErrIteratorDone
123	}
124	return nil
125}
126
127// Close all the underlying Iterators.  The first error, if any, will
128// be returned.
129func (m *enumerator) Close() error {
130	var rv error
131	for _, itr := range m.itrs {
132		err := itr.Close()
133		if rv == nil {
134			rv = err
135		}
136	}
137	return rv
138}
139