1// Copyright 2016 CoreOS, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package storage
16
17import (
18	"math"
19
20	"github.com/coreos/etcd/pkg/adt"
21	"github.com/coreos/etcd/storage/storagepb"
22)
23
// watchBatchMaxRevs caps how many distinct revisions a single batch sent to an
// unsynced watcher may span. It is a var rather than a const so that tests can
// override it.
var watchBatchMaxRevs = 1000
30
// eventBatch accumulates the events matched for a single watcher, capped at
// watchBatchMaxRevs distinct revisions (see add).
type eventBatch struct {
	// evs is a batch of revision-ordered events
	evs []storagepb.Event
	// revs counts the distinct revisions seen by add, including the first
	// revision that overflowed the batch; once it exceeds watchBatchMaxRevs
	// the batch is full and add ignores further events.
	revs int
	// moreRev is the first revision that did not fit in this batch; it stays
	// at its zero value while everything added so far has fit. Presumably the
	// revision from which a follow-up batch should resume — confirm at caller.
	moreRev int64
}
39
40func (eb *eventBatch) add(ev storagepb.Event) {
41	if eb.revs > watchBatchMaxRevs {
42		// maxed out batch size
43		return
44	}
45
46	if len(eb.evs) == 0 {
47		// base case
48		eb.revs = 1
49		eb.evs = append(eb.evs, ev)
50		return
51	}
52
53	// revision accounting
54	ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
55	evRev := ev.Kv.ModRevision
56	if evRev > ebRev {
57		eb.revs++
58		if eb.revs > watchBatchMaxRevs {
59			eb.moreRev = evRev
60			return
61		}
62	}
63
64	eb.evs = append(eb.evs, ev)
65}
66
// watcherBatch maps each watcher to the batch of events matched for it.
type watcherBatch map[*watcher]*eventBatch
68
69func (wb watcherBatch) add(w *watcher, ev storagepb.Event) {
70	eb := wb[w]
71	if eb == nil {
72		eb = &eventBatch{}
73		wb[w] = eb
74	}
75	eb.add(ev)
76}
77
78func (wb watcherBatch) contains(w *watcher) bool {
79	_, ok := wb[w]
80	return ok
81}
82
83// newWatcherBatch maps watchers to their matched events. It enables quick
84// events look up by watcher.
85func newWatcherBatch(wg *watcherGroup, evs []storagepb.Event) watcherBatch {
86	wb := make(watcherBatch)
87	for _, ev := range evs {
88		for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
89			if ev.Kv.ModRevision >= w.cur {
90				// don't double notify
91				wb.add(w, ev)
92			}
93		}
94	}
95	return wb
96}
97
// watcherSet is a set of watchers, represented as a map with zero-width
// struct{} values.
type watcherSet map[*watcher]struct{}
99
100func (w watcherSet) add(wa *watcher) {
101	if _, ok := w[wa]; ok {
102		panic("add watcher twice!")
103	}
104	w[wa] = struct{}{}
105}
106
107func (w watcherSet) union(ws watcherSet) {
108	for wa := range ws {
109		w.add(wa)
110	}
111}
112
113func (w watcherSet) delete(wa *watcher) {
114	if _, ok := w[wa]; !ok {
115		panic("removing missing watcher!")
116	}
117	delete(w, wa)
118}
119
// watcherSetByKey indexes watchers by the key they watch (string(wa.key));
// each key maps to the set of watchers registered on it.
type watcherSetByKey map[string]watcherSet
121
122func (w watcherSetByKey) add(wa *watcher) {
123	set := w[string(wa.key)]
124	if set == nil {
125		set = make(watcherSet)
126		w[string(wa.key)] = set
127	}
128	set.add(wa)
129}
130
131func (w watcherSetByKey) delete(wa *watcher) bool {
132	k := string(wa.key)
133	if v, ok := w[k]; ok {
134		if _, ok := v[wa]; ok {
135			delete(v, wa)
136			if len(v) == 0 {
137				// remove the set; nothing left
138				delete(w, k)
139			}
140			return true
141		}
142	}
143	return false
144}
145
// interval is a key range bounded by begin and end, stored as strings.
//
// NOTE(review): nothing in this part of the file references interval; range
// watchers are tracked through adt.IntervalTree instead. It may be used
// elsewhere in the package or be dead code — confirm before removing.
type interval struct {
	begin string
	end   string
}
150
// watcherSetByInterval maps a key range to the set of watchers on that range.
//
// NOTE(review): watcherGroup stores range watchers in an adt.IntervalTree, not
// in this type; it may be used elsewhere in the package or be dead code —
// confirm before removing.
type watcherSetByInterval map[interval]watcherSet
152
// watcherGroup is a collection of watchers organized by their ranges.
//
// Every watcher is recorded in watchers. In addition, a watcher with a nil end
// is indexed by its key in keyWatchers, while a watcher with an end key is
// placed in the watcherSet stored on the ranges node for its (key, end)
// interval. Watchers registered on the identical interval share one set.
type watcherGroup struct {
	// keyWatchers has the watchers that watch on a single key
	keyWatchers watcherSetByKey
	// ranges has the watchers that watch a range; it is sorted by interval,
	// and each node's Val is the watcherSet for that interval
	ranges adt.IntervalTree
	// watchers is the set of all watchers
	watchers watcherSet
}
162
163func newWatcherGroup() watcherGroup {
164	return watcherGroup{
165		keyWatchers: make(watcherSetByKey),
166		watchers:    make(watcherSet),
167	}
168}
169
170// add puts a watcher in the group.
171func (wg *watcherGroup) add(wa *watcher) {
172	wg.watchers.add(wa)
173	if wa.end == nil {
174		wg.keyWatchers.add(wa)
175		return
176	}
177
178	// interval already registered?
179	ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
180	if iv := wg.ranges.Find(ivl); iv != nil {
181		iv.Val.(watcherSet).add(wa)
182		return
183	}
184
185	// not registered, put in interval tree
186	ws := make(watcherSet)
187	ws.add(wa)
188	wg.ranges.Insert(ivl, ws)
189}
190
191// contains is whether the given key has a watcher in the group.
192func (wg *watcherGroup) contains(key string) bool {
193	_, ok := wg.keyWatchers[key]
194	return ok || wg.ranges.Contains(adt.NewStringAffinePoint(key))
195}
196
197// size gives the number of unique watchers in the group.
198func (wg *watcherGroup) size() int { return len(wg.watchers) }
199
200// delete removes a watcher from the group.
201func (wg *watcherGroup) delete(wa *watcher) bool {
202	if _, ok := wg.watchers[wa]; !ok {
203		return false
204	}
205	wg.watchers.delete(wa)
206	if wa.end == nil {
207		wg.keyWatchers.delete(wa)
208		return true
209	}
210
211	ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
212	iv := wg.ranges.Find(ivl)
213	if iv == nil {
214		return false
215	}
216
217	ws := iv.Val.(watcherSet)
218	delete(ws, wa)
219	if len(ws) == 0 {
220		// remove interval missing watchers
221		if ok := wg.ranges.Delete(ivl); !ok {
222			panic("could not remove watcher from interval tree")
223		}
224	}
225
226	return true
227}
228
229func (wg *watcherGroup) scanMinRev(curRev int64, compactRev int64) int64 {
230	minRev := int64(math.MaxInt64)
231	for w := range wg.watchers {
232		if w.cur > curRev {
233			panic("watcher current revision should not exceed current revision")
234		}
235		if w.cur < compactRev {
236			select {
237			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
238				wg.delete(w)
239			default:
240				// retry next time
241			}
242			continue
243		}
244		if minRev > w.cur {
245			minRev = w.cur
246		}
247	}
248	return minRev
249}
250
251// watcherSetByKey gets the set of watchers that recieve events on the given key.
252func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
253	wkeys := wg.keyWatchers[key]
254	wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
255
256	// zero-copy cases
257	switch {
258	case len(wranges) == 0:
259		// no need to merge ranges or copy; reuse single-key set
260		return wkeys
261	case len(wranges) == 0 && len(wkeys) == 0:
262		return nil
263	case len(wranges) == 1 && len(wkeys) == 0:
264		return wranges[0].Val.(watcherSet)
265	}
266
267	// copy case
268	ret := make(watcherSet)
269	ret.union(wg.keyWatchers[key])
270	for _, item := range wranges {
271		ret.union(item.Val.(watcherSet))
272	}
273	return ret
274}
275