1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package store
16
17import (
18	"container/list"
19	"path"
20	"strings"
21	"sync"
22	"sync/atomic"
23
24	etcdErr "github.com/coreos/etcd/error"
25)
26
27// A watcherHub contains all subscribed watchers
28// watchers is a map with watched path as key and watcher as value
29// EventHistory keeps the old events for watcherHub. It is used to help
30// watcher to get a continuous event history. Or a watcher might miss the
31// event happens between the end of the first watch command and the start
32// of the second command.
33type watcherHub struct {
34	// count must be the first element to keep 64-bit alignment for atomic
35	// access
36
37	count int64 // current number of watchers.
38
39	mutex        sync.Mutex
40	watchers     map[string]*list.List
41	EventHistory *EventHistory
42}
43
44// newWatchHub creates a watcherHub. The capacity determines how many events we will
45// keep in the eventHistory.
46// Typically, we only need to keep a small size of history[smaller than 20K].
47// Ideally, it should smaller than 20K/s[max throughput] * 2 * 50ms[RTT] = 2000
48func newWatchHub(capacity int) *watcherHub {
49	return &watcherHub{
50		watchers:     make(map[string]*list.List),
51		EventHistory: newEventHistory(capacity),
52	}
53}
54
55// Watch function returns a Watcher.
56// If recursive is true, the first change after index under key will be sent to the event channel of the watcher.
57// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
58// If index is zero, watch will start from the current index + 1.
59func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *etcdErr.Error) {
60	reportWatchRequest()
61	event, err := wh.EventHistory.scan(key, recursive, index)
62
63	if err != nil {
64		err.Index = storeIndex
65		return nil, err
66	}
67
68	w := &watcher{
69		eventChan:  make(chan *Event, 100), // use a buffered channel
70		recursive:  recursive,
71		stream:     stream,
72		sinceIndex: index,
73		startIndex: storeIndex,
74		hub:        wh,
75	}
76
77	wh.mutex.Lock()
78	defer wh.mutex.Unlock()
79	// If the event exists in the known history, append the EtcdIndex and return immediately
80	if event != nil {
81		ne := event.Clone()
82		ne.EtcdIndex = storeIndex
83		w.eventChan <- ne
84		return w, nil
85	}
86
87	l, ok := wh.watchers[key]
88
89	var elem *list.Element
90
91	if ok { // add the new watcher to the back of the list
92		elem = l.PushBack(w)
93	} else { // create a new list and add the new watcher
94		l = list.New()
95		elem = l.PushBack(w)
96		wh.watchers[key] = l
97	}
98
99	w.remove = func() {
100		if w.removed { // avoid removing it twice
101			return
102		}
103		w.removed = true
104		l.Remove(elem)
105		atomic.AddInt64(&wh.count, -1)
106		reportWatcherRemoved()
107		if l.Len() == 0 {
108			delete(wh.watchers, key)
109		}
110	}
111
112	atomic.AddInt64(&wh.count, 1)
113	reportWatcherAdded()
114
115	return w, nil
116}
117
118func (wh *watcherHub) add(e *Event) {
119	wh.EventHistory.addEvent(e)
120}
121
122// notify function accepts an event and notify to the watchers.
123func (wh *watcherHub) notify(e *Event) {
124	e = wh.EventHistory.addEvent(e) // add event into the eventHistory
125
126	segments := strings.Split(e.Node.Key, "/")
127
128	currPath := "/"
129
130	// walk through all the segments of the path and notify the watchers
131	// if the path is "/foo/bar", it will notify watchers with path "/",
132	// "/foo" and "/foo/bar"
133
134	for _, segment := range segments {
135		currPath = path.Join(currPath, segment)
136		// notify the watchers who interests in the changes of current path
137		wh.notifyWatchers(e, currPath, false)
138	}
139}
140
141func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
142	wh.mutex.Lock()
143	defer wh.mutex.Unlock()
144
145	l, ok := wh.watchers[nodePath]
146	if ok {
147		curr := l.Front()
148
149		for curr != nil {
150			next := curr.Next() // save reference to the next one in the list
151
152			w, _ := curr.Value.(*watcher)
153
154			originalPath := (e.Node.Key == nodePath)
155			if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
156				if !w.stream { // do not remove the stream watcher
157					// if we successfully notify a watcher
158					// we need to remove the watcher from the list
159					// and decrease the counter
160					w.removed = true
161					l.Remove(curr)
162					atomic.AddInt64(&wh.count, -1)
163					reportWatcherRemoved()
164				}
165			}
166
167			curr = next // update current to the next element in the list
168		}
169
170		if l.Len() == 0 {
171			// if we have notified all watcher in the list
172			// we can delete the list
173			delete(wh.watchers, nodePath)
174		}
175	}
176}
177
178// clone function clones the watcherHub and return the cloned one.
179// only clone the static content. do not clone the current watchers.
180func (wh *watcherHub) clone() *watcherHub {
181	clonedHistory := wh.EventHistory.clone()
182
183	return &watcherHub{
184		EventHistory: clonedHistory,
185	}
186}
187
188// isHidden checks to see if key path is considered hidden to watch path i.e. the
189// last element is hidden or it's within a hidden directory
190func isHidden(watchPath, keyPath string) bool {
191	// When deleting a directory, watchPath might be deeper than the actual keyPath
192	// For example, when deleting /foo we also need to notify watchers on /foo/bar.
193	if len(watchPath) > len(keyPath) {
194		return false
195	}
196	// if watch path is just a "/", after path will start without "/"
197	// add a "/" to deal with the special case when watchPath is "/"
198	afterPath := path.Clean("/" + keyPath[len(watchPath):])
199	return strings.Contains(afterPath, "/_")
200}
201