// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/mvccpb"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	// closeSendErrTimeout bounds how long a final error response may block
	// on the subscriber's channel before the channel is closed anyway.
	closeSendErrTimeout = 250 * time.Millisecond
)

// Event is the client-side alias of mvccpb.Event.
type Event mvccpb.Event

// WatchChan is the receive-only channel of watch responses returned by Watch.
type WatchChan <-chan WatchResponse

// Watcher is the client-facing interface for creating and managing watches.
type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the requested revision is 0 or unspecified, the returned channel will
	// return watch events that happen after the server receives the watch request.
	// If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
	// and "WatchResponse" from this closed channel has zero events and nil "Err()".
	// The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", returned "WatchChan" will
	// not be closed and block until event is triggered, except when server
	// returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when context passed with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to network partition),
	// error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with non-nil "Err()".
	// In order to prevent a watch stream being stuck in a partitioned node,
	// make sure to wrap context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}

// WatchResponse carries the events (or terminal error state) for one watch.
type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, before the channel is closed,
	// the channel sends a final response that has Canceled set to true with a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	// closeErr is the error that tore down the whole grpc stream, if any.
	closeErr error

	// cancelReason is a reason of canceling watch
	cancelReason string
}

// IsCreate returns true if the event tells that the key is newly created.
func (e *Event) IsCreate() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
}

// IsModify returns true if the event tells that a new value is put on existing key.
func (e *Event) IsModify() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
}

// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
	switch {
	case wr.closeErr != nil:
		return v3rpc.Error(wr.closeErr)
	case wr.CompactRevision != 0:
		return v3rpc.ErrCompacted
	case wr.Canceled:
		if len(wr.cancelReason) != 0 {
			return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
		}
		return v3rpc.ErrFutureRev
	}
	return nil
}

// IsProgressNotify returns true if the WatchResponse is progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
	return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}

// watcher implements the Watcher interface
type watcher struct {
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.Mutex

	// streams holds all the active grpc streams keyed by ctx value.
	// A nil map indicates the watcher has been closed.
	streams map[string]*watchGrpcStream
	lg      *zap.Logger
}

// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
	owner    *watcher
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error

	lg *zap.Logger
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	ctx context.Context
	key string
	end string
	rev int64

	// send created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragmentation should be disabled by default
	// if true, split watch events when total exceeds
	// "--max-request-bytes" flag value + 512-byte
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// get the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this request
	initReq watchRequest

	// outc publishes watch responses to subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when stream should be scheduled to shutdown.
	closing bool
	// id is the registered watch id on the grpc stream
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}

// NewWatcher returns a Watcher that issues watch RPCs over the client's connection.
func NewWatcher(c *Client) Watcher {
	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

// NewWatchFromWatchClient builds a Watcher on top of an existing WatchClient;
// c may be nil, in which case call options and logger are left unset.
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
	w := &watcher{
		remote:  wc,
		streams: make(map[string]*watchGrpcStream),
	}
	if c != nil {
		w.callOpts = c.callOpts
		w.lg = c.lg
	}
	return w
}

// never closes
var valCtxCh = make(chan struct{})
var zeroTime = time.Unix(0, 0)

// ctx with only the values; never Done
type valCtx struct{ context.Context }

func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
func (vc *valCtx) Err() error                  { return nil }

