1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package integration 16 17import ( 18 "bytes" 19 "context" 20 "fmt" 21 "reflect" 22 "sort" 23 "sync" 24 "testing" 25 "time" 26 27 "github.com/coreos/etcd/etcdserver/api/v3rpc" 28 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 29 "github.com/coreos/etcd/mvcc/mvccpb" 30 "github.com/coreos/etcd/pkg/testutil" 31) 32 33// TestV3WatchFromCurrentRevision tests Watch APIs from current revision. 34func TestV3WatchFromCurrentRevision(t *testing.T) { 35 defer testutil.AfterTest(t) 36 tests := []struct { 37 putKeys []string 38 watchRequest *pb.WatchRequest 39 40 wresps []*pb.WatchResponse 41 }{ 42 // watch the key, matching 43 { 44 []string{"foo"}, 45 &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 46 CreateRequest: &pb.WatchCreateRequest{ 47 Key: []byte("foo")}}}, 48 49 []*pb.WatchResponse{ 50 { 51 Header: &pb.ResponseHeader{Revision: 2}, 52 Created: false, 53 Events: []*mvccpb.Event{ 54 { 55 Type: mvccpb.PUT, 56 Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 57 }, 58 }, 59 }, 60 }, 61 }, 62 // watch the key, non-matching 63 { 64 []string{"foo"}, 65 &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 66 CreateRequest: &pb.WatchCreateRequest{ 67 Key: []byte("helloworld")}}}, 68 69 []*pb.WatchResponse{}, 70 }, 71 // watch the prefix, matching 72 { 73 []string{"fooLong"}, 74 &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 75 CreateRequest: &pb.WatchCreateRequest{ 76 Key: []byte("foo"), 77 RangeEnd: []byte("fop")}}}, 78 79 []*pb.WatchResponse{ 80 { 81 Header: &pb.ResponseHeader{Revision: 2}, 82 Created: false, 83 Events: []*mvccpb.Event{ 84 { 85 Type: mvccpb.PUT, 86 Kv: &mvccpb.KeyValue{Key: []byte("fooLong"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 87 }, 88 }, 89 }, 90 }, 91 }, 92 // watch the prefix, non-matching 93 { 94 []string{"foo"}, 95 &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 96 CreateRequest: &pb.WatchCreateRequest{ 97 Key: []byte("helloworld"), 98 RangeEnd: []byte("helloworle")}}}, 99 100 []*pb.WatchResponse{}, 101 }, 102 // watch full range, matching 103 { 104 []string{"fooLong"}, 105 &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 106 CreateRequest: &pb.WatchCreateRequest{ 107 Key: []byte(""), 108 RangeEnd: []byte("\x00")}}}, 109 110 []*pb.WatchResponse{ 111 { 112 Header: &pb.ResponseHeader{Revision: 2}, 113 Created: false, 114 Events: []*mvccpb.Event{ 115 { 116 Type: mvccpb.PUT, 117 Kv: &mvccpb.KeyValue{Key: []byte("fooLong"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 118 }, 119 }, 120 }, 121 }, 122 }, 123 // multiple puts, one watcher with matching key 124 { 125 []string{"foo", "foo", "foo"}, 126 &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 127 CreateRequest: &pb.WatchCreateRequest{ 128 Key: []byte("foo")}}}, 129 130 []*pb.WatchResponse{ 131 { 132 Header: &pb.ResponseHeader{Revision: 2}, 133 Created: false, 134 Events: []*mvccpb.Event{ 135 { 136 Type: mvccpb.PUT, 137 Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 138 }, 139 }, 140 }, 141 { 142 Header: &pb.ResponseHeader{Revision: 3}, 143 Created: false, 144 Events: []*mvccpb.Event{ 145 { 146 Type: mvccpb.PUT, 147 Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 3, Version: 2}, 148 }, 149 }, 150 }, 151 { 152 Header: &pb.ResponseHeader{Revision: 4}, 153 Created: false, 154 Events: []*mvccpb.Event{ 155 { 156 Type: mvccpb.PUT, 157 Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 4, Version: 3}, 158 }, 159 }, 160 }, 161 }, 162 }, 163 // multiple puts, one watcher with matching prefix 164 { 165 []string{"foo", "foo", "foo"}, 166 &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 167 CreateRequest: &pb.WatchCreateRequest{ 168 Key: []byte("foo"), 169 RangeEnd: []byte("fop")}}}, 170 171 []*pb.WatchResponse{ 172 { 173 Header: &pb.ResponseHeader{Revision: 2}, 174 Created: false, 175 Events: []*mvccpb.Event{ 176 { 177 Type: mvccpb.PUT, 178 Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 179 }, 180 }, 181 }, 182 { 183 Header: &pb.ResponseHeader{Revision: 3}, 184 Created: false, 185 Events: []*mvccpb.Event{ 186 { 187 Type: mvccpb.PUT, 188 Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 3, Version: 2}, 189 }, 190 }, 191 }, 192 { 193 Header: &pb.ResponseHeader{Revision: 4}, 194 Created: false, 195 Events: []*mvccpb.Event{ 196 { 197 Type: mvccpb.PUT, 198 Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 4, Version: 3}, 199 }, 200 }, 201 }, 202 }, 203 }, 204 } 205 206 for i, tt := range tests { 207 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 208 209 wAPI := toGRPC(clus.RandClient()).Watch 210 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 211 defer cancel() 212 wStream, err := wAPI.Watch(ctx) 213 if err != nil { 214 t.Fatalf("#%d: wAPI.Watch error: %v", i, err) 215 } 216 217 err = wStream.Send(tt.watchRequest) 218 if err != nil { 219 t.Fatalf("#%d: wStream.Send error: %v", i, err) 220 } 221 222 // ensure watcher request created a new watcher 223 cresp, err := wStream.Recv() 224 if err != nil { 225 t.Errorf("#%d: wStream.Recv error: %v", i, err) 226 clus.Terminate(t) 227 continue 228 } 229 if !cresp.Created { 230 t.Errorf("#%d: did not create watchid, got %+v", i, cresp) 231 clus.Terminate(t) 232 continue 233 } 234 if cresp.Canceled { 235 t.Errorf("#%d: canceled watcher on create %+v", i, cresp) 236 clus.Terminate(t) 237 continue 238 } 239 240 createdWatchId := cresp.WatchId 241 if cresp.Header == nil || cresp.Header.Revision != 1 { 242 t.Errorf("#%d: header revision got +%v, wanted revison 1", i, cresp) 243 clus.Terminate(t) 244 continue 245 } 246 247 // asynchronously create keys 248 go func() { 249 for _, k := range tt.putKeys { 250 kvc := toGRPC(clus.RandClient()).KV 251 req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")} 252 if _, err := kvc.Put(context.TODO(), req); err != nil { 253 t.Fatalf("#%d: couldn't put key (%v)", i, err) 254 } 255 } 256 }() 257 258 // check stream results 259 for j, wresp := range tt.wresps { 260 resp, err := wStream.Recv() 261 if err != nil { 262 t.Errorf("#%d.%d: wStream.Recv error: %v", i, j, err) 263 } 264 265 if resp.Header == nil { 266 t.Fatalf("#%d.%d: unexpected nil resp.Header", i, j) 267 } 268 if resp.Header.Revision != wresp.Header.Revision { 269 t.Errorf("#%d.%d: resp.Header.Revision got = %d, want = %d", i, j, resp.Header.Revision, wresp.Header.Revision) 270 } 271 272 if wresp.Created != resp.Created { 273 t.Errorf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created) 274 } 275 if resp.WatchId != createdWatchId { 276 t.Errorf("#%d.%d: resp.WatchId got = %d, want = %d", i, j, resp.WatchId, createdWatchId) 277 } 278 279 if !reflect.DeepEqual(resp.Events, wresp.Events) { 280 t.Errorf("#%d.%d: resp.Events got = %+v, want = %+v", i, j, resp.Events, wresp.Events) 281 } 282 } 283 284 rok, nr := waitResponse(wStream, 1*time.Second) 285 if !rok { 286 t.Errorf("unexpected pb.WatchResponse is received %+v", nr) 287 } 288 289 // can't defer because tcp ports will be in use 290 clus.Terminate(t) 291 } 292} 293 294// TestV3WatchFutureRevision tests Watch APIs from a future revision. 295func TestV3WatchFutureRevision(t *testing.T) { 296 defer testutil.AfterTest(t) 297 298 clus := NewClusterV3(t, &ClusterConfig{Size: 1}) 299 defer clus.Terminate(t) 300 301 wAPI := toGRPC(clus.RandClient()).Watch 302 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 303 defer cancel() 304 wStream, err := wAPI.Watch(ctx) 305 if err != nil { 306 t.Fatalf("wAPI.Watch error: %v", err) 307 } 308 309 wkey := []byte("foo") 310 wrev := int64(10) 311 req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 312 CreateRequest: &pb.WatchCreateRequest{Key: wkey, StartRevision: wrev}}} 313 err = wStream.Send(req) 314 if err != nil { 315 t.Fatalf("wStream.Send error: %v", err) 316 } 317 318 // ensure watcher request created a new watcher 319 cresp, err := wStream.Recv() 320 if err != nil { 321 t.Fatalf("wStream.Recv error: %v", err) 322 } 323 if !cresp.Created { 324 t.Fatalf("create %v, want %v", cresp.Created, true) 325 } 326 327 kvc := toGRPC(clus.RandClient()).KV 328 for { 329 req := &pb.PutRequest{Key: wkey, Value: []byte("bar")} 330 resp, rerr := kvc.Put(context.TODO(), req) 331 if rerr != nil { 332 t.Fatalf("couldn't put key (%v)", rerr) 333 } 334 if resp.Header.Revision == wrev { 335 break 336 } 337 } 338 339 // ensure watcher request created a new watcher 340 cresp, err = wStream.Recv() 341 if err != nil { 342 t.Fatalf("wStream.Recv error: %v", err) 343 } 344 if cresp.Header.Revision != wrev { 345 t.Fatalf("revision = %d, want %d", cresp.Header.Revision, wrev) 346 } 347 if len(cresp.Events) != 1 { 348 t.Fatalf("failed to receive events") 349 } 350 if cresp.Events[0].Kv.ModRevision != wrev { 351 t.Errorf("mod revision = %d, want %d", cresp.Events[0].Kv.ModRevision, wrev) 352 } 353} 354 355// TestV3WatchWrongRange tests wrong range does not create watchers. 356func TestV3WatchWrongRange(t *testing.T) { 357 defer testutil.AfterTest(t) 358 359 clus := NewClusterV3(t, &ClusterConfig{Size: 1}) 360 defer clus.Terminate(t) 361 362 wAPI := toGRPC(clus.RandClient()).Watch 363 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 364 defer cancel() 365 wStream, err := wAPI.Watch(ctx) 366 if err != nil { 367 t.Fatalf("wAPI.Watch error: %v", err) 368 } 369 370 tests := []struct { 371 key []byte 372 end []byte 373 canceled bool 374 }{ 375 {[]byte("a"), []byte("a"), true}, // wrong range end 376 {[]byte("b"), []byte("a"), true}, // wrong range end 377 {[]byte("foo"), []byte{0}, false}, // watch request with 'WithFromKey' 378 } 379 for i, tt := range tests { 380 if err := wStream.Send(&pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 381 CreateRequest: &pb.WatchCreateRequest{Key: tt.key, RangeEnd: tt.end, StartRevision: 1}}}); err != nil { 382 t.Fatalf("#%d: wStream.Send error: %v", i, err) 383 } 384 cresp, err := wStream.Recv() 385 if err != nil { 386 t.Fatalf("#%d: wStream.Recv error: %v", i, err) 387 } 388 if !cresp.Created { 389 t.Fatalf("#%d: create %v, want %v", i, cresp.Created, true) 390 } 391 if cresp.Canceled != tt.canceled { 392 t.Fatalf("#%d: canceled %v, want %v", i, tt.canceled, cresp.Canceled) 393 } 394 if tt.canceled && cresp.WatchId != -1 { 395 t.Fatalf("#%d: canceled watch ID %d, want -1", i, cresp.WatchId) 396 } 397 } 398} 399 400// TestV3WatchCancelSynced tests Watch APIs cancellation from synced map. 401func TestV3WatchCancelSynced(t *testing.T) { 402 defer testutil.AfterTest(t) 403 testV3WatchCancel(t, 0) 404} 405 406// TestV3WatchCancelUnsynced tests Watch APIs cancellation from unsynced map. 407func TestV3WatchCancelUnsynced(t *testing.T) { 408 defer testutil.AfterTest(t) 409 testV3WatchCancel(t, 1) 410} 411 412func testV3WatchCancel(t *testing.T, startRev int64) { 413 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 414 defer clus.Terminate(t) 415 416 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 417 defer cancel() 418 wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx) 419 if errW != nil { 420 t.Fatalf("wAPI.Watch error: %v", errW) 421 } 422 423 wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 424 CreateRequest: &pb.WatchCreateRequest{ 425 Key: []byte("foo"), StartRevision: startRev}}} 426 if err := wStream.Send(wreq); err != nil { 427 t.Fatalf("wStream.Send error: %v", err) 428 } 429 430 wresp, errR := wStream.Recv() 431 if errR != nil { 432 t.Errorf("wStream.Recv error: %v", errR) 433 } 434 if !wresp.Created { 435 t.Errorf("wresp.Created got = %v, want = true", wresp.Created) 436 } 437 438 creq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CancelRequest{ 439 CancelRequest: &pb.WatchCancelRequest{ 440 WatchId: wresp.WatchId}}} 441 if err := wStream.Send(creq); err != nil { 442 t.Fatalf("wStream.Send error: %v", err) 443 } 444 445 cresp, err := wStream.Recv() 446 if err != nil { 447 t.Errorf("wStream.Recv error: %v", err) 448 } 449 if !cresp.Canceled { 450 t.Errorf("cresp.Canceled got = %v, want = true", cresp.Canceled) 451 } 452 453 kvc := toGRPC(clus.RandClient()).KV 454 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { 455 t.Errorf("couldn't put key (%v)", err) 456 } 457 458 // watch got canceled, so this should block 459 rok, nr := waitResponse(wStream, 1*time.Second) 460 if !rok { 461 t.Errorf("unexpected pb.WatchResponse is received %+v", nr) 462 } 463} 464 465// TestV3WatchCurrentPutOverlap ensures current watchers receive all events with 466// overlapping puts. 467func TestV3WatchCurrentPutOverlap(t *testing.T) { 468 defer testutil.AfterTest(t) 469 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 470 defer clus.Terminate(t) 471 472 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 473 defer cancel() 474 wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) 475 if wErr != nil { 476 t.Fatalf("wAPI.Watch error: %v", wErr) 477 } 478 479 // last mod_revision that will be observed 480 nrRevisions := 32 481 // first revision already allocated as empty revision 482 for i := 1; i < nrRevisions; i++ { 483 go func() { 484 kvc := toGRPC(clus.RandClient()).KV 485 req := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} 486 if _, err := kvc.Put(context.TODO(), req); err != nil { 487 t.Fatalf("couldn't put key (%v)", err) 488 } 489 }() 490 } 491 492 // maps watcher to current expected revision 493 progress := make(map[int64]int64) 494 495 wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 496 CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), RangeEnd: []byte("fop")}}} 497 if err := wStream.Send(wreq); err != nil { 498 t.Fatalf("first watch request failed (%v)", err) 499 } 500 501 more := true 502 progress[-1] = 0 // watcher creation pending 503 for more { 504 resp, err := wStream.Recv() 505 if err != nil { 506 t.Fatalf("wStream.Recv error: %v", err) 507 } 508 509 if resp.Created { 510 // accept events > header revision 511 progress[resp.WatchId] = resp.Header.Revision + 1 512 if resp.Header.Revision == int64(nrRevisions) { 513 // covered all revisions; create no more watchers 514 progress[-1] = int64(nrRevisions) + 1 515 } else if err := wStream.Send(wreq); err != nil { 516 t.Fatalf("watch request failed (%v)", err) 517 } 518 } else if len(resp.Events) == 0 { 519 t.Fatalf("got events %v, want non-empty", resp.Events) 520 } else { 521 wRev, ok := progress[resp.WatchId] 522 if !ok { 523 t.Fatalf("got %+v, but watch id shouldn't exist ", resp) 524 } 525 if resp.Events[0].Kv.ModRevision != wRev { 526 t.Fatalf("got %+v, wanted first revision %d", resp, wRev) 527 } 528 lastRev := resp.Events[len(resp.Events)-1].Kv.ModRevision 529 progress[resp.WatchId] = lastRev + 1 530 } 531 more = false 532 for _, v := range progress { 533 if v <= int64(nrRevisions) { 534 more = true 535 break 536 } 537 } 538 } 539 540 if rok, nr := waitResponse(wStream, time.Second); !rok { 541 t.Errorf("unexpected pb.WatchResponse is received %+v", nr) 542 } 543} 544 545// TestV3WatchEmptyKey ensures synced watchers see empty key PUTs as PUT events 546func TestV3WatchEmptyKey(t *testing.T) { 547 defer testutil.AfterTest(t) 548 549 clus := NewClusterV3(t, &ClusterConfig{Size: 1}) 550 defer clus.Terminate(t) 551 552 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 553 defer cancel() 554 555 ws, werr := toGRPC(clus.RandClient()).Watch.Watch(ctx) 556 if werr != nil { 557 t.Fatal(werr) 558 } 559 req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 560 CreateRequest: &pb.WatchCreateRequest{ 561 Key: []byte("foo")}}} 562 if err := ws.Send(req); err != nil { 563 t.Fatal(err) 564 } 565 if _, err := ws.Recv(); err != nil { 566 t.Fatal(err) 567 } 568 569 // put a key with empty value 570 kvc := toGRPC(clus.RandClient()).KV 571 preq := &pb.PutRequest{Key: []byte("foo")} 572 if _, err := kvc.Put(context.TODO(), preq); err != nil { 573 t.Fatal(err) 574 } 575 576 // check received PUT 577 resp, rerr := ws.Recv() 578 if rerr != nil { 579 t.Fatal(rerr) 580 } 581 wevs := []*mvccpb.Event{ 582 { 583 Type: mvccpb.PUT, 584 Kv: &mvccpb.KeyValue{Key: []byte("foo"), CreateRevision: 2, ModRevision: 2, Version: 1}, 585 }, 586 } 587 if !reflect.DeepEqual(resp.Events, wevs) { 588 t.Fatalf("got %v, expected %v", resp.Events, wevs) 589 } 590} 591 592func TestV3WatchMultipleWatchersSynced(t *testing.T) { 593 defer testutil.AfterTest(t) 594 testV3WatchMultipleWatchers(t, 0) 595} 596 597func TestV3WatchMultipleWatchersUnsynced(t *testing.T) { 598 defer testutil.AfterTest(t) 599 testV3WatchMultipleWatchers(t, 1) 600} 601 602// testV3WatchMultipleWatchers tests multiple watchers on the same key 603// and one watcher with matching prefix. It first puts the key 604// that matches all watchers, and another key that matches only 605// one watcher to test if it receives expected events. 606func testV3WatchMultipleWatchers(t *testing.T, startRev int64) { 607 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 608 defer clus.Terminate(t) 609 610 kvc := toGRPC(clus.RandClient()).KV 611 612 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 613 defer cancel() 614 wStream, errW := toGRPC(clus.RandClient()).Watch.Watch(ctx) 615 if errW != nil { 616 t.Fatalf("wAPI.Watch error: %v", errW) 617 } 618 619 watchKeyN := 4 620 for i := 0; i < watchKeyN+1; i++ { 621 var wreq *pb.WatchRequest 622 if i < watchKeyN { 623 wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 624 CreateRequest: &pb.WatchCreateRequest{ 625 Key: []byte("foo"), StartRevision: startRev}}} 626 } else { 627 wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 628 CreateRequest: &pb.WatchCreateRequest{ 629 Key: []byte("fo"), RangeEnd: []byte("fp"), StartRevision: startRev}}} 630 } 631 if err := wStream.Send(wreq); err != nil { 632 t.Fatalf("wStream.Send error: %v", err) 633 } 634 } 635 636 ids := make(map[int64]struct{}) 637 for i := 0; i < watchKeyN+1; i++ { 638 wresp, err := wStream.Recv() 639 if err != nil { 640 t.Fatalf("wStream.Recv error: %v", err) 641 } 642 if !wresp.Created { 643 t.Fatalf("wresp.Created got = %v, want = true", wresp.Created) 644 } 645 ids[wresp.WatchId] = struct{}{} 646 } 647 648 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { 649 t.Fatalf("couldn't put key (%v)", err) 650 } 651 652 for i := 0; i < watchKeyN+1; i++ { 653 wresp, err := wStream.Recv() 654 if err != nil { 655 t.Fatalf("wStream.Recv error: %v", err) 656 } 657 if _, ok := ids[wresp.WatchId]; !ok { 658 t.Errorf("watchId %d is not created!", wresp.WatchId) 659 } else { 660 delete(ids, wresp.WatchId) 661 } 662 if len(wresp.Events) == 0 { 663 t.Errorf("#%d: no events received", i) 664 } 665 for _, ev := range wresp.Events { 666 if string(ev.Kv.Key) != "foo" { 667 t.Errorf("ev.Kv.Key got = %s, want = foo", ev.Kv.Key) 668 } 669 if string(ev.Kv.Value) != "bar" { 670 t.Errorf("ev.Kv.Value got = %s, want = bar", ev.Kv.Value) 671 } 672 } 673 } 674 675 // now put one key that has only one matching watcher 676 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("fo"), Value: []byte("bar")}); err != nil { 677 t.Fatalf("couldn't put key (%v)", err) 678 } 679 wresp, err := wStream.Recv() 680 if err != nil { 681 t.Errorf("wStream.Recv error: %v", err) 682 } 683 if len(wresp.Events) != 1 { 684 t.Fatalf("len(wresp.Events) got = %d, want = 1", len(wresp.Events)) 685 } 686 if string(wresp.Events[0].Kv.Key) != "fo" { 687 t.Errorf("wresp.Events[0].Kv.Key got = %s, want = fo", wresp.Events[0].Kv.Key) 688 } 689 690 // now Recv should block because there is no more events coming 691 rok, nr := waitResponse(wStream, 1*time.Second) 692 if !rok { 693 t.Errorf("unexpected pb.WatchResponse is received %+v", nr) 694 } 695} 696 697func TestV3WatchMultipleEventsTxnSynced(t *testing.T) { 698 defer testutil.AfterTest(t) 699 testV3WatchMultipleEventsTxn(t, 0) 700} 701 702func TestV3WatchMultipleEventsTxnUnsynced(t *testing.T) { 703 defer testutil.AfterTest(t) 704 testV3WatchMultipleEventsTxn(t, 1) 705} 706 707// testV3WatchMultipleEventsTxn tests Watch APIs when it receives multiple events. 708func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { 709 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 710 defer clus.Terminate(t) 711 712 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 713 defer cancel() 714 wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) 715 if wErr != nil { 716 t.Fatalf("wAPI.Watch error: %v", wErr) 717 } 718 719 wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 720 CreateRequest: &pb.WatchCreateRequest{ 721 Key: []byte("foo"), RangeEnd: []byte("fop"), StartRevision: startRev}}} 722 if err := wStream.Send(wreq); err != nil { 723 t.Fatalf("wStream.Send error: %v", err) 724 } 725 if resp, err := wStream.Recv(); err != nil || !resp.Created { 726 t.Fatalf("create response failed: resp=%v, err=%v", resp, err) 727 } 728 729 kvc := toGRPC(clus.RandClient()).KV 730 txn := pb.TxnRequest{} 731 for i := 0; i < 3; i++ { 732 ru := &pb.RequestOp{} 733 ru.Request = &pb.RequestOp_RequestPut{ 734 RequestPut: &pb.PutRequest{ 735 Key: []byte(fmt.Sprintf("foo%d", i)), Value: []byte("bar")}} 736 txn.Success = append(txn.Success, ru) 737 } 738 739 tresp, err := kvc.Txn(context.Background(), &txn) 740 if err != nil { 741 t.Fatalf("kvc.Txn error: %v", err) 742 } 743 if !tresp.Succeeded { 744 t.Fatalf("kvc.Txn failed: %+v", tresp) 745 } 746 747 events := []*mvccpb.Event{} 748 for len(events) < 3 { 749 resp, err := wStream.Recv() 750 if err != nil { 751 t.Errorf("wStream.Recv error: %v", err) 752 } 753 events = append(events, resp.Events...) 754 } 755 sort.Sort(eventsSortByKey(events)) 756 757 wevents := []*mvccpb.Event{ 758 { 759 Type: mvccpb.PUT, 760 Kv: &mvccpb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 761 }, 762 { 763 Type: mvccpb.PUT, 764 Kv: &mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 765 }, 766 { 767 Type: mvccpb.PUT, 768 Kv: &mvccpb.KeyValue{Key: []byte("foo2"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 769 }, 770 } 771 772 if !reflect.DeepEqual(events, wevents) { 773 t.Errorf("events got = %+v, want = %+v", events, wevents) 774 } 775 776 rok, nr := waitResponse(wStream, 1*time.Second) 777 if !rok { 778 t.Errorf("unexpected pb.WatchResponse is received %+v", nr) 779 } 780} 781 782type eventsSortByKey []*mvccpb.Event 783 784func (evs eventsSortByKey) Len() int { return len(evs) } 785func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] } 786func (evs eventsSortByKey) Less(i, j int) bool { 787 return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 788} 789 790func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { 791 defer testutil.AfterTest(t) 792 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 793 defer clus.Terminate(t) 794 795 kvc := toGRPC(clus.RandClient()).KV 796 797 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil { 798 t.Fatalf("couldn't put key (%v)", err) 799 } 800 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo1"), Value: []byte("bar")}); err != nil { 801 t.Fatalf("couldn't put key (%v)", err) 802 } 803 804 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 805 defer cancel() 806 wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) 807 if wErr != nil { 808 t.Fatalf("wAPI.Watch error: %v", wErr) 809 } 810 811 wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 812 CreateRequest: &pb.WatchCreateRequest{ 813 Key: []byte("foo"), RangeEnd: []byte("fop"), StartRevision: 1}}} 814 if err := wStream.Send(wreq); err != nil { 815 t.Fatalf("wStream.Send error: %v", err) 816 } 817 818 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo0"), Value: []byte("bar")}); err != nil { 819 t.Fatalf("couldn't put key (%v)", err) 820 } 821 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo1"), Value: []byte("bar")}); err != nil { 822 t.Fatalf("couldn't put key (%v)", err) 823 } 824 825 allWevents := []*mvccpb.Event{ 826 { 827 Type: mvccpb.PUT, 828 Kv: &mvccpb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 829 }, 830 { 831 Type: mvccpb.PUT, 832 Kv: &mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1}, 833 }, 834 { 835 Type: mvccpb.PUT, 836 Kv: &mvccpb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 4, Version: 2}, 837 }, 838 { 839 Type: mvccpb.PUT, 840 Kv: &mvccpb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 5, Version: 2}, 841 }, 842 } 843 844 events := []*mvccpb.Event{} 845 for len(events) < 4 { 846 resp, err := wStream.Recv() 847 if err != nil { 848 t.Errorf("wStream.Recv error: %v", err) 849 } 850 if resp.Created { 851 continue 852 } 853 events = append(events, resp.Events...) 854 // if PUT requests are committed by now, first receive would return 855 // multiple events, but if not, it returns a single event. In SSD, 856 // it should return 4 events at once. 857 } 858 859 if !reflect.DeepEqual(events, allWevents) { 860 t.Errorf("events got = %+v, want = %+v", events, allWevents) 861 } 862 863 rok, nr := waitResponse(wStream, 1*time.Second) 864 if !rok { 865 t.Errorf("unexpected pb.WatchResponse is received %+v", nr) 866 } 867} 868 869func TestV3WatchMultipleStreamsSynced(t *testing.T) { 870 defer testutil.AfterTest(t) 871 testV3WatchMultipleStreams(t, 0) 872} 873 874func TestV3WatchMultipleStreamsUnsynced(t *testing.T) { 875 defer testutil.AfterTest(t) 876 testV3WatchMultipleStreams(t, 1) 877} 878 879// testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams. 880func testV3WatchMultipleStreams(t *testing.T, startRev int64) { 881 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 882 defer clus.Terminate(t) 883 884 wAPI := toGRPC(clus.RandClient()).Watch 885 kvc := toGRPC(clus.RandClient()).KV 886 887 streams := make([]pb.Watch_WatchClient, 5) 888 for i := range streams { 889 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 890 defer cancel() 891 wStream, errW := wAPI.Watch(ctx) 892 if errW != nil { 893 t.Fatalf("wAPI.Watch error: %v", errW) 894 } 895 wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 896 CreateRequest: &pb.WatchCreateRequest{ 897 Key: []byte("foo"), StartRevision: startRev}}} 898 if err := wStream.Send(wreq); err != nil { 899 t.Fatalf("wStream.Send error: %v", err) 900 } 901 streams[i] = wStream 902 } 903 904 for _, wStream := range streams { 905 wresp, err := wStream.Recv() 906 if err != nil { 907 t.Fatalf("wStream.Recv error: %v", err) 908 } 909 if !wresp.Created { 910 t.Fatalf("wresp.Created got = %v, want = true", wresp.Created) 911 } 912 } 913 914 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { 915 t.Fatalf("couldn't put key (%v)", err) 916 } 917 918 var wg sync.WaitGroup 919 wg.Add(len(streams)) 920 wevents := []*mvccpb.Event{ 921 { 922 Type: mvccpb.PUT, 923 Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, 924 }, 925 } 926 for i := range streams { 927 go func(i int) { 928 defer wg.Done() 929 wStream := streams[i] 930 wresp, err := wStream.Recv() 931 if err != nil { 932 t.Fatalf("wStream.Recv error: %v", err) 933 } 934 if wresp.WatchId != 0 { 935 t.Errorf("watchId got = %d, want = 0", wresp.WatchId) 936 } 937 if !reflect.DeepEqual(wresp.Events, wevents) { 938 t.Errorf("wresp.Events got = %+v, want = %+v", wresp.Events, wevents) 939 } 940 // now Recv should block because there is no more events coming 941 rok, nr := waitResponse(wStream, 1*time.Second) 942 if !rok { 943 t.Errorf("unexpected pb.WatchResponse is received %+v", nr) 944 } 945 }(i) 946 } 947 wg.Wait() 948} 949 950// waitResponse waits on the given stream for given duration. 951// If there is no more events, true and a nil response will be 952// returned closing the WatchClient stream. Or the response will 953// be returned. 954func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) { 955 rCh := make(chan *pb.WatchResponse, 1) 956 donec := make(chan struct{}) 957 defer close(donec) 958 go func() { 959 resp, _ := wc.Recv() 960 select { 961 case rCh <- resp: 962 case <-donec: 963 } 964 }() 965 select { 966 case nr := <-rCh: 967 return false, nr 968 case <-time.After(timeout): 969 } 970 // didn't get response 971 wc.CloseSend() 972 return true, nil 973} 974 975func TestWatchWithProgressNotify(t *testing.T) { 976 // accelerate report interval so test terminates quickly 977 oldpi := v3rpc.GetProgressReportInterval() 978 // using atomics to avoid race warnings 979 v3rpc.SetProgressReportInterval(3 * time.Second) 980 testInterval := 3 * time.Second 981 defer func() { v3rpc.SetProgressReportInterval(oldpi) }() 982 983 defer testutil.AfterTest(t) 984 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 985 defer clus.Terminate(t) 986 987 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 988 defer cancel() 989 wStream, wErr := toGRPC(clus.RandClient()).Watch.Watch(ctx) 990 if wErr != nil { 991 t.Fatalf("wAPI.Watch error: %v", wErr) 992 } 993 994 // create two watchers, one with progressNotify set. 995 wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 996 CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1, ProgressNotify: true}}} 997 if err := wStream.Send(wreq); err != nil { 998 t.Fatalf("watch request failed (%v)", err) 999 } 1000 wreq = &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 1001 CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1}}} 1002 if err := wStream.Send(wreq); err != nil { 1003 t.Fatalf("watch request failed (%v)", err) 1004 } 1005 1006 // two creation + one notification 1007 for i := 0; i < 3; i++ { 1008 rok, resp := waitResponse(wStream, testInterval+time.Second) 1009 if resp.Created { 1010 continue 1011 } 1012 1013 if rok { 1014 t.Errorf("failed to receive response from watch stream") 1015 } 1016 if resp.Header.Revision != 1 { 1017 t.Errorf("revision = %d, want 1", resp.Header.Revision) 1018 } 1019 if len(resp.Events) != 0 { 1020 t.Errorf("len(resp.Events) = %d, want 0", len(resp.Events)) 1021 } 1022 } 1023 1024 // no more notification 1025 rok, resp := waitResponse(wStream, time.Second) 1026 if !rok { 1027 t.Errorf("unexpected pb.WatchResponse is received %+v", resp) 1028 } 1029} 1030 1031// TestV3WatcMultiOpenhClose opens many watchers concurrently on multiple streams. 1032func TestV3WatchClose(t *testing.T) { 1033 defer testutil.AfterTest(t) 1034 clus := NewClusterV3(t, &ClusterConfig{Size: 1}) 1035 defer clus.Terminate(t) 1036 1037 c := clus.Client(0) 1038 wapi := toGRPC(c).Watch 1039 1040 var wg sync.WaitGroup 1041 wg.Add(100) 1042 for i := 0; i < 100; i++ { 1043 go func() { 1044 ctx, cancel := context.WithCancel(context.TODO()) 1045 defer func() { 1046 wg.Done() 1047 cancel() 1048 }() 1049 ws, err := wapi.Watch(ctx) 1050 if err != nil { 1051 return 1052 } 1053 cr := &pb.WatchCreateRequest{Key: []byte("a")} 1054 req := &pb.WatchRequest{ 1055 RequestUnion: &pb.WatchRequest_CreateRequest{ 1056 CreateRequest: cr}} 1057 ws.Send(req) 1058 ws.Recv() 1059 }() 1060 } 1061 1062 clus.Members[0].DropConnections() 1063 wg.Wait() 1064} 1065 1066// TestV3WatchWithFilter ensures watcher filters out the events correctly. 1067func TestV3WatchWithFilter(t *testing.T) { 1068 clus := NewClusterV3(t, &ClusterConfig{Size: 1}) 1069 defer clus.Terminate(t) 1070 1071 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) 1072 defer cancel() 1073 1074 ws, werr := toGRPC(clus.RandClient()).Watch.Watch(ctx) 1075 if werr != nil { 1076 t.Fatal(werr) 1077 } 1078 req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 1079 CreateRequest: &pb.WatchCreateRequest{ 1080 Key: []byte("foo"), 1081 Filters: []pb.WatchCreateRequest_FilterType{pb.WatchCreateRequest_NOPUT}, 1082 }}} 1083 if err := ws.Send(req); err != nil { 1084 t.Fatal(err) 1085 } 1086 if _, err := ws.Recv(); err != nil { 1087 t.Fatal(err) 1088 } 1089 1090 recv := make(chan *pb.WatchResponse) 1091 go func() { 1092 // check received PUT 1093 resp, rerr := ws.Recv() 1094 if rerr != nil { 1095 t.Fatal(rerr) 1096 } 1097 recv <- resp 1098 }() 1099 1100 // put a key with empty value 1101 kvc := toGRPC(clus.RandClient()).KV 1102 preq := &pb.PutRequest{Key: []byte("foo")} 1103 if _, err := kvc.Put(context.TODO(), preq); err != nil { 1104 t.Fatal(err) 1105 } 1106 1107 select { 1108 case <-recv: 1109 t.Fatal("failed to filter out put event") 1110 case <-time.After(100 * time.Millisecond): 1111 } 1112 1113 dreq := &pb.DeleteRangeRequest{Key: []byte("foo")} 1114 if _, err := kvc.DeleteRange(context.TODO(), dreq); err != nil { 1115 t.Fatal(err) 1116 } 1117 1118 select { 1119 case resp := <-recv: 1120 wevs := []*mvccpb.Event{ 1121 { 1122 Type: mvccpb.DELETE, 1123 Kv: &mvccpb.KeyValue{Key: []byte("foo"), ModRevision: 3}, 1124 }, 1125 } 1126 if !reflect.DeepEqual(resp.Events, wevs) { 1127 t.Fatalf("got %v, expected %v", resp.Events, wevs) 1128 } 1129 case <-time.After(100 * time.Millisecond): 1130 t.Fatal("failed to receive delete event") 1131 } 1132} 1133 1134func TestV3WatchWithPrevKV(t *testing.T) { 1135 defer testutil.AfterTest(t) 1136 clus := NewClusterV3(t, &ClusterConfig{Size: 1}) 1137 defer clus.Terminate(t) 1138 1139 wctx, wcancel := context.WithCancel(context.Background()) 1140 defer wcancel() 1141 1142 tests := []struct { 1143 key string 1144 end string 1145 vals []string 1146 }{{ 1147 key: "foo", 1148 end: "fop", 1149 vals: []string{"bar1", "bar2"}, 1150 }, { 1151 key: "/abc", 1152 end: "/abd", 1153 vals: []string{"first", "second"}, 1154 }} 1155 for i, tt := range tests { 1156 kvc := toGRPC(clus.RandClient()).KV 1157 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte(tt.key), Value: []byte(tt.vals[0])}); err != nil { 1158 t.Fatal(err) 1159 } 1160 1161 ws, werr := toGRPC(clus.RandClient()).Watch.Watch(wctx) 1162 if werr != nil { 1163 t.Fatal(werr) 1164 } 1165 1166 req := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ 1167 CreateRequest: &pb.WatchCreateRequest{ 1168 Key: []byte(tt.key), 1169 RangeEnd: []byte(tt.end), 1170 PrevKv: true, 1171 }}} 1172 if err := ws.Send(req); err != nil { 1173 t.Fatal(err) 1174 } 1175 if _, err := ws.Recv(); err != nil { 1176 t.Fatal(err) 1177 } 1178 1179 if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte(tt.key), Value: []byte(tt.vals[1])}); err != nil { 1180 t.Fatal(err) 1181 } 1182 1183 recv := make(chan *pb.WatchResponse) 1184 go func() { 1185 // check received PUT 1186 resp, rerr := ws.Recv() 1187 if rerr != nil { 1188 t.Fatal(rerr) 1189 } 1190 recv <- resp 1191 }() 1192 1193 select { 1194 case resp := <-recv: 1195 if tt.vals[1] != string(resp.Events[0].Kv.Value) { 1196 t.Errorf("#%d: unequal value: want=%s, get=%s", i, tt.vals[1], resp.Events[0].Kv.Value) 1197 } 1198 if tt.vals[0] != string(resp.Events[0].PrevKv.Value) { 1199 t.Errorf("#%d: unequal value: want=%s, get=%s", i, tt.vals[0], resp.Events[0].PrevKv.Value) 1200 } 1201 case <-time.After(30 * time.Second): 1202 t.Error("timeout waiting for watch response") 1203 } 1204 } 1205} 1206