// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"sync"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

const (
	// chanBufLen is the length of the buffered chan
	// for sending out watched events.
	// TODO: find a good buf value. 1024 is just a random one that
	// seems to be reasonable.
	chanBufLen = 1024

	// maxWatchersPerSync is the number of watchers to sync in a single batch
	maxWatchersPerSync = 512
)

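// watchable is the interface watch streams use to interact with the
// watchable store: watch registers a new watcher, progress requests a
// progress notification for a synced watcher, and rev reports the current
// store revision. watchableStore is the implementation in this package.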
type watchable interface {
	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
	progress(w *watcher)
	rev() int64
}

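// watchableStore wraps a store and fans its changes out to registered
// watchers. A watcher is tracked in the synced group (caught up with the
// store), the unsynced group (still catching up), or the victims list
// (pending event batches whose watcher channel was blocked).
// Lock ordering: mu is acquired before store.mu.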
type watchableStore struct {
	mu sync.Mutex

	*store

	// victims are watcher batches that were blocked on the watch channel
	victims []watcherBatch
	victimc chan struct{}

	// contains all unsynced watchers that need to sync with events that have happened
	unsynced watcherGroup

	// contains all synced watchers that are in sync with the progress of the store.
	// The key of the map is the key that the watcher watches on.
	synced watcherGroup

	stopc chan struct{}
	wg    sync.WaitGroup
}

// cancelFunc updates unsynced and synced maps when running
// cancel operations.
type cancelFunc func()

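// New returns a ConsistentWatchableKV that layers watch support on top of
// the plain MVCC store. Rough usage sketch from a caller's point of view;
// the WatchStream methods used below (Watch, Chan, Close) are defined in
// watcher.go, not in this file, and b, lessor, and ig are assumed to be
// set up elsewhere:
//
//	kv := mvcc.New(b, lessor, ig)
//	defer kv.Close()
//
//	w := kv.NewWatchStream()
//	defer w.Close()
//
//	// watch key "foo" starting from the current revision (startRev == 0)
//	id := w.Watch([]byte("foo"), nil, 0)
//	for resp := range w.Chan() {
//		// resp.WatchID matches id; resp.Events carries the PUT/DELETE events
//	}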
func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV {
	return newWatchableStore(b, le, ig)
}

func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore {
	s := &watchableStore{
		store:    NewStore(b, le, ig),
		victimc:  make(chan struct{}, 1),
		unsynced: newWatcherGroup(),
		synced:   newWatcherGroup(),
		stopc:    make(chan struct{}),
	}
	if s.le != nil {
		// use this store as the deleter so revokes trigger watch events
		s.le.SetRangeDeleter(s)
	}
	s.wg.Add(2)
	go s.syncWatchersLoop()
	go s.syncVictimsLoop()
	return s
}

func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	rev = s.store.Put(key, value, lease)
	changes := s.store.getChanges()
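	// a single put must produce exactly one change entry; anything else is a bug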
	if len(changes) != 1 {
		plog.Panicf("unexpected len(changes) != 1 after put")
	}

	ev := mvccpb.Event{
		Type: mvccpb.PUT,
		Kv:   &changes[0],
	}
	s.notify(rev, []mvccpb.Event{ev})
	return rev
}

func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	n, rev = s.store.DeleteRange(key, end)
	changes := s.store.getChanges()

	if len(changes) != int(n) {
		plog.Panicf("unexpected len(changes) != n after deleteRange")
	}

	if n == 0 {
		return n, rev
	}

	evs := make([]mvccpb.Event, n)
	for i := range changes {
		evs[i] = mvccpb.Event{
			Type: mvccpb.DELETE,
			Kv:   &changes[i]}
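		// stamp the revision of this delete on the event so watchers
		// observe the correct ModRevision for the removed key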
		evs[i].Kv.ModRevision = rev
	}
	s.notify(rev, evs)
	return n, rev
}

func (s *watchableStore) TxnBegin() int64 {
	s.mu.Lock()
	return s.store.TxnBegin()
}

func (s *watchableStore) TxnEnd(txnID int64) error {
	err := s.store.TxnEnd(txnID)
	if err != nil {
		return err
	}

	changes := s.getChanges()
	if len(changes) == 0 {
		s.mu.Unlock()
		return nil
	}

	rev := s.store.Rev()
	evs := make([]mvccpb.Event, len(changes))
	for i, change := range changes {
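		// a change with a zero CreateRevision is a key deleted by this txn;
		// everything else is a put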
		switch change.CreateRevision {
		case 0:
			evs[i] = mvccpb.Event{
				Type: mvccpb.DELETE,
				Kv:   &changes[i]}
			evs[i].Kv.ModRevision = rev
		default:
			evs[i] = mvccpb.Event{
				Type: mvccpb.PUT,
				Kv:   &changes[i]}
		}
	}

	s.notify(rev, evs)
	s.mu.Unlock()

	return nil
}

func (s *watchableStore) Close() error {
	close(s.stopc)
	s.wg.Wait()
	return s.store.Close()
}

func (s *watchableStore) NewWatchStream() WatchStream {
	watchStreamGauge.Inc()
	return &watchStream{
		watchable: s,
		ch:        make(chan WatchResponse, chanBufLen),
		cancels:   make(map[WatchID]cancelFunc),
		watchers:  make(map[WatchID]*watcher),
	}
}

func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
	s.mu.Lock()
	defer s.mu.Unlock()

	wa := &watcher{
		key:    key,
		end:    end,
		minRev: startRev,
		id:     id,
		ch:     ch,
		fcs:    fcs,
	}

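	// a watcher requesting a future revision, or revision 0 ("from now on"),
	// starts out synced; otherwise it must first catch up from startRev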
	s.store.mu.Lock()
	synced := startRev > s.store.currentRev.main || startRev == 0
	if synced {
		wa.minRev = s.store.currentRev.main + 1
		if startRev > wa.minRev {
			wa.minRev = startRev
		}
	}
	s.store.mu.Unlock()
	if synced {
		s.synced.add(wa)
	} else {
		slowWatcherGauge.Inc()
		s.unsynced.add(wa)
	}
	watcherGauge.Inc()

	return wa, func() { s.cancelWatcher(wa) }
}

// cancelWatcher removes references to the watcher from the watchableStore
func (s *watchableStore) cancelWatcher(wa *watcher) {
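	// the watcher may be in the unsynced group, the synced group, a victim
	// batch, or temporarily held by moveVictims (and then reachable from
	// none of them); retry until it can be found and removed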
	for {
		s.mu.Lock()

		if s.unsynced.delete(wa) {
			slowWatcherGauge.Dec()
			break
		} else if s.synced.delete(wa) {
			break
		} else if wa.compacted {
			break
		}

		if !wa.victim {
			panic("watcher not victim but not in watch groups")
		}

		var victimBatch watcherBatch
		for _, wb := range s.victims {
			if wb[wa] != nil {
				victimBatch = wb
				break
			}
		}
		if victimBatch != nil {
			slowWatcherGauge.Dec()
			delete(victimBatch, wa)
			break
		}

		// victim being processed so not accessible; retry
		s.mu.Unlock()
		time.Sleep(time.Millisecond)
	}

	watcherGauge.Dec()
	s.mu.Unlock()
}

func (s *watchableStore) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	err := s.store.Restore(b)
	if err != nil {
		return err
	}

	for wa := range s.synced.watchers {
		wa.restore = true
		s.unsynced.add(wa)
	}
	s.synced = newWatcherGroup()
	return nil
}

// syncWatchersLoop syncs the watchers in the unsynced map every 100ms.
func (s *watchableStore) syncWatchersLoop() {
	defer s.wg.Done()

	for {
		s.mu.Lock()
		st := time.Now()
		lastUnsyncedWatchers := s.unsynced.size()
		s.syncWatchers()
		unsyncedWatchers := s.unsynced.size()
		s.mu.Unlock()
		syncDuration := time.Since(st)

		waitDuration := 100 * time.Millisecond
		// more work pending?
		if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
			// be fair to other store operations by yielding time taken
			waitDuration = syncDuration
		}

		select {
		case <-time.After(waitDuration):
		case <-s.stopc:
			return
		}
	}
}

// syncVictimsLoop tries to write precomputed watcher responses to
// watchers whose channel was previously blocked.
func (s *watchableStore) syncVictimsLoop() {
	defer s.wg.Done()

	for {
		for s.moveVictims() != 0 {
			// try to update all victim watchers
		}
		s.mu.Lock()
		isEmpty := len(s.victims) == 0
		s.mu.Unlock()

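		// a nil channel blocks forever in select, so when there are no
		// victims the loop waits only on victimc or stopc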
		var tickc <-chan time.Time
		if !isEmpty {
			tickc = time.After(10 * time.Millisecond)
		}

		select {
		case <-tickc:
		case <-s.victimc:
		case <-s.stopc:
			return
		}
	}
}

// moveVictims tries to update watches with already pending event data
func (s *watchableStore) moveVictims() (moved int) {
	s.mu.Lock()
	victims := s.victims
	s.victims = nil
	s.mu.Unlock()

	var newVictim watcherBatch
	for _, wb := range victims {
		// try to send responses again
		for w, eb := range wb {
			// watcher has observed the store up to, but not including, w.minRev
			rev := w.minRev - 1
			if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
				pendingEventsGauge.Add(float64(len(eb.evs)))
			} else {
				if newVictim == nil {
					newVictim = make(watcherBatch)
				}
				newVictim[w] = eb
				continue
			}
			moved++
		}

		// assign completed victim watchers to the unsynced/synced groups
		s.mu.Lock()
		s.store.mu.Lock()
		curRev := s.store.currentRev.main
		for w, eb := range wb {
			if newVictim != nil && newVictim[w] != nil {
				// couldn't send watch response; stays victim
				continue
			}
			w.victim = false
			if eb.moreRev != 0 {
				w.minRev = eb.moreRev
			}
			if w.minRev <= curRev {
				s.unsynced.add(w)
			} else {
				slowWatcherGauge.Dec()
				s.synced.add(w)
			}
		}
		s.store.mu.Unlock()
		s.mu.Unlock()
	}

	if len(newVictim) > 0 {
		s.mu.Lock()
		s.victims = append(s.victims, newVictim)
		s.mu.Unlock()
	}

	return moved
}

// syncWatchers syncs unsynced watchers by:
//	1. choose a set of watchers from the unsynced watcher group
//	2. iterate over the set to get the minimum revision and remove compacted watchers
//	3. use minimum revision to get all key-value pairs and send those events to watchers
//	4. remove synced watchers in set from unsynced group and move to synced group
func (s *watchableStore) syncWatchers() {
	if s.unsynced.size() == 0 {
		return
	}

	s.store.mu.Lock()
	defer s.store.mu.Unlock()

	// to find the key-value pairs the unsynced watchers still need, compute
	// the minimum revision among them; that revision bounds the backend query
	curRev := s.store.currentRev.main
	compactionRev := s.store.compactMainRev
	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
	minBytes, maxBytes := newRevBytes(), newRevBytes()
	revToBytes(revision{main: minRev}, minBytes)
	revToBytes(revision{main: curRev + 1}, maxBytes)
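	// scan the backend over the half-open revision interval [minRev, curRev+1)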

	// UnsafeRange returns keys and values. In boltdb, keys are revisions
	// and values are the actual key-value pairs stored in the backend.
	tx := s.store.b.BatchTx()
	tx.Lock()
	revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
	evs := kvsToEvents(wg, revs, vs)
	tx.Unlock()

	var victims watcherBatch
	wb := newWatcherBatch(wg, evs)
	for w := range wg.watchers {
		w.minRev = curRev + 1

		eb, ok := wb[w]
		if !ok {
			// bring un-notified watcher to synced
			s.synced.add(w)
			s.unsynced.delete(w)
			continue
		}

		if eb.moreRev != 0 {
			w.minRev = eb.moreRev
		}

		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
			pendingEventsGauge.Add(float64(len(eb.evs)))
		} else {
			if victims == nil {
				victims = make(watcherBatch)
			}
			w.victim = true
		}

		if w.victim {
			victims[w] = eb
		} else {
			if eb.moreRev != 0 {
				// stay unsynced; more to read
				continue
			}
			s.synced.add(w)
		}
		s.unsynced.delete(w)
	}
	s.addVictim(victims)

	vsz := 0
	for _, v := range s.victims {
		vsz += len(v)
	}
	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
}

// kvsToEvents gets all events for the watchers from all key-value pairs
func kvsToEvents(wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
	for i, v := range vals {
		var kv mvccpb.KeyValue
		if err := kv.Unmarshal(v); err != nil {
			plog.Panicf("cannot unmarshal event: %v", err)
		}

		if !wg.contains(string(kv.Key)) {
			continue
		}

		ty := mvccpb.PUT
		if isTombstone(revs[i]) {
			ty = mvccpb.DELETE
			// patch in mod revision so watchers won't skip
			kv.ModRevision = bytesToRev(revs[i]).main
		}
		evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
	}
	return evs
}

// notify sends out the events that happened at the given rev to the
// watchers watching on the keys of those events.
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
	var victim watcherBatch
	for w, eb := range newWatcherBatch(&s.synced, evs) {
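		// notify is called with the events of exactly one revision; a batch
		// spanning multiple revisions indicates a bug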
		if eb.revs != 1 {
			plog.Panicf("unexpected multiple revisions in notification")
		}

		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
			pendingEventsGauge.Add(float64(len(eb.evs)))
		} else {
			// move slow watcher to victims
			w.minRev = rev + 1
			if victim == nil {
				victim = make(watcherBatch)
			}
			w.victim = true
			victim[w] = eb
			s.synced.delete(w)
			slowWatcherGauge.Inc()
		}
	}
	s.addVictim(victim)
}

func (s *watchableStore) addVictim(victim watcherBatch) {
	if victim == nil {
		return
	}
	s.victims = append(s.victims, victim)
	select {
	case s.victimc <- struct{}{}:
	default:
	}
}

func (s *watchableStore) rev() int64 { return s.store.Rev() }

func (s *watchableStore) progress(w *watcher) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, ok := s.synced.watchers[w]; ok {
		w.send(WatchResponse{WatchID: w.id, Revision: s.rev()})
		// If the ch is full, this watcher is receiving events.
		// We do not need to send progress at all.
	}
}

type watcher struct {
	// the watcher key
	key []byte
	// end indicates the end of the range to watch.
	// If end is set, the watcher is on a range.
	end []byte

	// victim is set when ch is blocked and undergoing victim processing
	victim bool

	// compacted is set when the watcher is removed because of compaction
	compacted bool

	// restore is true when the watcher is being restored from a leader snapshot,
	// which means that this watcher has just been moved from the "synced" to the
	// "unsynced" watcher group, possibly with a future revision from when it was
	// first added to the synced group.
	// An "unsynced" watcher's revision must always be <= the current revision,
	// except when the watcher was moved out of the "synced" watcher group.
	restore bool

	// minRev is the minimum revision update the watcher will accept
	minRev int64
	id     WatchID

	fcs []FilterFunc
	// a chan to send out the watch response.
	// The chan might be shared with other watchers.
	ch chan<- WatchResponse
}

func (w *watcher) send(wr WatchResponse) bool {
	progressEvent := len(wr.Events) == 0

	if len(w.fcs) != 0 {
		ne := make([]mvccpb.Event, 0, len(wr.Events))
		for i := range wr.Events {
			filtered := false
			for _, filter := range w.fcs {
				if filter(wr.Events[i]) {
					filtered = true
					break
				}
			}
			if !filtered {
				ne = append(ne, wr.Events[i])
			}
		}
		wr.Events = ne
	}

	// if all events are filtered out, we should send nothing.
	if !progressEvent && len(wr.Events) == 0 {
		return true
	}
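	// the send must never block; callers treat a false return as a blocked
	// channel and move the watcher onto the victim path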
	select {
	case w.ch <- wr:
		return true
	default:
		return false
	}
}