1// Copyright 2016 CoreOS, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package integration 16 17import ( 18 "fmt" 19 "reflect" 20 "sort" 21 "testing" 22 "time" 23 24 "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" 25 "github.com/coreos/etcd/clientv3" 26 "github.com/coreos/etcd/etcdserver/api/v3rpc" 27 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" 28 "github.com/coreos/etcd/integration" 29 "github.com/coreos/etcd/pkg/testutil" 30 storagepb "github.com/coreos/etcd/storage/storagepb" 31) 32 33type watcherTest func(*testing.T, *watchctx) 34 35type watchctx struct { 36 clus *integration.ClusterV3 37 w clientv3.Watcher 38 wclient *clientv3.Client 39 kv clientv3.KV 40 ch clientv3.WatchChan 41} 42 43func runWatchTest(t *testing.T, f watcherTest) { 44 defer testutil.AfterTest(t) 45 46 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 47 defer clus.Terminate(t) 48 49 wclient := clus.RandClient() 50 w := clientv3.NewWatcher(wclient) 51 defer w.Close() 52 // select a different client from wclient so puts succeed if 53 // a test knocks out the watcher client 54 kvclient := clus.RandClient() 55 for kvclient == wclient { 56 kvclient = clus.RandClient() 57 } 58 kv := clientv3.NewKV(kvclient) 59 60 wctx := &watchctx{clus, w, wclient, kv, nil} 61 f(t, wctx) 62} 63 64// TestWatchMultiWatcher modifies multiple keys and observes the changes. 65func TestWatchMultiWatcher(t *testing.T) { 66 runWatchTest(t, testWatchMultiWatcher) 67} 68 69func testWatchMultiWatcher(t *testing.T, wctx *watchctx) { 70 numKeyUpdates := 4 71 keys := []string{"foo", "bar", "baz"} 72 73 donec := make(chan struct{}) 74 readyc := make(chan struct{}) 75 for _, k := range keys { 76 // key watcher 77 go func(key string) { 78 ch := wctx.w.Watch(context.TODO(), key) 79 if ch == nil { 80 t.Fatalf("expected watcher channel, got nil") 81 } 82 readyc <- struct{}{} 83 for i := 0; i < numKeyUpdates; i++ { 84 resp, ok := <-ch 85 if !ok { 86 t.Fatalf("watcher unexpectedly closed") 87 } 88 v := fmt.Sprintf("%s-%d", key, i) 89 gotv := string(resp.Events[0].Kv.Value) 90 if gotv != v { 91 t.Errorf("#%d: got %s, wanted %s", i, gotv, v) 92 } 93 } 94 donec <- struct{}{} 95 }(k) 96 } 97 // prefix watcher on "b" (bar and baz) 98 go func() { 99 prefixc := wctx.w.Watch(context.TODO(), "b", clientv3.WithPrefix()) 100 if prefixc == nil { 101 t.Fatalf("expected watcher channel, got nil") 102 } 103 readyc <- struct{}{} 104 evs := []*storagepb.Event{} 105 for i := 0; i < numKeyUpdates*2; i++ { 106 resp, ok := <-prefixc 107 if !ok { 108 t.Fatalf("watcher unexpectedly closed") 109 } 110 evs = append(evs, resp.Events...) 111 } 112 113 // check response 114 expected := []string{} 115 bkeys := []string{"bar", "baz"} 116 for _, k := range bkeys { 117 for i := 0; i < numKeyUpdates; i++ { 118 expected = append(expected, fmt.Sprintf("%s-%d", k, i)) 119 } 120 } 121 got := []string{} 122 for _, ev := range evs { 123 got = append(got, string(ev.Kv.Value)) 124 } 125 sort.Strings(got) 126 if reflect.DeepEqual(expected, got) == false { 127 t.Errorf("got %v, expected %v", got, expected) 128 } 129 130 // ensure no extra data 131 select { 132 case resp, ok := <-prefixc: 133 if !ok { 134 t.Fatalf("watcher unexpectedly closed") 135 } 136 t.Fatalf("unexpected event %+v", resp) 137 case <-time.After(time.Second): 138 } 139 donec <- struct{}{} 140 }() 141 142 // wait for watcher bring up 143 for i := 0; i < len(keys)+1; i++ { 144 <-readyc 145 } 146 // generate events 147 ctx := context.TODO() 148 for i := 0; i < numKeyUpdates; i++ { 149 for _, k := range keys { 150 v := fmt.Sprintf("%s-%d", k, i) 151 if _, err := wctx.kv.Put(ctx, k, v); err != nil { 152 t.Fatal(err) 153 } 154 } 155 } 156 // wait for watcher shutdown 157 for i := 0; i < len(keys)+1; i++ { 158 <-donec 159 } 160} 161 162// TestWatchRange tests watcher creates ranges 163func TestWatchRange(t *testing.T) { 164 runWatchTest(t, testWatchReconnInit) 165} 166 167func testWatchRange(t *testing.T, wctx *watchctx) { 168 if wctx.ch = wctx.w.Watch(context.TODO(), "a", clientv3.WithRange("c")); wctx.ch == nil { 169 t.Fatalf("expected non-nil channel") 170 } 171 putAndWatch(t, wctx, "a", "a") 172 putAndWatch(t, wctx, "b", "b") 173 putAndWatch(t, wctx, "bar", "bar") 174} 175 176// TestWatchReconnRequest tests the send failure path when requesting a watcher. 177func TestWatchReconnRequest(t *testing.T) { 178 runWatchTest(t, testWatchReconnRequest) 179} 180 181func testWatchReconnRequest(t *testing.T, wctx *watchctx) { 182 donec, stopc := make(chan struct{}), make(chan struct{}, 1) 183 go func() { 184 timer := time.After(2 * time.Second) 185 defer close(donec) 186 // take down watcher connection 187 for { 188 wctx.wclient.ActiveConnection().Close() 189 select { 190 case <-timer: 191 // spinning on close may live lock reconnection 192 return 193 case <-stopc: 194 return 195 default: 196 } 197 } 198 }() 199 // should reconnect when requesting watch 200 if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { 201 t.Fatalf("expected non-nil channel") 202 } 203 204 // wait for disconnections to stop 205 stopc <- struct{}{} 206 <-donec 207 208 // ensure watcher works 209 putAndWatch(t, wctx, "a", "a") 210} 211 212// TestWatchReconnInit tests watcher resumes correctly if connection lost 213// before any data was sent. 214func TestWatchReconnInit(t *testing.T) { 215 runWatchTest(t, testWatchReconnInit) 216} 217 218func testWatchReconnInit(t *testing.T, wctx *watchctx) { 219 if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { 220 t.Fatalf("expected non-nil channel") 221 } 222 // take down watcher connection 223 wctx.wclient.ActiveConnection().Close() 224 // watcher should recover 225 putAndWatch(t, wctx, "a", "a") 226} 227 228// TestWatchReconnRunning tests watcher resumes correctly if connection lost 229// after data was sent. 230func TestWatchReconnRunning(t *testing.T) { 231 runWatchTest(t, testWatchReconnRunning) 232} 233 234func testWatchReconnRunning(t *testing.T, wctx *watchctx) { 235 if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { 236 t.Fatalf("expected non-nil channel") 237 } 238 putAndWatch(t, wctx, "a", "a") 239 // take down watcher connection 240 wctx.wclient.ActiveConnection().Close() 241 // watcher should recover 242 putAndWatch(t, wctx, "a", "b") 243} 244 245// TestWatchCancelImmediate ensures a closed channel is returned 246// if the context is cancelled. 247func TestWatchCancelImmediate(t *testing.T) { 248 runWatchTest(t, testWatchCancelImmediate) 249} 250 251func testWatchCancelImmediate(t *testing.T, wctx *watchctx) { 252 ctx, cancel := context.WithCancel(context.Background()) 253 cancel() 254 wch := wctx.w.Watch(ctx, "a") 255 select { 256 case wresp, ok := <-wch: 257 if ok { 258 t.Fatalf("read wch got %v; expected closed channel", wresp) 259 } 260 default: 261 t.Fatalf("closed watcher channel should not block") 262 } 263} 264 265// TestWatchCancelInit tests watcher closes correctly after no events. 266func TestWatchCancelInit(t *testing.T) { 267 runWatchTest(t, testWatchCancelInit) 268} 269 270func testWatchCancelInit(t *testing.T, wctx *watchctx) { 271 ctx, cancel := context.WithCancel(context.Background()) 272 if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil { 273 t.Fatalf("expected non-nil watcher channel") 274 } 275 cancel() 276 select { 277 case <-time.After(time.Second): 278 t.Fatalf("took too long to cancel") 279 case _, ok := <-wctx.ch: 280 if ok { 281 t.Fatalf("expected watcher channel to close") 282 } 283 } 284} 285 286// TestWatchCancelRunning tests watcher closes correctly after events. 287func TestWatchCancelRunning(t *testing.T) { 288 runWatchTest(t, testWatchCancelRunning) 289} 290 291func testWatchCancelRunning(t *testing.T, wctx *watchctx) { 292 ctx, cancel := context.WithCancel(context.Background()) 293 if wctx.ch = wctx.w.Watch(ctx, "a"); wctx.ch == nil { 294 t.Fatalf("expected non-nil watcher channel") 295 } 296 if _, err := wctx.kv.Put(ctx, "a", "a"); err != nil { 297 t.Fatal(err) 298 } 299 cancel() 300 select { 301 case <-time.After(time.Second): 302 t.Fatalf("took too long to cancel") 303 case v, ok := <-wctx.ch: 304 if !ok { 305 // closed before getting put; OK 306 break 307 } 308 // got the PUT; should close next 309 select { 310 case <-time.After(time.Second): 311 t.Fatalf("took too long to close") 312 case v, ok = <-wctx.ch: 313 if ok { 314 t.Fatalf("expected watcher channel to close, got %v", v) 315 } 316 } 317 } 318} 319 320func putAndWatch(t *testing.T, wctx *watchctx, key, val string) { 321 if _, err := wctx.kv.Put(context.TODO(), key, val); err != nil { 322 t.Fatal(err) 323 } 324 select { 325 case <-time.After(5 * time.Second): 326 t.Fatalf("watch timed out") 327 case v, ok := <-wctx.ch: 328 if !ok { 329 t.Fatalf("unexpected watch close") 330 } 331 if string(v.Events[0].Kv.Value) != val { 332 t.Fatalf("bad value got %v, wanted %v", v.Events[0].Kv.Value, val) 333 } 334 } 335} 336 337// TestWatchCompactRevision ensures the CompactRevision error is given on a 338// compaction event ahead of a watcher. 339func TestWatchCompactRevision(t *testing.T) { 340 defer testutil.AfterTest(t) 341 342 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 343 defer clus.Terminate(t) 344 345 // set some keys 346 kv := clientv3.NewKV(clus.RandClient()) 347 for i := 0; i < 5; i++ { 348 if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil { 349 t.Fatal(err) 350 } 351 } 352 353 w := clientv3.NewWatcher(clus.RandClient()) 354 defer w.Close() 355 356 if err := kv.Compact(context.TODO(), 4); err != nil { 357 t.Fatal(err) 358 } 359 wch := w.Watch(context.Background(), "foo", clientv3.WithRev(2)) 360 361 // get compacted error message 362 wresp, ok := <-wch 363 if !ok { 364 t.Fatalf("expected wresp, but got closed channel") 365 } 366 if wresp.Err() != rpctypes.ErrCompacted { 367 t.Fatalf("wresp.Err() expected ErrCompacteed, but got %v", wresp.Err()) 368 } 369 370 // ensure the channel is closed 371 if wresp, ok = <-wch; ok { 372 t.Fatalf("expected closed channel, but got %v", wresp) 373 } 374} 375 376func TestWatchWithProgressNotify(t *testing.T) { testWatchWithProgressNotify(t, true) } 377func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNotify(t, false) } 378 379func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) { 380 defer testutil.AfterTest(t) 381 382 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 383 defer clus.Terminate(t) 384 385 wc := clientv3.NewWatcher(clus.RandClient()) 386 defer wc.Close() 387 388 testInterval := 3 * time.Second 389 pi := v3rpc.ProgressReportInterval 390 v3rpc.ProgressReportInterval = testInterval 391 defer func() { v3rpc.ProgressReportInterval = pi }() 392 393 opts := []clientv3.OpOption{clientv3.WithProgressNotify()} 394 if watchOnPut { 395 opts = append(opts, clientv3.WithPrefix()) 396 } 397 rch := wc.Watch(context.Background(), "foo", opts...) 398 399 select { 400 case resp := <-rch: // wait for notification 401 if len(resp.Events) != 0 { 402 t.Fatalf("resp.Events expected none, got %+v", resp.Events) 403 } 404 case <-time.After(2 * pi): 405 t.Fatalf("watch response expected in %v, but timed out", pi) 406 } 407 408 kvc := clientv3.NewKV(clus.RandClient()) 409 if _, err := kvc.Put(context.TODO(), "foox", "bar"); err != nil { 410 t.Fatal(err) 411 } 412 413 select { 414 case resp := <-rch: 415 if resp.Header.Revision != 2 { 416 t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision) 417 } 418 if watchOnPut { // wait for put if watch on the put key 419 ev := []*storagepb.Event{{Type: storagepb.PUT, 420 Kv: &storagepb.KeyValue{Key: []byte("foox"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}}} 421 if !reflect.DeepEqual(ev, resp.Events) { 422 t.Fatalf("expected %+v, got %+v", ev, resp.Events) 423 } 424 } else if len(resp.Events) != 0 { // wait for notification otherwise 425 t.Fatalf("expected no events, but got %+v", resp.Events) 426 } 427 case <-time.After(2 * pi): 428 t.Fatalf("watch response expected in %v, but timed out", pi) 429 } 430} 431