1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package clientv3 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "sync" 22 "time" 23 24 v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" 25 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 26 mvccpb "github.com/coreos/etcd/mvcc/mvccpb" 27 28 "google.golang.org/grpc" 29 "google.golang.org/grpc/codes" 30 "google.golang.org/grpc/metadata" 31 "google.golang.org/grpc/status" 32) 33 34const ( 35 EventTypeDelete = mvccpb.DELETE 36 EventTypePut = mvccpb.PUT 37 38 closeSendErrTimeout = 250 * time.Millisecond 39) 40 41type Event mvccpb.Event 42 43type WatchChan <-chan WatchResponse 44 45type Watcher interface { 46 // Watch watches on a key or prefix. The watched events will be returned 47 // through the returned channel. If revisions waiting to be sent over the 48 // watch are compacted, then the watch will be canceled by the server, the 49 // client will post a compacted error watch response, and the channel will close. 50 // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed, 51 // and "WatchResponse" from this closed channel has zero events and nil "Err()". 52 // The context "ctx" MUST be canceled, as soon as watcher is no longer being used, 53 // to release the associated resources. 
54 // 55 // If the context is "context.Background/TODO", returned "WatchChan" will 56 // not be closed and block until event is triggered, except when server 57 // returns a non-recoverable error (e.g. ErrCompacted). 58 // For example, when context passed with "WithRequireLeader" and the 59 // connected server has no leader (e.g. due to network partition), 60 // error "etcdserver: no leader" (ErrNoLeader) will be returned, 61 // and then "WatchChan" is closed with non-nil "Err()". 62 // In order to prevent a watch stream being stuck in a partitioned node, 63 // make sure to wrap context with "WithRequireLeader". 64 // 65 // Otherwise, as long as the context has not been canceled or timed out, 66 // watch will retry on other recoverable errors forever until reconnected. 67 // 68 // TODO: explicitly set context error in the last "WatchResponse" message and close channel? 69 // Currently, client contexts are overwritten with "valCtx" that never closes. 70 // TODO(v3.4): configure watch retry policy, limit maximum retry number 71 // (see https://github.com/etcd-io/etcd/issues/8980) 72 Watch(ctx context.Context, key string, opts ...OpOption) WatchChan 73 74 // RequestProgress requests a progress notify response be sent in all watch channels. 75 RequestProgress(ctx context.Context) error 76 77 // Close closes the watcher and cancels all watch requests. 78 Close() error 79} 80 81type WatchResponse struct { 82 Header pb.ResponseHeader 83 Events []*Event 84 85 // CompactRevision is the minimum revision the watcher may receive. 86 CompactRevision int64 87 88 // Canceled is used to indicate watch failure. 89 // If the watch failed and the stream was about to close, before the channel is closed, 90 // the channel sends a final response that has Canceled set to true with a non-nil Err(). 91 Canceled bool 92 93 // Created is used to indicate the creation of the watcher. 
94 Created bool 95 96 closeErr error 97 98 // cancelReason is a reason of canceling watch 99 cancelReason string 100} 101 102// IsCreate returns true if the event tells that the key is newly created. 103func (e *Event) IsCreate() bool { 104 return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision 105} 106 107// IsModify returns true if the event tells that a new value is put on existing key. 108func (e *Event) IsModify() bool { 109 return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision 110} 111 112// Err is the error value if this WatchResponse holds an error. 113func (wr *WatchResponse) Err() error { 114 switch { 115 case wr.closeErr != nil: 116 return v3rpc.Error(wr.closeErr) 117 case wr.CompactRevision != 0: 118 return v3rpc.ErrCompacted 119 case wr.Canceled: 120 if len(wr.cancelReason) != 0 { 121 return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason)) 122 } 123 return v3rpc.ErrFutureRev 124 } 125 return nil 126} 127 128// IsProgressNotify returns true if the WatchResponse is progress notification. 129func (wr *WatchResponse) IsProgressNotify() bool { 130 return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0 131} 132 133// watcher implements the Watcher interface 134type watcher struct { 135 remote pb.WatchClient 136 callOpts []grpc.CallOption 137 138 // mu protects the grpc streams map 139 mu sync.RWMutex 140 141 // streams holds all the active grpc streams keyed by ctx value. 142 streams map[string]*watchGrpcStream 143} 144 145// watchGrpcStream tracks all watch resources attached to a single grpc stream. 
146type watchGrpcStream struct { 147 owner *watcher 148 remote pb.WatchClient 149 callOpts []grpc.CallOption 150 151 // ctx controls internal remote.Watch requests 152 ctx context.Context 153 // ctxKey is the key used when looking up this stream's context 154 ctxKey string 155 cancel context.CancelFunc 156 157 // substreams holds all active watchers on this grpc stream 158 substreams map[int64]*watcherStream 159 // resuming holds all resuming watchers on this grpc stream 160 resuming []*watcherStream 161 162 // reqc sends a watch request from Watch() to the main goroutine 163 reqc chan watchStreamRequest 164 // respc receives data from the watch client 165 respc chan *pb.WatchResponse 166 // donec closes to broadcast shutdown 167 donec chan struct{} 168 // errc transmits errors from grpc Recv to the watch stream reconnect logic 169 errc chan error 170 // closingc gets the watcherStream of closing watchers 171 closingc chan *watcherStream 172 // wg is Done when all substream goroutines have exited 173 wg sync.WaitGroup 174 175 // resumec closes to signal that all substreams should begin resuming 176 resumec chan struct{} 177 // closeErr is the error that closed the watch stream 178 closeErr error 179} 180 181// watchStreamRequest is a union of the supported watch request operation types 182type watchStreamRequest interface { 183 toPB() *pb.WatchRequest 184} 185 186// watchRequest is issued by the subscriber to start a new watcher 187type watchRequest struct { 188 ctx context.Context 189 key string 190 end string 191 rev int64 192 193 // send created notification event if this field is true 194 createdNotify bool 195 // progressNotify is for progress updates 196 progressNotify bool 197 // fragmentation should be disabled by default 198 // if true, split watch events when total exceeds 199 // "--max-request-bytes" flag value + 512-byte 200 fragment bool 201 202 // filters is the list of events to filter out 203 filters []pb.WatchCreateRequest_FilterType 204 // get the 
previous key-value pair before the event happens 205 prevKV bool 206 // retc receives a chan WatchResponse once the watcher is established 207 retc chan chan WatchResponse 208} 209 210// progressRequest is issued by the subscriber to request watch progress 211type progressRequest struct { 212} 213 214// watcherStream represents a registered watcher 215type watcherStream struct { 216 // initReq is the request that initiated this request 217 initReq watchRequest 218 219 // outc publishes watch responses to subscriber 220 outc chan WatchResponse 221 // recvc buffers watch responses before publishing 222 recvc chan *WatchResponse 223 // donec closes when the watcherStream goroutine stops. 224 donec chan struct{} 225 // closing is set to true when stream should be scheduled to shutdown. 226 closing bool 227 // id is the registered watch id on the grpc stream 228 id int64 229 230 // buf holds all events received from etcd but not yet consumed by the client 231 buf []*WatchResponse 232} 233 234func NewWatcher(c *Client) Watcher { 235 return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c) 236} 237 238func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { 239 w := &watcher{ 240 remote: wc, 241 streams: make(map[string]*watchGrpcStream), 242 } 243 if c != nil { 244 w.callOpts = c.callOpts 245 } 246 return w 247} 248 249// never closes 250var valCtxCh = make(chan struct{}) 251var zeroTime = time.Unix(0, 0) 252 253// ctx with only the values; never Done 254type valCtx struct{ context.Context } 255 256func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false } 257func (vc *valCtx) Done() <-chan struct{} { return valCtxCh } 258func (vc *valCtx) Err() error { return nil } 259 260func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { 261 ctx, cancel := context.WithCancel(&valCtx{inctx}) 262 wgs := &watchGrpcStream{ 263 owner: w, 264 remote: w.remote, 265 callOpts: w.callOpts, 266 ctx: ctx, 267 ctxKey: 
streamKeyFromCtx(inctx), 268 cancel: cancel, 269 substreams: make(map[int64]*watcherStream), 270 respc: make(chan *pb.WatchResponse), 271 reqc: make(chan watchStreamRequest), 272 donec: make(chan struct{}), 273 errc: make(chan error, 1), 274 closingc: make(chan *watcherStream), 275 resumec: make(chan struct{}), 276 } 277 go wgs.run() 278 return wgs 279} 280 281// Watch posts a watch request to run() and waits for a new watcher channel 282func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { 283 ow := opWatch(key, opts...) 284 285 var filters []pb.WatchCreateRequest_FilterType 286 if ow.filterPut { 287 filters = append(filters, pb.WatchCreateRequest_NOPUT) 288 } 289 if ow.filterDelete { 290 filters = append(filters, pb.WatchCreateRequest_NODELETE) 291 } 292 293 wr := &watchRequest{ 294 ctx: ctx, 295 createdNotify: ow.createdNotify, 296 key: string(ow.key), 297 end: string(ow.end), 298 rev: ow.rev, 299 progressNotify: ow.progressNotify, 300 fragment: ow.fragment, 301 filters: filters, 302 prevKV: ow.prevKV, 303 retc: make(chan chan WatchResponse, 1), 304 } 305 306 ok := false 307 ctxKey := streamKeyFromCtx(ctx) 308 309 // find or allocate appropriate grpc watch stream 310 w.mu.Lock() 311 if w.streams == nil { 312 // closed 313 w.mu.Unlock() 314 ch := make(chan WatchResponse) 315 close(ch) 316 return ch 317 } 318 wgs := w.streams[ctxKey] 319 if wgs == nil { 320 wgs = w.newWatcherGrpcStream(ctx) 321 w.streams[ctxKey] = wgs 322 } 323 donec := wgs.donec 324 reqc := wgs.reqc 325 w.mu.Unlock() 326 327 // couldn't create channel; return closed channel 328 closeCh := make(chan WatchResponse, 1) 329 330 // submit request 331 select { 332 case reqc <- wr: 333 ok = true 334 case <-wr.ctx.Done(): 335 case <-donec: 336 if wgs.closeErr != nil { 337 closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} 338 break 339 } 340 // retry; may have dropped stream from no ctxs 341 return w.Watch(ctx, key, opts...) 
342 } 343 344 // receive channel 345 if ok { 346 select { 347 case ret := <-wr.retc: 348 return ret 349 case <-ctx.Done(): 350 case <-donec: 351 if wgs.closeErr != nil { 352 closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} 353 break 354 } 355 // retry; may have dropped stream from no ctxs 356 return w.Watch(ctx, key, opts...) 357 } 358 } 359 360 close(closeCh) 361 return closeCh 362} 363 364func (w *watcher) Close() (err error) { 365 w.mu.Lock() 366 streams := w.streams 367 w.streams = nil 368 w.mu.Unlock() 369 for _, wgs := range streams { 370 if werr := wgs.close(); werr != nil { 371 err = werr 372 } 373 } 374 // Consider context.Canceled as a successful close 375 if err == context.Canceled { 376 err = nil 377 } 378 return err 379} 380 381// RequestProgress requests a progress notify response be sent in all watch channels. 382func (w *watcher) RequestProgress(ctx context.Context) (err error) { 383 ctxKey := streamKeyFromCtx(ctx) 384 385 w.mu.Lock() 386 if w.streams == nil { 387 w.mu.Unlock() 388 return fmt.Errorf("no stream found for context") 389 } 390 wgs := w.streams[ctxKey] 391 if wgs == nil { 392 wgs = w.newWatcherGrpcStream(ctx) 393 w.streams[ctxKey] = wgs 394 } 395 donec := wgs.donec 396 reqc := wgs.reqc 397 w.mu.Unlock() 398 399 pr := &progressRequest{} 400 401 select { 402 case reqc <- pr: 403 return nil 404 case <-ctx.Done(): 405 if err == nil { 406 return ctx.Err() 407 } 408 return err 409 case <-donec: 410 if wgs.closeErr != nil { 411 return wgs.closeErr 412 } 413 // retry; may have dropped stream from no ctxs 414 return w.RequestProgress(ctx) 415 } 416} 417 418func (w *watchGrpcStream) close() (err error) { 419 w.cancel() 420 <-w.donec 421 select { 422 case err = <-w.errc: 423 default: 424 } 425 return toErr(w.ctx, err) 426} 427 428func (w *watcher) closeStream(wgs *watchGrpcStream) { 429 w.mu.Lock() 430 close(wgs.donec) 431 wgs.cancel() 432 if w.streams != nil { 433 delete(w.streams, wgs.ctxKey) 434 } 435 w.mu.Unlock() 436} 437 
438func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { 439 // check watch ID for backward compatibility (<= v3.3) 440 if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { 441 w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) 442 // failed; no channel 443 close(ws.recvc) 444 return 445 } 446 ws.id = resp.WatchId 447 w.substreams[ws.id] = ws 448} 449 450func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) { 451 select { 452 case ws.outc <- *resp: 453 case <-ws.initReq.ctx.Done(): 454 case <-time.After(closeSendErrTimeout): 455 } 456 close(ws.outc) 457} 458 459func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { 460 // send channel response in case stream was never established 461 select { 462 case ws.initReq.retc <- ws.outc: 463 default: 464 } 465 // close subscriber's channel 466 if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { 467 go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr}) 468 } else if ws.outc != nil { 469 close(ws.outc) 470 } 471 if ws.id != -1 { 472 delete(w.substreams, ws.id) 473 return 474 } 475 for i := range w.resuming { 476 if w.resuming[i] == ws { 477 w.resuming[i] = nil 478 return 479 } 480 } 481} 482 483// run is the root of the goroutines for managing a watcher client 484func (w *watchGrpcStream) run() { 485 var wc pb.Watch_WatchClient 486 var closeErr error 487 488 // substreams marked to close but goroutine still running; needed for 489 // avoiding double-closing recvc on grpc stream teardown 490 closing := make(map[*watcherStream]struct{}) 491 492 defer func() { 493 w.closeErr = closeErr 494 // shutdown substreams and resuming substreams 495 for _, ws := range w.substreams { 496 if _, ok := closing[ws]; !ok { 497 close(ws.recvc) 498 closing[ws] = struct{}{} 499 } 500 } 501 for _, ws := range w.resuming { 502 if _, ok := closing[ws]; ws != nil && !ok { 503 close(ws.recvc) 504 closing[ws] = 
struct{}{} 505 } 506 } 507 w.joinSubstreams() 508 for range closing { 509 w.closeSubstream(<-w.closingc) 510 } 511 w.wg.Wait() 512 w.owner.closeStream(w) 513 }() 514 515 // start a stream with the etcd grpc server 516 if wc, closeErr = w.newWatchClient(); closeErr != nil { 517 return 518 } 519 520 cancelSet := make(map[int64]struct{}) 521 522 var cur *pb.WatchResponse 523 for { 524 select { 525 // Watch() requested 526 case req := <-w.reqc: 527 switch wreq := req.(type) { 528 case *watchRequest: 529 outc := make(chan WatchResponse, 1) 530 // TODO: pass custom watch ID? 531 ws := &watcherStream{ 532 initReq: *wreq, 533 id: -1, 534 outc: outc, 535 // unbuffered so resumes won't cause repeat events 536 recvc: make(chan *WatchResponse), 537 } 538 539 ws.donec = make(chan struct{}) 540 w.wg.Add(1) 541 go w.serveSubstream(ws, w.resumec) 542 543 // queue up for watcher creation/resume 544 w.resuming = append(w.resuming, ws) 545 if len(w.resuming) == 1 { 546 // head of resume queue, can register a new watcher 547 wc.Send(ws.initReq.toPB()) 548 } 549 case *progressRequest: 550 wc.Send(wreq.toPB()) 551 } 552 553 // new events from the watch client 554 case pbresp := <-w.respc: 555 if cur == nil || pbresp.Created || pbresp.Canceled { 556 cur = pbresp 557 } else if cur != nil && cur.WatchId == pbresp.WatchId { 558 // merge new events 559 cur.Events = append(cur.Events, pbresp.Events...) 
560 // update "Fragment" field; last response with "Fragment" == false 561 cur.Fragment = pbresp.Fragment 562 } 563 564 switch { 565 case pbresp.Created: 566 // response to head of queue creation 567 if ws := w.resuming[0]; ws != nil { 568 w.addSubstream(pbresp, ws) 569 w.dispatchEvent(pbresp) 570 w.resuming[0] = nil 571 } 572 573 if ws := w.nextResume(); ws != nil { 574 wc.Send(ws.initReq.toPB()) 575 } 576 577 // reset for next iteration 578 cur = nil 579 580 case pbresp.Canceled && pbresp.CompactRevision == 0: 581 delete(cancelSet, pbresp.WatchId) 582 if ws, ok := w.substreams[pbresp.WatchId]; ok { 583 // signal to stream goroutine to update closingc 584 close(ws.recvc) 585 closing[ws] = struct{}{} 586 } 587 588 // reset for next iteration 589 cur = nil 590 591 case cur.Fragment: 592 // watch response events are still fragmented 593 // continue to fetch next fragmented event arrival 594 continue 595 596 default: 597 // dispatch to appropriate watch stream 598 ok := w.dispatchEvent(cur) 599 600 // reset for next iteration 601 cur = nil 602 603 if ok { 604 break 605 } 606 607 // watch response on unexpected watch id; cancel id 608 if _, ok := cancelSet[pbresp.WatchId]; ok { 609 break 610 } 611 612 cancelSet[pbresp.WatchId] = struct{}{} 613 cr := &pb.WatchRequest_CancelRequest{ 614 CancelRequest: &pb.WatchCancelRequest{ 615 WatchId: pbresp.WatchId, 616 }, 617 } 618 req := &pb.WatchRequest{RequestUnion: cr} 619 wc.Send(req) 620 } 621 622 // watch client failed on Recv; spawn another if possible 623 case err := <-w.errc: 624 if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader { 625 closeErr = err 626 return 627 } 628 if wc, closeErr = w.newWatchClient(); closeErr != nil { 629 return 630 } 631 if ws := w.nextResume(); ws != nil { 632 wc.Send(ws.initReq.toPB()) 633 } 634 cancelSet = make(map[int64]struct{}) 635 636 case <-w.ctx.Done(): 637 return 638 639 case ws := <-w.closingc: 640 w.closeSubstream(ws) 641 delete(closing, ws) 642 // no more watchers on 
this stream, shutdown 643 if len(w.substreams)+len(w.resuming) == 0 { 644 return 645 } 646 } 647 } 648} 649 650// nextResume chooses the next resuming to register with the grpc stream. Abandoned 651// streams are marked as nil in the queue since the head must wait for its inflight registration. 652func (w *watchGrpcStream) nextResume() *watcherStream { 653 for len(w.resuming) != 0 { 654 if w.resuming[0] != nil { 655 return w.resuming[0] 656 } 657 w.resuming = w.resuming[1:len(w.resuming)] 658 } 659 return nil 660} 661 662// dispatchEvent sends a WatchResponse to the appropriate watcher stream 663func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { 664 events := make([]*Event, len(pbresp.Events)) 665 for i, ev := range pbresp.Events { 666 events[i] = (*Event)(ev) 667 } 668 // TODO: return watch ID? 669 wr := &WatchResponse{ 670 Header: *pbresp.Header, 671 Events: events, 672 CompactRevision: pbresp.CompactRevision, 673 Created: pbresp.Created, 674 Canceled: pbresp.Canceled, 675 cancelReason: pbresp.CancelReason, 676 } 677 678 // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to 679 // indicate they should be broadcast. 680 if wr.IsProgressNotify() && pbresp.WatchId == -1 { 681 return w.broadcastResponse(wr) 682 } 683 684 return w.unicastResponse(wr, pbresp.WatchId) 685 686} 687 688// broadcastResponse send a watch response to all watch substreams. 689func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool { 690 for _, ws := range w.substreams { 691 select { 692 case ws.recvc <- wr: 693 case <-ws.donec: 694 } 695 } 696 return true 697} 698 699// unicastResponse sends a watch response to a specific watch substream. 
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	// Returns false when no substream is registered under watchId, or when
	// that substream's goroutine stopped before the response was accepted.
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}

