// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
	"go.etcd.io/etcd/mvcc/mvccpb"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	closeSendErrTimeout = 250 * time.Millisecond
)

type Event mvccpb.Event

type WatchChan <-chan WatchResponse

type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the requested revision is 0 or unspecified, the returned channel will
	// return watch events that happen after the server receives the watch request.
	// If the context "ctx" is canceled or times out, the returned "WatchChan" is closed,
	// and "WatchResponse" from this closed channel has zero events and a nil "Err()".
	// The context "ctx" MUST be canceled as soon as the watcher is no longer
	// being used, to release the associated resources.
	//
	// If the context is "context.Background/TODO", the returned "WatchChan" will
	// not be closed and will block until an event is triggered, except when the
	// server returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when the context is wrapped with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to network partition), the
	// error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with a non-nil "Err()".
	// In order to prevent a watch stream from getting stuck on a partitioned node,
	// make sure to wrap the context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}

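// A minimal usage sketch (illustrative, not part of this package's API):
// consume a WatchChan until it closes, with the context wrapped by
// WithRequireLeader so a partitioned server fails the stream instead of
// blocking forever. The *Client "cli" and the key "foo" are assumed
// placeholders.
func exampleWatchWithRequireLeader(cli *Client) error {
	ctx, cancel := context.WithCancel(WithRequireLeader(context.Background()))
	defer cancel() // always cancel so watcher resources are released
	for wresp := range cli.Watch(ctx, "foo", WithPrefix()) {
		if err := wresp.Err(); err != nil {
			// e.g. ErrCompacted or ErrNoLeader; the channel closes after this
			return err
		}
		for _, ev := range wresp.Events {
			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
	return ctx.Err()
}
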
type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, before the channel is closed,
	// the channel sends a final response that has Canceled set to true with a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	closeErr error

	// cancelReason is the reason for canceling the watch
	cancelReason string
}

// IsCreate returns true if the event indicates that the key is newly created.
func (e *Event) IsCreate() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
}

// IsModify returns true if the event indicates that a new value is put on an existing key.
func (e *Event) IsModify() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
}

// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
	switch {
	case wr.closeErr != nil:
		return v3rpc.Error(wr.closeErr)
	case wr.CompactRevision != 0:
		return v3rpc.ErrCompacted
	case wr.Canceled:
		if len(wr.cancelReason) != 0 {
			return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
		}
		return v3rpc.ErrFutureRev
	}
	return nil
}

// IsProgressNotify returns true if the WatchResponse is a progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
	return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}

// watcher implements the Watcher interface
type watcher struct {
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.RWMutex

	// streams holds all the active grpc streams keyed by ctx value.
	streams map[string]*watchGrpcStream
	lg      *zap.Logger
}

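// A hedged sketch of compaction recovery using the fields above: when Err()
// reports ErrCompacted, CompactRevision is the oldest revision the server
// still has, so the caller can restart the watch from there. "cli", "key",
// and the starting revision "rev" are assumed inputs.
func exampleRewatchAfterCompaction(ctx context.Context, cli *Client, key string, rev int64) {
	for ctx.Err() == nil {
		for wresp := range cli.Watch(ctx, key, WithRev(rev)) {
			if wresp.Err() == v3rpc.ErrCompacted {
				// resume from the compaction boundary instead of giving up
				rev = wresp.CompactRevision
				break
			}
			for _, ev := range wresp.Events {
				rev = ev.Kv.ModRevision + 1
			}
		}
	}
}
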
// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
	owner    *watcher
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	ctx context.Context
	key string
	end string
	rev int64

	// send created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragment is disabled by default; if true, split watch events when the
	// total response size exceeds the "--max-request-bytes" flag value plus
	// a 512-byte overhead
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// get the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

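// Illustrative only: both members of the watchStreamRequest union reduce to
// a *pb.WatchRequest via toPB (implemented near the bottom of this file),
// which is the one shape run() ever sends on the gRPC stream. The key and
// revision values are placeholders.
func exampleUnionToPB() []*pb.WatchRequest {
	reqs := []watchStreamRequest{
		&watchRequest{key: "foo", rev: 1},
		&progressRequest{},
	}
	pbs := make([]*pb.WatchRequest, 0, len(reqs))
	for _, r := range reqs {
		pbs = append(pbs, r.toPB())
	}
	return pbs
}
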
// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this stream
	initReq watchRequest

	// outc publishes watch responses to subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when stream should be scheduled to shutdown.
	closing bool
	// id is the registered watch id on the grpc stream
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}

func NewWatcher(c *Client) Watcher {
	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
	w := &watcher{
		remote:  wc,
		streams: make(map[string]*watchGrpcStream),
	}
	if c != nil {
		w.callOpts = c.callOpts
		w.lg = c.lg
	}
	return w
}

// never closes
var valCtxCh = make(chan struct{})
var zeroTime = time.Unix(0, 0)

// ctx with only the values; never Done
type valCtx struct{ context.Context }

func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
func (vc *valCtx) Err() error                  { return nil }

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGrpcStream{
		owner:      w,
		remote:     w.remote,
		callOpts:   w.callOpts,
		ctx:        ctx,
		ctxKey:     streamKeyFromCtx(inctx),
		cancel:     cancel,
		substreams: make(map[int64]*watcherStream),
		respc:      make(chan *pb.WatchResponse),
		reqc:       make(chan watchStreamRequest),
		donec:      make(chan struct{}),
		errc:       make(chan error, 1),
		closingc:   make(chan *watcherStream),
		resumec:    make(chan struct{}),
	}
	go wgs.run()
	return wgs
}

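// A small sketch of what valCtx buys newWatcherGrpcStream: the derived
// stream context keeps the caller's values (e.g. auth metadata) but is
// detached from the caller's cancelation, so one departing Watch() caller
// cannot tear down the shared gRPC stream. Purely illustrative.
func exampleValCtxDetached() {
	parent, cancel := context.WithCancel(context.Background())
	cancel() // the parent is canceled...
	vc := &valCtx{parent}
	fmt.Println(vc.Err() == nil) // ...but the valCtx never reports Done: prints "true"
}
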
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		retc:           make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	// find or allocate appropriate grpc watch stream
	w.mu.Lock()
	if w.streams == nil {
		// closed
		w.mu.Unlock()
		ch := make(chan WatchResponse)
		close(ch)
		return ch
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	// couldn't create channel; return closed channel
	closeCh := make(chan WatchResponse, 1)

	// submit request
	select {
	case reqc <- wr:
		ok = true
	case <-wr.ctx.Done():
	case <-donec:
		if wgs.closeErr != nil {
			closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
			break
		}
		// retry; may have dropped stream from no ctxs
		return w.Watch(ctx, key, opts...)
	}

	// receive channel
	if ok {
		select {
		case ret := <-wr.retc:
			return ret
		case <-ctx.Done():
		case <-donec:
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				break
			}
			// retry; may have dropped stream from no ctxs
			return w.Watch(ctx, key, opts...)
		}
	}

	close(closeCh)
	return closeCh
}

func (w *watcher) Close() (err error) {
	w.mu.Lock()
	streams := w.streams
	w.streams = nil
	w.mu.Unlock()
	for _, wgs := range streams {
		if werr := wgs.close(); werr != nil {
			err = werr
		}
	}
	// Consider context.Canceled as a successful close
	if err == context.Canceled {
		err = nil
	}
	return err
}

// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
	ctxKey := streamKeyFromCtx(ctx)

	w.mu.Lock()
	if w.streams == nil {
		w.mu.Unlock()
		return fmt.Errorf("no stream found for context")
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	pr := &progressRequest{}

	select {
	case reqc <- pr:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-donec:
		if wgs.closeErr != nil {
			return wgs.closeErr
		}
		// retry; may have dropped stream from no ctxs
		return w.RequestProgress(ctx)
	}
}

func (w *watchGrpcStream) close() (err error) {
	w.cancel()
	<-w.donec
	select {
	case err = <-w.errc:
	default:
	}
	return toErr(w.ctx, err)
}

func (w *watcher) closeStream(wgs *watchGrpcStream) {
	w.mu.Lock()
	close(wgs.donec)
	wgs.cancel()
	if w.streams != nil {
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}

func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
	// check watch ID for backward compatibility (<= v3.3)
	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
		// failed; no channel
		close(ws.recvc)
		return
	}
	ws.id = resp.WatchId
	w.substreams[ws.id] = ws
}

func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
	select {
	case ws.outc <- *resp:
	case <-ws.initReq.ctx.Done():
	case <-time.After(closeSendErrTimeout):
	}
	close(ws.outc)
}

func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
	// send channel response in case stream was never established
	select {
	case ws.initReq.retc <- ws.outc:
	default:
	}
	// close subscriber's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	if ws.id != -1 {
		delete(w.substreams, ws.id)
		return
	}
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}

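// The shutdown idiom used throughout this file, in miniature: closing a
// channel releases every receiver at once, which is how donec broadcasts
// stream teardown to all substream goroutines. Purely illustrative.
func exampleDonecBroadcast() {
	donec := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-donec // every goroutine unblocks on the single close below
		}()
	}
	close(donec)
	wg.Wait()
}
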
// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	cancelSet := make(map[int64]struct{})

	var cur *pb.WatchResponse
	for {
		select {
		// Watch() requested
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				// TODO: pass custom watch ID?
				ws := &watcherStream{
					initReq: *wreq,
					id:      -1,
					outc:    outc,
					// unbuffered so resumes won't cause repeat events
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// queue up for watcher creation/resume
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of resume queue, can register a new watcher
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						lg.Warningf("error when sending request: %v", err)
					}
				}
			case *progressRequest:
				if err := wc.Send(wreq.toPB()); err != nil {
					lg.Warningf("error when sending request: %v", err)
				}
			}

		// new events from the watch client
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur != nil && cur.WatchId == pbresp.WatchId {
				// merge new events
				cur.Events = append(cur.Events, pbresp.Events...)
				// update the "Fragment" field; the last response has "Fragment" == false
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// response to head of queue creation
				if ws := w.resuming[0]; ws != nil {
					w.addSubstream(pbresp, ws)
					w.dispatchEvent(pbresp)
					w.resuming[0] = nil
				}

				if ws := w.nextResume(); ws != nil {
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						lg.Warningf("error when sending request: %v", err)
					}
				}

				// reset for next iteration
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// reset for next iteration
				cur = nil

			case cur.Fragment:
				// watch response events are still fragmented
				// continue to fetch next fragmented event arrival
				continue

			default:
				// dispatch to appropriate watch stream
				ok := w.dispatchEvent(cur)

				// reset for next iteration
				cur = nil

				if ok {
					break
				}

				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				if err := wc.Send(req); err != nil {
					lg.Warningf("error when sending request: %v", err)
				}
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				if err := wc.Send(ws.initReq.toPB()); err != nil {
					lg.Warningf("error when sending request: %v", err)
				}
			}
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// no more watchers on this stream, shutdown
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
		}
	}
}

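// A hedged sketch of the fragment merging performed in run() above:
// fragmented responses for a single watch ID are coalesced by appending
// events until a response arrives with Fragment == false. Assumes "parts"
// all carry the same watch ID and arrive in order.
func exampleMergeFragments(parts []*pb.WatchResponse) *pb.WatchResponse {
	var cur *pb.WatchResponse
	for _, p := range parts {
		if cur == nil {
			cur = p
			continue
		}
		cur.Events = append(cur.Events, p.Events...)
		cur.Fragment = p.Fragment // false on the final fragment
	}
	return cur
}
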
// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:len(w.resuming)]
	}
	return nil
}

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	// TODO: return watch ID?
	wr := &WatchResponse{
		Header:          *pbresp.Header,
		Events:          events,
		CompactRevision: pbresp.CompactRevision,
		Created:         pbresp.Created,
		Canceled:        pbresp.Canceled,
		cancelReason:    pbresp.CancelReason,
	}

	// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
	// indicate they should be broadcast.
	if wr.IsProgressNotify() && pbresp.WatchId == -1 {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)
}

// broadcastResponse sends a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}

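// A usage sketch for progress requests (server support assumed): ask for a
// progress notification and observe it as an event-less WatchResponse on an
// open watch channel; it arrives with watch ID -1 and is broadcast to all
// substreams as above. "cli" is an assumed *Client; the error from
// RequestProgress is ignored for brevity.
func exampleRequestProgress(ctx context.Context, cli *Client) {
	wch := cli.Watch(ctx, "foo")
	_ = cli.RequestProgress(ctx)
	for wresp := range wch {
		if wresp.IsProgressNotify() {
			fmt.Println("server is caught up through revision", wresp.Header.Revision)
			return
		}
	}
}
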
// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}

// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}

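// A worked sketch of the resume-revision bookkeeping implemented by
// serveSubstream below (non-created responses only): after delivering
// events, the watch must resume at the last event's ModRevision + 1 so a
// reconnect neither replays nor drops events; an event-less response
// advances the resume point to the header revision instead.
func exampleNextRev(wr *WatchResponse) int64 {
	if len(wr.Events) > 0 {
		return wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
	}
	return wr.Header.Revision
}
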
// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision were only bound on the first observed event
					// and wch disconnected before the Put was issued, then
					// reconnected after it was committed, it would miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error
	// so that the teardown path can shutdown the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}

func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGrpcStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		if ws != nil {
			<-ws.donec
		}
	}
}

var maxBackoff = 100 * time.Millisecond

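// A worked sketch of the retry backoff used by openWatchClient below: grow
// by 25% per attempt starting from 1ms, capped at maxBackoff (100ms). The
// returned schedule is only for illustration.
func exampleBackoffSchedule(attempts int) []time.Duration {
	schedule := make([]time.Duration, 0, attempts)
	backoff := time.Millisecond
	for i := 0; i < attempts; i++ {
		schedule = append(schedule, backoff)
		if backoff < maxBackoff {
			backoff += backoff / 4
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
	}
	return schedule
}
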
// openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
	backoff := time.Millisecond
	for {
		select {
		case <-w.ctx.Done():
			if err == nil {
				return nil, w.ctx.Err()
			}
			return nil, err
		default:
		}
		if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
			break
		}
		if isHaltErr(w.ctx, err) {
			return nil, v3rpc.Error(err)
		}
		if isUnavailableErr(w.ctx, err) {
			// retry, but backoff
			if backoff < maxBackoff {
				// 25% backoff factor
				backoff = backoff + backoff/4
				if backoff > maxBackoff {
					backoff = maxBackoff
				}
			}
			time.Sleep(backoff)
		}
	}
	return ws, nil
}

// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
func (wr *watchRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchCreateRequest{
		StartRevision:  wr.rev,
		Key:            []byte(wr.key),
		RangeEnd:       []byte(wr.end),
		ProgressNotify: wr.progressNotify,
		Filters:        wr.filters,
		PrevKv:         wr.prevKV,
		Fragment:       wr.fragment,
	}
	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchProgressRequest{}
	cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

func streamKeyFromCtx(ctx context.Context) string {
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
		return fmt.Sprintf("%+v", md)
	}
	return ""
}

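// A usage sketch for stream keying: contexts carrying different outgoing
// gRPC metadata map to different keys, and therefore to separate watch
// streams; contexts without metadata share the "" key. The metadata key
// "trace-id" is an arbitrary placeholder.
func exampleStreamKeys() {
	base := context.Background()
	withMD := metadata.AppendToOutgoingContext(base, "trace-id", "42")
	fmt.Println(streamKeyFromCtx(base) == streamKeyFromCtx(withMD)) // prints "false"
}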