1// Copyright 2016 CoreOS, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package integration
16
17import (
18	"fmt"
19	"reflect"
20	"sort"
21	"testing"
22	"time"
23
24	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
25	"github.com/coreos/etcd/clientv3"
26	"github.com/coreos/etcd/etcdserver/api/v3rpc"
27	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
28	"github.com/coreos/etcd/integration"
29	"github.com/coreos/etcd/pkg/testutil"
30	storagepb "github.com/coreos/etcd/storage/storagepb"
31)
32
33type watcherTest func(*testing.T, *watchctx)
34
35type watchctx struct {
36	clus    *integration.ClusterV3
37	w       clientv3.Watcher
38	wclient *clientv3.Client
39	kv      clientv3.KV
40	ch      clientv3.WatchChan
41}
42
43func runWatchTest(t *testing.T, f watcherTest) {
44	defer testutil.AfterTest(t)
45
46	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
47	defer clus.Terminate(t)
48
49	wclient := clus.RandClient()
50	w := clientv3.NewWatcher(wclient)
51	defer w.Close()
52	// select a different client from wclient so puts succeed if
53	// a test knocks out the watcher client
54	kvclient := clus.RandClient()
55	for kvclient == wclient {
56		kvclient = clus.RandClient()
57	}
58	kv := clientv3.NewKV(kvclient)
59
60	wctx := &watchctx{clus, w, wclient, kv, nil}
61	f(t, wctx)
62}
63
64// TestWatchMultiWatcher modifies multiple keys and observes the changes.
65func TestWatchMultiWatcher(t *testing.T) {
66	runWatchTest(t, testWatchMultiWatcher)
67}
68
69func testWatchMultiWatcher(t *testing.T, wctx *watchctx) {
70	numKeyUpdates := 4
71	keys := []string{"foo", "bar", "baz"}
72
73	donec := make(chan struct{})
74	readyc := make(chan struct{})
75	for _, k := range keys {
76		// key watcher
77		go func(key string) {
78			ch := wctx.w.Watch(context.TODO(), key)
79			if ch == nil {
80				t.Fatalf("expected watcher channel, got nil")
81			}
82			readyc <- struct{}{}
83			for i := 0; i < numKeyUpdates; i++ {
84				resp, ok := <-ch
85				if !ok {
86					t.Fatalf("watcher unexpectedly closed")
87				}
88				v := fmt.Sprintf("%s-%d", key, i)
89				gotv := string(resp.Events[0].Kv.Value)
90				if gotv != v {
91					t.Errorf("#%d: got %s, wanted %s", i, gotv, v)
92				}
93			}
94			donec <- struct{}{}
95		}(k)
96	}
97	// prefix watcher on "b" (bar and baz)
98	go func() {
99		prefixc := wctx.w.Watch(context.TODO(), "b", clientv3.WithPrefix())
100		if prefixc == nil {
101			t.Fatalf("expected watcher channel, got nil")
102		}
103		readyc <- struct{}{}
104		evs := []*storagepb.Event{}
105		for i := 0; i < numKeyUpdates*2; i++ {
106			resp, ok := <-prefixc
107			if !ok {
108				t.Fatalf("watcher unexpectedly closed")
109			}
110			evs = append(evs, resp.Events...)
111		}
112
113		// check response
114		expected := []string{}
115		bkeys := []string{"bar", "baz"}
116		for _, k := range bkeys {
117			for i := 0; i < numKeyUpdates; i++ {
118				expected = append(expected, fmt.Sprintf("%s-%d", k, i))
119			}
120		}
121		got := []string{}
122		for _, ev := range evs {
123			got = append(got, string(ev.Kv.Value))
124		}
125		sort.Strings(got)
126		if reflect.DeepEqual(expected, got) == false {
127			t.Errorf("got %v, expected %v", got, expected)
128		}
129
130		// ensure no extra data
131		select {
132		case resp, ok := <-prefixc:
133			if !ok {
134				t.Fatalf("watcher unexpectedly closed")
135			}
136			t.Fatalf("unexpected event %+v", resp)
137		case <-time.After(time.Second):
138		}
139		donec <- struct{}{}
140	}()
141
142	// wait for watcher bring up
143	for i := 0; i < len(keys)+1; i++ {
144		<-readyc
145	}
146	// generate events
147	ctx := context.TODO()
148	for i := 0; i < numKeyUpdates; i++ {
149		for _, k := range keys {
150			v := fmt.Sprintf("%s-%d", k, i)
151			if _, err := wctx.kv.Put(ctx, k, v); err != nil {
152				t.Fatal(err)
153			}
154		}
155	}
156	// wait for watcher shutdown
157	for i := 0; i < len(keys)+1; i++ {
158		<-donec
159	}
160}
161
162// TestWatchRange tests watcher creates ranges
163func TestWatchRange(t *testing.T) {
164	runWatchTest(t, testWatchReconnInit)
165}
166
167func testWatchRange(t *testing.T, wctx *watchctx) {
168	if wctx.ch = wctx.w.Watch(context.TODO(), "a", clientv3.WithRange("c")); wctx.ch == nil {
169		t.Fatalf("expected non-nil channel")
170	}
171	putAndWatch(t, wctx, "a", "a")
172	putAndWatch(t, wctx, "b", "b")
173	putAndWatch(t, wctx, "bar", "bar")
174}
175
176// TestWatchReconnRequest tests the send failure path when requesting a watcher.
177func TestWatchReconnRequest(t *testing.T) {
178	runWatchTest(t, testWatchReconnRequest)
179}
180
181func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
182	donec, stopc := make(chan struct{}), make(chan struct{}, 1)
183	go func() {
184		timer := time.After(2 * time.Second)
185		defer close(donec)
186		// take down watcher connection
187		for {
188			wctx.wclient.ActiveConnection().Close()
189			select {
190			case <-timer:
191				// spinning on close may live lock reconnection
192				return
193			case <-stopc:
194				return
195			default:
196			}
197		}
198	}()
199	// should reconnect when requesting watch
200	if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
201		t.Fatalf("expected non-nil channel")
202	}
203
204	// wait for disconnections to stop
205	stopc <- struct{}{}
206	<-donec
207
208	// ensure watcher works
209	putAndWatch(t, wctx, "a", "a")
210}
211
212// TestWatchReconnInit tests watcher resumes correctly if connection lost
213// before any data was sent.
214func TestWatchReconnInit(t *testing.T) {
215	runWatchTest(t, testWatchReconnInit)
216}
217
218func testWatchReconnInit(t *testing.T, wctx *watchctx) {
219	if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
220		t.Fatalf("expected non-nil channel")
221	}
222	// take down watcher connection
223	wctx.wclient.ActiveConnection().Close()
224	// watcher should recover
225	putAndWatch(t, wctx, "a", "a")
226}
227
228// TestWatchReconnRunning tests watcher resumes correctly if connection lost
229// after data was sent.
230func TestWatchReconnRunning(t *testing.T) {
231	runWatchTest(t, testWatchReconnRunning)
232}
233
234func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
235	if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
236		t.Fatalf("expected non-nil channel")
237	}
238	putAndWatch(t, wctx, "a", "a")
239	// take down watcher connection
240	wctx.wclient.ActiveConnection().Close()
241	// watcher should recover
242	putAndWatch(t, wctx, "a", "b")
243}
244
245// TestWatchCancelImmediate ensures a closed channel is returned
246// if the context is cancelled.
247func TestWatchCancelImmediate(t *testing.T) {
248	runWatchTest(t, testWatchCancelImmediate)
249}
250
251func testWatchCancelImmediate(t *testing.T, wctx *watchctx) {
252	ctx, cancel := context.WithCancel(context.Background())
253	cancel()
254	wch := wctx.w.Watch(ctx, "a")
255	select {
256	case wresp, ok := <-wch:
257		if ok {
258			t.Fatalf("read wch got %v; expected closed channel", wresp)
259		}
260	default:
261		t.Fatalf("closed watcher channel should not block")
262	}
263}
264
265// TestWatchCancelInit tests watcher closes correctly after no events.
266func TestWatchCancelInit(t *testing.T) {
267	runWatchTest(t, testWatchCancelInit)
268}
269
270func testWatchCancelInit(t *testing.T, wctx *watchctx) {
271	ctx, cancel := context.WithCancel(context.Background())
272	if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
273		t.Fatalf("expected non-nil watcher channel")
274	}
275	cancel()
276	select {
277	case <-time.After(time.Second):
278		t.Fatalf("took too long to cancel")
279	case _, ok := <-wctx.ch:
280		if ok {
281			t.Fatalf("expected watcher channel to close")
282		}
283	}
284}
285
286// TestWatchCancelRunning tests watcher closes correctly after events.
287func TestWatchCancelRunning(t *testing.T) {
288	runWatchTest(t, testWatchCancelRunning)
289}
290
291func testWatchCancelRunning(t *testing.T, wctx *watchctx) {
292	ctx, cancel := context.WithCancel(context.Background())
293	if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil {
294		t.Fatalf("expected non-nil watcher channel")
295	}
296	if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil {
297		t.Fatal(err)
298	}
299	cancel()
300	select {
301	case <-time.After(time.Second):
302		t.Fatalf("took too long to cancel")
303	case v, ok := <-wctx.ch:
304		if !ok {
305			// closed before getting put; OK
306			break
307		}
308		// got the PUT; should close next
309		select {
310		case <-time.After(time.Second):
311			t.Fatalf("took too long to close")
312		case v, ok = <-wctx.ch:
313			if ok {
314				t.Fatalf("expected watcher channel to close, got %v", v)
315			}
316		}
317	}
318}
319
320func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
321	if _, err := wctx.kv.Put(context.TODO(), key, val); err != nil {
322		t.Fatal(err)
323	}
324	select {
325	case <-time.After(5 * time.Second):
326		t.Fatalf("watch timed out")
327	case v, ok := <-wctx.ch:
328		if !ok {
329			t.Fatalf("unexpected watch close")
330		}
331		if string(v.Events[0].Kv.Value) != val {
332			t.Fatalf("bad value got %v, wanted %v", v.Events[0].Kv.Value, val)
333		}
334	}
335}
336
337// TestWatchCompactRevision ensures the CompactRevision error is given on a
338// compaction event ahead of a watcher.
339func TestWatchCompactRevision(t *testing.T) {
340	defer testutil.AfterTest(t)
341
342	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
343	defer clus.Terminate(t)
344
345	// set some keys
346	kv := clientv3.NewKV(clus.RandClient())
347	for i := 0; i < 5; i++ {
348		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
349			t.Fatal(err)
350		}
351	}
352
353	w := clientv3.NewWatcher(clus.RandClient())
354	defer w.Close()
355
356	if err := kv.Compact(context.TODO(), 4); err != nil {
357		t.Fatal(err)
358	}
359	wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2))
360
361	// get compacted error message
362	wresp, ok := <-wch
363	if !ok {
364		t.Fatalf("expected wresp, but got closed channel")
365	}
366	if wresp.Err() != rpctypes.ErrCompacted {
367		t.Fatalf("wresp.Err() expected ErrCompacteed, but got %v", wresp.Err())
368	}
369
370	// ensure the channel is closed
371	if wresp, ok = <-wch; ok {
372		t.Fatalf("expected closed channel, but got %v", wresp)
373	}
374}
375
376func TestWatchWithProgressNotify(t *testing.T)        { testWatchWithProgressNotify(t, true) }
377func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNotify(t, false) }
378
379func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
380	defer testutil.AfterTest(t)
381
382	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
383	defer clus.Terminate(t)
384
385	wc := clientv3.NewWatcher(clus.RandClient())
386	defer wc.Close()
387
388	testInterval := 3 * time.Second
389	pi := v3rpc.ProgressReportInterval
390	v3rpc.ProgressReportInterval = testInterval
391	defer func() { v3rpc.ProgressReportInterval = pi }()
392
393	opts := []clientv3.OpOption{clientv3.WithProgressNotify()}
394	if watchOnPut {
395		opts = append(opts, clientv3.WithPrefix())
396	}
397	rch := wc.Watch(context.Background(), "foo", opts...)
398
399	select {
400	case resp := <-rch: // wait for notification
401		if len(resp.Events) != 0 {
402			t.Fatalf("resp.Events expected none, got %+v", resp.Events)
403		}
404	case <-time.After(2 * pi):
405		t.Fatalf("watch response expected in %v, but timed out", pi)
406	}
407
408	kvc := clientv3.NewKV(clus.RandClient())
409	if _, err := kvc.Put(context.TODO(), "foox", "bar"); err != nil {
410		t.Fatal(err)
411	}
412
413	select {
414	case resp := <-rch:
415		if resp.Header.Revision != 2 {
416			t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision)
417		}
418		if watchOnPut { // wait for put if watch on the put key
419			ev := []*storagepb.Event{{Type: storagepb.PUT,
420				Kv: &storagepb.KeyValue{Key: []byte("foox"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}}}
421			if !reflect.DeepEqual(ev, resp.Events) {
422				t.Fatalf("expected %+v, got %+v", ev, resp.Events)
423			}
424		} else if len(resp.Events) != 0 { // wait for notification otherwise
425			t.Fatalf("expected no events, but got %+v", resp.Events)
426		}
427	case <-time.After(2 * pi):
428		t.Fatalf("watch response expected in %v, but timed out", pi)
429	}
430}
431