// newWatcherGrpcStream allocates a watchGrpcStream for inctx's stream key and
// starts its run() goroutine. The stream's internal ctx wraps inctx in a valCtx
// so in-flight RPCs are not torn down when one subscriber's context is canceled.
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGrpcStream{
		owner:      w,
		remote:     w.remote,
		callOpts:   w.callOpts,
		ctx:        ctx,
		ctxKey:     streamKeyFromCtx(inctx),
		cancel:     cancel,
		substreams: make(map[int64]*watcherStream),
		respc:      make(chan *pb.WatchResponse),
		reqc:       make(chan watchStreamRequest),
		donec:      make(chan struct{}),
		errc:       make(chan error, 1),
		closingc:   make(chan *watcherStream),
		resumec:    make(chan struct{}),
		lg:         w.lg,
	}
	go wgs.run()
	return wgs
}

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		retc:           make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	var closeCh chan WatchResponse
	for {
		// find or allocate appropriate grpc watch stream
		w.mu.Lock()
		if w.streams == nil {
			// closed
			w.mu.Unlock()
			ch := make(chan WatchResponse)
			close(ch)
			return ch
		}
		wgs := w.streams[ctxKey]
		if wgs == nil {
			wgs = w.newWatcherGrpcStream(ctx)
			w.streams[ctxKey] = wgs
		}
		donec := wgs.donec
		reqc := wgs.reqc
		w.mu.Unlock()

		// couldn't create channel; return closed channel
		if closeCh == nil {
			closeCh = make(chan WatchResponse, 1)
		}

		// submit request
		select {
		case reqc <- wr:
			ok = true
		case <-wr.ctx.Done():
			ok = false
		case <-donec:
			ok = false
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				break
			}
			// retry; may have dropped stream from no ctxs
			continue
		}

		// receive channel
		if ok {
			select {
			case ret := <-wr.retc:
				return ret
			case <-ctx.Done():
			case <-donec:
				if wgs.closeErr != nil {
					closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
					break
				}
				// retry; may have dropped stream from no ctxs
				continue
			}
		}
		break
	}

	close(closeCh)
	return closeCh
}

// Close shuts down every grpc watch stream and reports the last close error,
// treating context.Canceled as a successful close.
func (w *watcher) Close() (err error) {
	w.mu.Lock()
	streams := w.streams
	w.streams = nil
	w.mu.Unlock()
	for _, wgs := range streams {
		if werr := wgs.close(); werr != nil {
			err = werr
		}
	}
	// Consider context.Canceled as a successful close
	// NOTE(review): this is a direct comparison; if close errors can be
	// wrapped, errors.Is(err, context.Canceled) would be safer — confirm.
	if err == context.Canceled {
		err = nil
	}
	return err
}

// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
	ctxKey := streamKeyFromCtx(ctx)

	w.mu.Lock()
	if w.streams == nil {
		w.mu.Unlock()
		return fmt.Errorf("no stream found for context")
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	pr := &progressRequest{}

	select {
	case reqc <- pr:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-donec:
		if wgs.closeErr != nil {
			return wgs.closeErr
		}
		// retry; may have dropped stream from no ctxs
		return w.RequestProgress(ctx)
	}
}

// close cancels the stream's internal context and waits for run() to finish,
// returning any error the receive loop reported.
func (w *watchGrpcStream) close() (err error) {
	w.cancel()
	<-w.donec
	select {
	case err = <-w.errc:
	default:
	}
	return toErr(w.ctx, err)
}

// closeStream broadcasts shutdown for wgs and removes it from the owner's map.
func (w *watcher) closeStream(wgs *watchGrpcStream) {
	w.mu.Lock()
	close(wgs.donec)
	wgs.cancel()
	if w.streams != nil {
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}

// addSubstream registers ws under the watch ID the server assigned in resp.
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
	// check watch ID for backward compatibility (<= v3.3)
	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
		// failed; no channel
		close(ws.recvc)
		return
	}
	ws.id = resp.WatchId
	w.substreams[ws.id] = ws
}

// sendCloseSubstream delivers a final response to the subscriber, giving up
// after closeSendErrTimeout, then closes the subscriber channel.
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
	select {
	case ws.outc <- *resp:
	case <-ws.initReq.ctx.Done():
	case <-time.After(closeSendErrTimeout):
	}
	close(ws.outc)
}

// closeSubstream finalizes a substream: unblocks a pending Watch() caller,
// closes (or hands off) the subscriber channel, and unregisters the stream.
func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
	// send channel response in case stream was never established
	select {
	case ws.initReq.retc <- ws.outc:
	default:
	}
	// close subscriber's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	if ws.id != -1 {
		delete(w.substreams, ws.id)
		return
	}
	// not yet registered: mark the slot nil so nextResume() skips it
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}

// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	cancelSet := make(map[int64]struct{})

	var cur *pb.WatchResponse
	for {
		select {
		// Watch() requested
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				// TODO: pass custom watch ID?
				ws := &watcherStream{
					initReq: *wreq,
					id:      -1,
					outc:    outc,
					// unbuffered so resumes won't cause repeat events
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// queue up for watcher creation/resume
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of resume queue, can register a new watcher
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}
			case *progressRequest:
				if err := wc.Send(wreq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}

		// new events from the watch client
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur != nil && cur.WatchId == pbresp.WatchId {
				// merge new events
				cur.Events = append(cur.Events, pbresp.Events...)
				// update "Fragment" field; last response with "Fragment" == false
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// response to head of queue creation
				if ws := w.resuming[0]; ws != nil {
					w.addSubstream(pbresp, ws)
					w.dispatchEvent(pbresp)
					w.resuming[0] = nil
				}

				if ws := w.nextResume(); ws != nil {
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}

				// reset for next iteration
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// reset for next iteration
				cur = nil

			case cur.Fragment:
				// watch response events are still fragmented
				// continue to fetch next fragmented event arrival
				continue

			default:
				// dispatch to appropriate watch stream
				ok := w.dispatchEvent(cur)

				// reset for next iteration
				cur = nil

				if ok {
					break
				}

				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
				}
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				if err := wc.Send(ws.initReq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// no more watchers on this stream, shutdown, skip cancellation
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
			if ws.id != -1 {
				// client is closing an established watch; close it on the server proactively instead of waiting
				// to close when the next message arrives
				cancelSet[ws.id] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: ws.id,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
				}
			}
		}
	}
}

// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:len(w.resuming)]
	}
	return nil
}

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	// TODO: return watch ID?
	wr := &WatchResponse{
		Header:          *pbresp.Header,
		Events:          events,
		CompactRevision: pbresp.CompactRevision,
		Created:         pbresp.Created,
		Canceled:        pbresp.Canceled,
		cancelReason:    pbresp.CancelReason,
	}

	// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
	// indicate they should be broadcast.
	if wr.IsProgressNotify() && pbresp.WatchId == -1 {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)

}

// broadcastResponse send a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}

// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}

// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		// only offer the head of the buffer; disable the send case when empty
		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision is only bound on the first observed event,
					// if wch is disconnected before the Put is issued, then reconnects
					// after it is committed, it'll miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}

// newWatchClient opens a fresh grpc watch stream and moves every live
// substream onto the resume queue so it re-registers on the new stream.
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error
	// so that the teardown path can shutdown the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}

// waitCancelSubstreams reaps resuming substreams whose subscriber contexts are
// already canceled while the new grpc stream is being opened; the returned
// channel closes once every resuming substream has been checked or stopc fires.
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGrpcStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		if ws != nil {
			<-ws.donec
		}
	}
}

var maxBackoff = 100 * time.Millisecond

// openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
	backoff := time.Millisecond
	for {
		select {
		case <-w.ctx.Done():
			if err == nil {
				return nil, w.ctx.Err()
			}
			return nil, err
		default:
		}
		if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
			break
		}
		if isHaltErr(w.ctx, err) {
			return nil, v3rpc.Error(err)
		}
		if isUnavailableErr(w.ctx, err) {
			// retry, but backoff
			if backoff < maxBackoff {
				// 25% backoff factor
				backoff = backoff + backoff/4
				if backoff > maxBackoff {
					backoff = maxBackoff
				}
			}
			time.Sleep(backoff)
		}
	}
	return ws, nil
}

// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
func (wr *watchRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchCreateRequest{
		StartRevision:  wr.rev,
		Key:            []byte(wr.key),
		RangeEnd:       []byte(wr.end),
		ProgressNotify: wr.progressNotify,
		Filters:        wr.filters,
		PrevKv:         wr.prevKV,
		Fragment:       wr.fragment,
	}
	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchProgressRequest{}
	cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

// streamKeyFromCtx derives the grpc stream map key from the context's
// outgoing metadata; contexts without metadata share the "" key.
func streamKeyFromCtx(ctx context.Context) string {
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
		return fmt.Sprintf("%+v", md)
	}
	return ""
}