1//  Copyright (c) 2015 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package gtreap provides an in-memory implementation of the
16// KVStore interfaces using the gtreap balanced-binary treap,
17// copy-on-write data structure.
18package gtreap
19
20import (
21	"bytes"
22	"sync"
23
24	"github.com/steveyen/gtreap"
25)
26
27type Iterator struct {
28	t *gtreap.Treap
29
30	m        sync.Mutex
31	cancelCh chan struct{}
32	nextCh   chan *Item
33	curr     *Item
34	currOk   bool
35
36	prefix []byte
37	start  []byte
38	end    []byte
39}
40
41func (w *Iterator) Seek(k []byte) {
42	if w.start != nil && bytes.Compare(k, w.start) < 0 {
43		k = w.start
44	}
45	if w.prefix != nil && !bytes.HasPrefix(k, w.prefix) {
46		if bytes.Compare(k, w.prefix) < 0 {
47			k = w.prefix
48		} else {
49			var end []byte
50			for i := len(w.prefix) - 1; i >= 0; i-- {
51				c := w.prefix[i]
52				if c < 0xff {
53					end = make([]byte, i+1)
54					copy(end, w.prefix)
55					end[i] = c + 1
56					break
57				}
58			}
59			k = end
60		}
61	}
62	w.restart(&Item{k: k})
63}
64
65func (w *Iterator) restart(start *Item) *Iterator {
66	cancelCh := make(chan struct{})
67	nextCh := make(chan *Item, 1)
68
69	w.m.Lock()
70	if w.cancelCh != nil {
71		close(w.cancelCh)
72	}
73	w.cancelCh = cancelCh
74	w.nextCh = nextCh
75	w.curr = nil
76	w.currOk = false
77	w.m.Unlock()
78
79	go func() {
80		if start != nil {
81			w.t.VisitAscend(start, func(itm gtreap.Item) bool {
82				select {
83				case <-cancelCh:
84					return false
85				case nextCh <- itm.(*Item):
86					return true
87				}
88			})
89		}
90		close(nextCh)
91	}()
92
93	w.Next()
94
95	return w
96}
97
98func (w *Iterator) Next() {
99	w.m.Lock()
100	nextCh := w.nextCh
101	w.m.Unlock()
102	w.curr, w.currOk = <-nextCh
103}
104
105func (w *Iterator) Current() ([]byte, []byte, bool) {
106	w.m.Lock()
107	defer w.m.Unlock()
108	if !w.currOk || w.curr == nil {
109		return nil, nil, false
110	}
111	if w.prefix != nil && !bytes.HasPrefix(w.curr.k, w.prefix) {
112		return nil, nil, false
113	} else if w.end != nil && bytes.Compare(w.curr.k, w.end) >= 0 {
114		return nil, nil, false
115	}
116	return w.curr.k, w.curr.v, w.currOk
117}
118
119func (w *Iterator) Key() []byte {
120	k, _, ok := w.Current()
121	if !ok {
122		return nil
123	}
124	return k
125}
126
127func (w *Iterator) Value() []byte {
128	_, v, ok := w.Current()
129	if !ok {
130		return nil
131	}
132	return v
133}
134
135func (w *Iterator) Valid() bool {
136	_, _, ok := w.Current()
137	return ok
138}
139
140func (w *Iterator) Close() error {
141	w.m.Lock()
142	if w.cancelCh != nil {
143		close(w.cancelCh)
144	}
145	w.cancelCh = nil
146	w.nextCh = nil
147	w.curr = nil
148	w.currOk = false
149	w.m.Unlock()
150
151	return nil
152}
153