// serveWatchClient forwards messages from the grpc stream to run().
// A Recv error is handed to errc and ends the loop; both hand-offs are
// abandoned once donec is closed.
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}

// serveSubstream forwards watch responses from run() to the subscriber.
// It buffers responses received on ws.recvc in ws.buf and publishes them in
// order on ws.outc, while tracking the revision to resume from in
// ws.initReq.rev. It exits when recvc is closed, when a published response
// carries an error, when either context is done, or when resumec is closed.
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		// On any exit other than a resume, mark the substream as closing and
		// report it on closingc; donec is closed first in either case.
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		// With nothing buffered, outc is nil so the send case below can
		// never be selected.
		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			// an error response is the last one published on this substream
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision is only bound on the first observed event,
					// if wch is disconnected before the Put is issued, then reconnects
					// after it is committed, it'll miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			// events take precedence: resume right after the last one seen
			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}

// newWatchClient stops all substream goroutines, moves every registered
// substream back onto the resuming queue, opens a new grpc watch stream, and
// restarts the goroutines of the substreams that are not closing. On
// success it also starts a goroutine receiving from the new stream.
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error
	// so that the teardown path can shutdown the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}

// waitCancelSubstreams starts one goroutine per resuming substream that
// waits for either the substream's request context to be done or stopc to
// close. A substream whose context is done is marked closing, has its outc
// closed, and is reported on closingc. The returned channel closes once all
// of those goroutines have finished.
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				// already closing: only close outc if the request context is done
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				// report on closingc from a separate goroutine tracked by w.wg
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGrpcStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		// nil entries are abandoned slots in the resume queue
		if ws != nil {
			<-ws.donec
		}
	}
}

