1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18	"bytes"
19	"fmt"
20	"os"
21	"reflect"
22	"testing"
23	"time"
24
25	"github.com/coreos/etcd/lease"
26	"github.com/coreos/etcd/mvcc/backend"
27	"github.com/coreos/etcd/mvcc/mvccpb"
28)
29
30// TestWatcherWatchID tests that each watcher provides unique watchID,
31// and the watched event attaches the correct watchID.
32func TestWatcherWatchID(t *testing.T) {
33	b, tmpPath := backend.NewDefaultTmpBackend()
34	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
35	defer cleanup(s, b, tmpPath)
36
37	w := s.NewWatchStream()
38	defer w.Close()
39
40	idm := make(map[WatchID]struct{})
41
42	for i := 0; i < 10; i++ {
43		id := w.Watch([]byte("foo"), nil, 0)
44		if _, ok := idm[id]; ok {
45			t.Errorf("#%d: id %d exists", i, id)
46		}
47		idm[id] = struct{}{}
48
49		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
50
51		resp := <-w.Chan()
52		if resp.WatchID != id {
53			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
54		}
55
56		if err := w.Cancel(id); err != nil {
57			t.Error(err)
58		}
59	}
60
61	s.Put([]byte("foo2"), []byte("bar"), lease.NoLease)
62
63	// unsynced watchers
64	for i := 10; i < 20; i++ {
65		id := w.Watch([]byte("foo2"), nil, 1)
66		if _, ok := idm[id]; ok {
67			t.Errorf("#%d: id %d exists", i, id)
68		}
69		idm[id] = struct{}{}
70
71		resp := <-w.Chan()
72		if resp.WatchID != id {
73			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
74		}
75
76		if err := w.Cancel(id); err != nil {
77			t.Error(err)
78		}
79	}
80}
81
82// TestWatcherWatchPrefix tests if Watch operation correctly watches
83// and returns events with matching prefixes.
84func TestWatcherWatchPrefix(t *testing.T) {
85	b, tmpPath := backend.NewDefaultTmpBackend()
86	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
87	defer cleanup(s, b, tmpPath)
88
89	w := s.NewWatchStream()
90	defer w.Close()
91
92	idm := make(map[WatchID]struct{})
93
94	val := []byte("bar")
95	keyWatch, keyEnd, keyPut := []byte("foo"), []byte("fop"), []byte("foobar")
96
97	for i := 0; i < 10; i++ {
98		id := w.Watch(keyWatch, keyEnd, 0)
99		if _, ok := idm[id]; ok {
100			t.Errorf("#%d: unexpected duplicated id %x", i, id)
101		}
102		idm[id] = struct{}{}
103
104		s.Put(keyPut, val, lease.NoLease)
105
106		resp := <-w.Chan()
107		if resp.WatchID != id {
108			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
109		}
110
111		if err := w.Cancel(id); err != nil {
112			t.Errorf("#%d: unexpected cancel error %v", i, err)
113		}
114
115		if len(resp.Events) != 1 {
116			t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
117		}
118		if len(resp.Events) == 1 {
119			if !bytes.Equal(resp.Events[0].Kv.Key, keyPut) {
120				t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut)
121			}
122		}
123	}
124
125	keyWatch1, keyEnd1, keyPut1 := []byte("foo1"), []byte("foo2"), []byte("foo1bar")
126	s.Put(keyPut1, val, lease.NoLease)
127
128	// unsynced watchers
129	for i := 10; i < 15; i++ {
130		id := w.Watch(keyWatch1, keyEnd1, 1)
131		if _, ok := idm[id]; ok {
132			t.Errorf("#%d: id %d exists", i, id)
133		}
134		idm[id] = struct{}{}
135
136		resp := <-w.Chan()
137		if resp.WatchID != id {
138			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
139		}
140
141		if err := w.Cancel(id); err != nil {
142			t.Error(err)
143		}
144
145		if len(resp.Events) != 1 {
146			t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
147		}
148		if len(resp.Events) == 1 {
149			if !bytes.Equal(resp.Events[0].Kv.Key, keyPut1) {
150				t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut1)
151			}
152		}
153	}
154}
155
156// TestWatcherWatchWrongRange ensures that watcher with wrong 'end' range
157// does not create watcher, which panics when canceling in range tree.
158func TestWatcherWatchWrongRange(t *testing.T) {
159	b, tmpPath := backend.NewDefaultTmpBackend()
160	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
161	defer cleanup(s, b, tmpPath)
162
163	w := s.NewWatchStream()
164	defer w.Close()
165
166	if id := w.Watch([]byte("foa"), []byte("foa"), 1); id != -1 {
167		t.Fatalf("key == end range given; id expected -1, got %d", id)
168	}
169	if id := w.Watch([]byte("fob"), []byte("foa"), 1); id != -1 {
170		t.Fatalf("key > end range given; id expected -1, got %d", id)
171	}
172	// watch request with 'WithFromKey' has empty-byte range end
173	if id := w.Watch([]byte("foo"), []byte{}, 1); id != 0 {
174		t.Fatalf("\x00 is range given; id expected 0, got %d", id)
175	}
176}
177
178func TestWatchDeleteRange(t *testing.T) {
179	b, tmpPath := backend.NewDefaultTmpBackend()
180	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
181
182	defer func() {
183		s.store.Close()
184		os.Remove(tmpPath)
185	}()
186
187	testKeyPrefix := []byte("foo")
188
189	for i := 0; i < 3; i++ {
190		s.Put([]byte(fmt.Sprintf("%s_%d", testKeyPrefix, i)), []byte("bar"), lease.NoLease)
191	}
192
193	w := s.NewWatchStream()
194	from, to := []byte(testKeyPrefix), []byte(fmt.Sprintf("%s_%d", testKeyPrefix, 99))
195	w.Watch(from, to, 0)
196
197	s.DeleteRange(from, to)
198
199	we := []mvccpb.Event{
200		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_0"), ModRevision: 5}},
201		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_1"), ModRevision: 5}},
202		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_2"), ModRevision: 5}},
203	}
204
205	select {
206	case r := <-w.Chan():
207		if !reflect.DeepEqual(r.Events, we) {
208			t.Errorf("event = %v, want %v", r.Events, we)
209		}
210	case <-time.After(10 * time.Second):
211		t.Fatal("failed to receive event after 10 seconds!")
212	}
213}
214
215// TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher
216// with given id inside watchStream.
217func TestWatchStreamCancelWatcherByID(t *testing.T) {
218	b, tmpPath := backend.NewDefaultTmpBackend()
219	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
220	defer cleanup(s, b, tmpPath)
221
222	w := s.NewWatchStream()
223	defer w.Close()
224
225	id := w.Watch([]byte("foo"), nil, 0)
226
227	tests := []struct {
228		cancelID WatchID
229		werr     error
230	}{
231		// no error should be returned when cancel the created watcher.
232		{id, nil},
233		// not exist error should be returned when cancel again.
234		{id, ErrWatcherNotExist},
235		// not exist error should be returned when cancel a bad id.
236		{id + 1, ErrWatcherNotExist},
237	}
238
239	for i, tt := range tests {
240		gerr := w.Cancel(tt.cancelID)
241
242		if gerr != tt.werr {
243			t.Errorf("#%d: err = %v, want %v", i, gerr, tt.werr)
244		}
245	}
246
247	if l := len(w.(*watchStream).cancels); l != 0 {
248		t.Errorf("cancels = %d, want 0", l)
249	}
250}
251
252// TestWatcherRequestProgress ensures synced watcher can correctly
253// report its correct progress.
254func TestWatcherRequestProgress(t *testing.T) {
255	b, tmpPath := backend.NewDefaultTmpBackend()
256
257	// manually create watchableStore instead of newWatchableStore
258	// because newWatchableStore automatically calls syncWatchers
259	// method to sync watchers in unsynced map. We want to keep watchers
260	// in unsynced to test if syncWatchers works as expected.
261	s := &watchableStore{
262		store:    NewStore(b, &lease.FakeLessor{}, nil),
263		unsynced: newWatcherGroup(),
264		synced:   newWatcherGroup(),
265	}
266
267	defer func() {
268		s.store.Close()
269		os.Remove(tmpPath)
270	}()
271
272	testKey := []byte("foo")
273	notTestKey := []byte("bad")
274	testValue := []byte("bar")
275	s.Put(testKey, testValue, lease.NoLease)
276
277	w := s.NewWatchStream()
278
279	badID := WatchID(1000)
280	w.RequestProgress(badID)
281	select {
282	case resp := <-w.Chan():
283		t.Fatalf("unexpected %+v", resp)
284	default:
285	}
286
287	id := w.Watch(notTestKey, nil, 1)
288	w.RequestProgress(id)
289	select {
290	case resp := <-w.Chan():
291		t.Fatalf("unexpected %+v", resp)
292	default:
293	}
294
295	s.syncWatchers()
296
297	w.RequestProgress(id)
298	wrs := WatchResponse{WatchID: 0, Revision: 2}
299	select {
300	case resp := <-w.Chan():
301		if !reflect.DeepEqual(resp, wrs) {
302			t.Fatalf("got %+v, expect %+v", resp, wrs)
303		}
304	case <-time.After(time.Second):
305		t.Fatal("failed to receive progress")
306	}
307}
308
309func TestWatcherWatchWithFilter(t *testing.T) {
310	b, tmpPath := backend.NewDefaultTmpBackend()
311	s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil))
312	defer cleanup(s, b, tmpPath)
313
314	w := s.NewWatchStream()
315	defer w.Close()
316
317	filterPut := func(e mvccpb.Event) bool {
318		return e.Type == mvccpb.PUT
319	}
320
321	w.Watch([]byte("foo"), nil, 0, filterPut)
322	done := make(chan struct{})
323
324	go func() {
325		<-w.Chan()
326		done <- struct{}{}
327	}()
328
329	s.Put([]byte("foo"), []byte("bar"), 0)
330
331	select {
332	case <-done:
333		t.Fatal("failed to filter put request")
334	case <-time.After(100 * time.Millisecond):
335	}
336
337	s.DeleteRange([]byte("foo"), nil)
338
339	select {
340	case <-done:
341	case <-time.After(100 * time.Millisecond):
342		t.Fatal("failed to receive delete request")
343	}
344}
345