1// Copyright (c) 2015 Couchbase, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package gtreap provides an in-memory implementation of the 16// KVStore interfaces using the gtreap balanced-binary treap, 17// copy-on-write data structure. 18package gtreap 19 20import ( 21 "bytes" 22 "sync" 23 24 "github.com/steveyen/gtreap" 25) 26 27type Iterator struct { 28 t *gtreap.Treap 29 30 m sync.Mutex 31 cancelCh chan struct{} 32 nextCh chan *Item 33 curr *Item 34 currOk bool 35 36 prefix []byte 37 start []byte 38 end []byte 39} 40 41func (w *Iterator) Seek(k []byte) { 42 if w.start != nil && bytes.Compare(k, w.start) < 0 { 43 k = w.start 44 } 45 if w.prefix != nil && !bytes.HasPrefix(k, w.prefix) { 46 if bytes.Compare(k, w.prefix) < 0 { 47 k = w.prefix 48 } else { 49 var end []byte 50 for i := len(w.prefix) - 1; i >= 0; i-- { 51 c := w.prefix[i] 52 if c < 0xff { 53 end = make([]byte, i+1) 54 copy(end, w.prefix) 55 end[i] = c + 1 56 break 57 } 58 } 59 k = end 60 } 61 } 62 w.restart(&Item{k: k}) 63} 64 65func (w *Iterator) restart(start *Item) *Iterator { 66 cancelCh := make(chan struct{}) 67 nextCh := make(chan *Item, 1) 68 69 w.m.Lock() 70 if w.cancelCh != nil { 71 close(w.cancelCh) 72 } 73 w.cancelCh = cancelCh 74 w.nextCh = nextCh 75 w.curr = nil 76 w.currOk = false 77 w.m.Unlock() 78 79 go func() { 80 if start != nil { 81 w.t.VisitAscend(start, func(itm gtreap.Item) bool { 82 select { 83 case <-cancelCh: 84 return false 85 case nextCh <- itm.(*Item): 86 return true 87 } 88 }) 89 } 90 close(nextCh) 91 }() 92 93 w.Next() 94 95 return w 96} 97 98func (w *Iterator) Next() { 99 w.m.Lock() 100 nextCh := w.nextCh 101 w.m.Unlock() 102 w.curr, w.currOk = <-nextCh 103} 104 105func (w *Iterator) Current() ([]byte, []byte, bool) { 106 w.m.Lock() 107 defer w.m.Unlock() 108 if !w.currOk || w.curr == nil { 109 return nil, nil, false 110 } 111 if w.prefix != nil && !bytes.HasPrefix(w.curr.k, w.prefix) { 112 return nil, nil, false 113 } else if w.end != nil && bytes.Compare(w.curr.k, w.end) >= 0 { 114 return nil, nil, false 115 } 116 return w.curr.k, w.curr.v, w.currOk 117} 118 119func (w *Iterator) Key() []byte { 120 k, _, ok := w.Current() 121 if !ok { 122 return nil 123 } 124 return k 125} 126 127func (w *Iterator) Value() []byte { 128 _, v, ok := w.Current() 129 if !ok { 130 return nil 131 } 132 return v 133} 134 135func (w *Iterator) Valid() bool { 136 _, _, ok := w.Current() 137 return ok 138} 139 140func (w *Iterator) Close() error { 141 w.m.Lock() 142 if w.cancelCh != nil { 143 close(w.cancelCh) 144 } 145 w.cancelCh = nil 146 w.nextCh = nil 147 w.curr = nil 148 w.currOk = false 149 w.m.Unlock() 150 151 return nil 152} 153