// maxBackoff caps the retry delay used by openWatchClient.
var maxBackoff = 100 * time.Millisecond

// openWatchClient retries opening a watch client until success or halt.
926// manually retry in case "ws==nil && err==nil" 927// TODO: remove FailFast=false 928func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { 929 backoff := time.Millisecond 930 for { 931 select { 932 case <-w.ctx.Done(): 933 if err == nil { 934 return nil, w.ctx.Err() 935 } 936 return nil, err 937 default: 938 } 939 if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil { 940 break 941 } 942 if isHaltErr(w.ctx, err) { 943 return nil, v3rpc.Error(err) 944 } 945 if isUnavailableErr(w.ctx, err) { 946 // retry, but backoff 947 if backoff < maxBackoff { 948 // 25% backoff factor 949 backoff = backoff + backoff/4 950 if backoff > maxBackoff { 951 backoff = maxBackoff 952 } 953 } 954 time.Sleep(backoff) 955 } 956 } 957 return ws, nil 958} 959 960// toPB converts an internal watch request structure to its protobuf WatchRequest structure. 961func (wr *watchRequest) toPB() *pb.WatchRequest { 962 req := &pb.WatchCreateRequest{ 963 StartRevision: wr.rev, 964 Key: []byte(wr.key), 965 RangeEnd: []byte(wr.end), 966 ProgressNotify: wr.progressNotify, 967 Filters: wr.filters, 968 PrevKv: wr.prevKV, 969 Fragment: wr.fragment, 970 } 971 cr := &pb.WatchRequest_CreateRequest{CreateRequest: req} 972 return &pb.WatchRequest{RequestUnion: cr} 973} 974 975// toPB converts an internal progress request structure to its protobuf WatchRequest structure. 976func (pr *progressRequest) toPB() *pb.WatchRequest { 977 req := &pb.WatchProgressRequest{} 978 cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req} 979 return &pb.WatchRequest{RequestUnion: cr} 980} 981 982func streamKeyFromCtx(ctx context.Context) string { 983 if md, ok := metadata.FromOutgoingContext(ctx); ok { 984 return fmt.Sprintf("%+v", md) 985 } 986 return "" 987} 988