1// Copyright 2016 CoreOS, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package storage 16 17import ( 18 "math" 19 20 "github.com/coreos/etcd/pkg/adt" 21 "github.com/coreos/etcd/storage/storagepb" 22) 23 24var ( 25 // watchBatchMaxRevs is the maximum distinct revisions that 26 // may be sent to an unsynced watcher at a time. Declared as 27 // var instead of const for testing purposes. 28 watchBatchMaxRevs = 1000 29) 30 31type eventBatch struct { 32 // evs is a batch of revision-ordered events 33 evs []storagepb.Event 34 // revs is the minimum unique revisions observed for this batch 35 revs int 36 // moreRev is first revision with more events following this batch 37 moreRev int64 38} 39 40func (eb *eventBatch) add(ev storagepb.Event) { 41 if eb.revs > watchBatchMaxRevs { 42 // maxed out batch size 43 return 44 } 45 46 if len(eb.evs) == 0 { 47 // base case 48 eb.revs = 1 49 eb.evs = append(eb.evs, ev) 50 return 51 } 52 53 // revision accounting 54 ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision 55 evRev := ev.Kv.ModRevision 56 if evRev > ebRev { 57 eb.revs++ 58 if eb.revs > watchBatchMaxRevs { 59 eb.moreRev = evRev 60 return 61 } 62 } 63 64 eb.evs = append(eb.evs, ev) 65} 66 67type watcherBatch map[*watcher]*eventBatch 68 69func (wb watcherBatch) add(w *watcher, ev storagepb.Event) { 70 eb := wb[w] 71 if eb == nil { 72 eb = &eventBatch{} 73 wb[w] = eb 74 } 75 eb.add(ev) 76} 77 78func (wb watcherBatch) contains(w *watcher) bool { 79 _, ok := wb[w] 80 return ok 81} 82 83// newWatcherBatch maps watchers to their matched events. It enables quick 84// events look up by watcher. 85func newWatcherBatch(wg *watcherGroup, evs []storagepb.Event) watcherBatch { 86 wb := make(watcherBatch) 87 for _, ev := range evs { 88 for w := range wg.watcherSetByKey(string(ev.Kv.Key)) { 89 if ev.Kv.ModRevision >= w.cur { 90 // don't double notify 91 wb.add(w, ev) 92 } 93 } 94 } 95 return wb 96} 97 98type watcherSet map[*watcher]struct{} 99 100func (w watcherSet) add(wa *watcher) { 101 if _, ok := w[wa]; ok { 102 panic("add watcher twice!") 103 } 104 w[wa] = struct{}{} 105} 106 107func (w watcherSet) union(ws watcherSet) { 108 for wa := range ws { 109 w.add(wa) 110 } 111} 112 113func (w watcherSet) delete(wa *watcher) { 114 if _, ok := w[wa]; !ok { 115 panic("removing missing watcher!") 116 } 117 delete(w, wa) 118} 119 120type watcherSetByKey map[string]watcherSet 121 122func (w watcherSetByKey) add(wa *watcher) { 123 set := w[string(wa.key)] 124 if set == nil { 125 set = make(watcherSet) 126 w[string(wa.key)] = set 127 } 128 set.add(wa) 129} 130 131func (w watcherSetByKey) delete(wa *watcher) bool { 132 k := string(wa.key) 133 if v, ok := w[k]; ok { 134 if _, ok := v[wa]; ok { 135 delete(v, wa) 136 if len(v) == 0 { 137 // remove the set; nothing left 138 delete(w, k) 139 } 140 return true 141 } 142 } 143 return false 144} 145 146type interval struct { 147 begin string 148 end string 149} 150 151type watcherSetByInterval map[interval]watcherSet 152 153// watcherGroup is a collection of watchers organized by their ranges 154type watcherGroup struct { 155 // keyWatchers has the watchers that watch on a single key 156 keyWatchers watcherSetByKey 157 // ranges has the watchers that watch a range; it is sorted by interval 158 ranges adt.IntervalTree 159 // watchers is the set of all watchers 160 watchers watcherSet 161} 162 163func newWatcherGroup() watcherGroup { 164 return watcherGroup{ 165 keyWatchers: make(watcherSetByKey), 166 watchers: make(watcherSet), 167 } 168} 169 170// add puts a watcher in the group. 171func (wg *watcherGroup) add(wa *watcher) { 172 wg.watchers.add(wa) 173 if wa.end == nil { 174 wg.keyWatchers.add(wa) 175 return 176 } 177 178 // interval already registered? 179 ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end)) 180 if iv := wg.ranges.Find(ivl); iv != nil { 181 iv.Val.(watcherSet).add(wa) 182 return 183 } 184 185 // not registered, put in interval tree 186 ws := make(watcherSet) 187 ws.add(wa) 188 wg.ranges.Insert(ivl, ws) 189} 190 191// contains is whether the given key has a watcher in the group. 192func (wg *watcherGroup) contains(key string) bool { 193 _, ok := wg.keyWatchers[key] 194 return ok || wg.ranges.Contains(adt.NewStringAffinePoint(key)) 195} 196 197// size gives the number of unique watchers in the group. 198func (wg *watcherGroup) size() int { return len(wg.watchers) } 199 200// delete removes a watcher from the group. 201func (wg *watcherGroup) delete(wa *watcher) bool { 202 if _, ok := wg.watchers[wa]; !ok { 203 return false 204 } 205 wg.watchers.delete(wa) 206 if wa.end == nil { 207 wg.keyWatchers.delete(wa) 208 return true 209 } 210 211 ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end)) 212 iv := wg.ranges.Find(ivl) 213 if iv == nil { 214 return false 215 } 216 217 ws := iv.Val.(watcherSet) 218 delete(ws, wa) 219 if len(ws) == 0 { 220 // remove interval missing watchers 221 if ok := wg.ranges.Delete(ivl); !ok { 222 panic("could not remove watcher from interval tree") 223 } 224 } 225 226 return true 227} 228 229func (wg *watcherGroup) scanMinRev(curRev int64, compactRev int64) int64 { 230 minRev := int64(math.MaxInt64) 231 for w := range wg.watchers { 232 if w.cur > curRev { 233 panic("watcher current revision should not exceed current revision") 234 } 235 if w.cur < compactRev { 236 select { 237 case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}: 238 wg.delete(w) 239 default: 240 // retry next time 241 } 242 continue 243 } 244 if minRev > w.cur { 245 minRev = w.cur 246 } 247 } 248 return minRev 249} 250 251// watcherSetByKey gets the set of watchers that recieve events on the given key. 252func (wg *watcherGroup) watcherSetByKey(key string) watcherSet { 253 wkeys := wg.keyWatchers[key] 254 wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key)) 255 256 // zero-copy cases 257 switch { 258 case len(wranges) == 0: 259 // no need to merge ranges or copy; reuse single-key set 260 return wkeys 261 case len(wranges) == 0 && len(wkeys) == 0: 262 return nil 263 case len(wranges) == 1 && len(wkeys) == 0: 264 return wranges[0].Val.(watcherSet) 265 } 266 267 // copy case 268 ret := make(watcherSet) 269 ret.union(wg.keyWatchers[key]) 270 for _, item := range wranges { 271 ret.union(item.Val.(watcherSet)) 272 } 273 return ret 274} 275