1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18	"bytes"
19	"errors"
20	"sync"
21
22	"github.com/coreos/etcd/mvcc/mvccpb"
23)
24
var (
	// ErrWatcherNotExist is returned by watchStream.Cancel when the given
	// WatchID is not registered on the stream or the stream is already closed.
	ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
)
28
// WatchID identifies a watcher within the watch stream that created it.
// watchStream.Watch returns -1 instead of a real ID when it refuses to
// create a watcher.
type WatchID int64
30
// FilterFunc is a predicate over events: it returns true if the given event
// should be filtered out. Filters are supplied per watcher through Watch.
type FilterFunc func(e mvccpb.Event) bool
33
// WatchStream is a group of watchers whose responses are all delivered
// through one shared channel.
type WatchStream interface {
	// Watch creates a watcher. The watcher watches the events happening or
	// happened on the given key or range [key, end) from the given startRev.
	//
	// The whole event history can be watched unless compacted.
	// If `startRev` <= 0, watch observes events after currentRev.
	//
	// The returned `id` is the ID of this watcher. It appears as WatchID
	// in events that are sent to the created watcher through the stream channel.
	//
	// The watchStream implementation in this file returns -1 when `end` is
	// non-empty and key >= end, or when the stream has been closed.
	Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID

	// Chan returns a chan. All watch responses will be sent to the returned chan.
	Chan() <-chan WatchResponse

	// RequestProgress requests the progress of the watcher with the given ID.
	// The response will only be sent if the watcher is currently synced.
	// The response is sent through the WatchResponse chan attached to this
	// stream to ensure correct ordering.
	// The response contains no events. The revision in the response is the
	// progress of the watcher, since the watcher is currently synced.
	RequestProgress(id WatchID)

	// Cancel cancels a watcher by giving its ID. If the watcher does not exist,
	// an error will be returned.
	Cancel(id WatchID) error

	// Close closes Chan and releases all related resources.
	Close()

	// Rev returns the current revision of the KV the stream watches on.
	Rev() int64
}
67
// WatchResponse is the message delivered on a watch stream's channel on
// behalf of a single watcher.
type WatchResponse struct {
	// WatchID is the WatchID of the watcher this response is sent to.
	WatchID WatchID

	// Events contains all the events that need to be sent.
	Events []mvccpb.Event

	// Revision is the revision of the KV when the WatchResponse is created.
	// For a normal response, the revision should be the same as the last
	// modified revision inside Events. For a delayed response to an unsynced
	// watcher, the revision is greater than the last modified revision
	// inside Events.
	Revision int64

	// CompactRevision is set when the watcher is cancelled due to compaction.
	CompactRevision int64
}
85
// watchStream contains a collection of watchers that share
// one streaming chan to send out watched events and other control events.
type watchStream struct {
	// watchable is the backend used to create watchers, request watcher
	// progress and read the current revision.
	watchable watchable
	// ch is the channel handed to every watcher created on this stream;
	// it is returned by Chan and closed by Close.
	ch        chan WatchResponse

	mu sync.Mutex // guards fields below it
	// nextID is the ID pre-allocated for next new watcher in this stream
	nextID   WatchID
	// closed is set by Close; once set, Watch refuses to create watchers and
	// Cancel reports ErrWatcherNotExist.
	closed   bool
	// cancels maps each live watcher's ID to the function that cancels it.
	cancels  map[WatchID]cancelFunc
	// watchers maps each live watcher's ID to the watcher itself, so that
	// RequestProgress can look it up.
	watchers map[WatchID]*watcher
}
99
100// Watch creates a new watcher in the stream and returns its WatchID.
101// TODO: return error if ws is closed?
102func (ws *watchStream) Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID {
103	// prevent wrong range where key >= end lexicographically
104	// watch request with 'WithFromKey' has empty-byte range end
105	if len(end) != 0 && bytes.Compare(key, end) != -1 {
106		return -1
107	}
108
109	ws.mu.Lock()
110	defer ws.mu.Unlock()
111	if ws.closed {
112		return -1
113	}
114
115	id := ws.nextID
116	ws.nextID++
117
118	w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
119
120	ws.cancels[id] = c
121	ws.watchers[id] = w
122	return id
123}
124
125func (ws *watchStream) Chan() <-chan WatchResponse {
126	return ws.ch
127}
128
129func (ws *watchStream) Cancel(id WatchID) error {
130	ws.mu.Lock()
131	cancel, ok := ws.cancels[id]
132	ok = ok && !ws.closed
133	if ok {
134		delete(ws.cancels, id)
135		delete(ws.watchers, id)
136	}
137	ws.mu.Unlock()
138	if !ok {
139		return ErrWatcherNotExist
140	}
141	cancel()
142	return nil
143}
144
145func (ws *watchStream) Close() {
146	ws.mu.Lock()
147	defer ws.mu.Unlock()
148
149	for _, cancel := range ws.cancels {
150		cancel()
151	}
152	ws.closed = true
153	close(ws.ch)
154	watchStreamGauge.Dec()
155}
156
157func (ws *watchStream) Rev() int64 {
158	ws.mu.Lock()
159	defer ws.mu.Unlock()
160	return ws.watchable.rev()
161}
162
163func (ws *watchStream) RequestProgress(id WatchID) {
164	ws.mu.Lock()
165	w, ok := ws.watchers[id]
166	ws.mu.Unlock()
167	if !ok {
168		return
169	}
170	ws.watchable.progress(w)
171}
172