// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/mvccpb"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	// closeSendErrTimeout bounds how long a final error response may block
	// on delivery to a subscriber before its channel is closed anyway.
	closeSendErrTimeout = 250 * time.Millisecond
)

type Event mvccpb.Event

type WatchChan <-chan WatchResponse

type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the requested revision is 0 or unspecified, the returned channel will
	// return watch events that happen after the server receives the watch request.
	// If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
	// and "WatchResponse" from this closed channel has zero events and nil "Err()".
	// The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", returned "WatchChan" will
	// not be closed and block until event is triggered, except when server
	// returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when context passed with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to network partition),
	// error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with non-nil "Err()".
	// In order to prevent a watch stream being stuck in a partitioned node,
	// make sure to wrap context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}

type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, before the channel is closed,
	// the channel sends a final response that has Canceled set to true with a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
97 Created bool 98 99 closeErr error 100 101 // cancelReason is a reason of canceling watch 102 cancelReason string 103} 104 105// IsCreate returns true if the event tells that the key is newly created. 106func (e *Event) IsCreate() bool { 107 return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision 108} 109 110// IsModify returns true if the event tells that a new value is put on existing key. 111func (e *Event) IsModify() bool { 112 return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision 113} 114 115// Err is the error value if this WatchResponse holds an error. 116func (wr *WatchResponse) Err() error { 117 switch { 118 case wr.closeErr != nil: 119 return v3rpc.Error(wr.closeErr) 120 case wr.CompactRevision != 0: 121 return v3rpc.ErrCompacted 122 case wr.Canceled: 123 if len(wr.cancelReason) != 0 { 124 return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason)) 125 } 126 return v3rpc.ErrFutureRev 127 } 128 return nil 129} 130 131// IsProgressNotify returns true if the WatchResponse is progress notification. 132func (wr *WatchResponse) IsProgressNotify() bool { 133 return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0 134} 135 136// watcher implements the Watcher interface 137type watcher struct { 138 remote pb.WatchClient 139 callOpts []grpc.CallOption 140 141 // mu protects the grpc streams map 142 mu sync.Mutex 143 144 // streams holds all the active grpc streams keyed by ctx value. 145 streams map[string]*watchGrpcStream 146 lg *zap.Logger 147} 148 149// watchGrpcStream tracks all watch resources attached to a single grpc stream. 
type watchGrpcStream struct {
	owner    *watcher
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error

	lg *zap.Logger
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	ctx context.Context
	key string
	end string
	rev int64

	// send created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragmentation should be disabled by default
	// if true, split watch events when total exceeds
	// "--max-request-bytes" flag value + 512-byte
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// get the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this request
	initReq watchRequest

	// outc publishes watch responses to subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when stream should be scheduled to shutdown.
	closing bool
	// id is the registered watch id on the grpc stream
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}

// NewWatcher creates a Watcher over the client's gRPC connection.
func NewWatcher(c *Client) Watcher {
	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

// NewWatchFromWatchClient creates a Watcher from an existing watch RPC
// client; c may be nil (then no call options or logger are inherited).
func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
	w := &watcher{
		remote:  wc,
		streams: make(map[string]*watchGrpcStream),
	}
	if c != nil {
		w.callOpts = c.callOpts
		w.lg = c.lg
	}
	return w
}

// never closes
var valCtxCh = make(chan struct{})
var zeroTime = time.Unix(0, 0)

// ctx with only the values; never Done
type valCtx struct{ context.Context }

func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
func (vc *valCtx) Err() error                  { return nil }

// newWatcherGrpcStream allocates a watchGrpcStream for inctx and starts its
// run() goroutine. The stream's internal ctx only carries inctx's values
// (via valCtx) so server-side reconnects aren't killed by caller timeouts.
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGrpcStream{
		owner:      w,
		remote:     w.remote,
callOpts: w.callOpts, 273 ctx: ctx, 274 ctxKey: streamKeyFromCtx(inctx), 275 cancel: cancel, 276 substreams: make(map[int64]*watcherStream), 277 respc: make(chan *pb.WatchResponse), 278 reqc: make(chan watchStreamRequest), 279 donec: make(chan struct{}), 280 errc: make(chan error, 1), 281 closingc: make(chan *watcherStream), 282 resumec: make(chan struct{}), 283 lg: w.lg, 284 } 285 go wgs.run() 286 return wgs 287} 288 289// Watch posts a watch request to run() and waits for a new watcher channel 290func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { 291 ow := opWatch(key, opts...) 292 293 var filters []pb.WatchCreateRequest_FilterType 294 if ow.filterPut { 295 filters = append(filters, pb.WatchCreateRequest_NOPUT) 296 } 297 if ow.filterDelete { 298 filters = append(filters, pb.WatchCreateRequest_NODELETE) 299 } 300 301 wr := &watchRequest{ 302 ctx: ctx, 303 createdNotify: ow.createdNotify, 304 key: string(ow.key), 305 end: string(ow.end), 306 rev: ow.rev, 307 progressNotify: ow.progressNotify, 308 fragment: ow.fragment, 309 filters: filters, 310 prevKV: ow.prevKV, 311 retc: make(chan chan WatchResponse, 1), 312 } 313 314 ok := false 315 ctxKey := streamKeyFromCtx(ctx) 316 317 var closeCh chan WatchResponse 318 for { 319 // find or allocate appropriate grpc watch stream 320 w.mu.Lock() 321 if w.streams == nil { 322 // closed 323 w.mu.Unlock() 324 ch := make(chan WatchResponse) 325 close(ch) 326 return ch 327 } 328 wgs := w.streams[ctxKey] 329 if wgs == nil { 330 wgs = w.newWatcherGrpcStream(ctx) 331 w.streams[ctxKey] = wgs 332 } 333 donec := wgs.donec 334 reqc := wgs.reqc 335 w.mu.Unlock() 336 337 // couldn't create channel; return closed channel 338 if closeCh == nil { 339 closeCh = make(chan WatchResponse, 1) 340 } 341 342 // submit request 343 select { 344 case reqc <- wr: 345 ok = true 346 case <-wr.ctx.Done(): 347 ok = false 348 case <-donec: 349 ok = false 350 if wgs.closeErr != nil { 351 closeCh <- WatchResponse{Canceled: 
true, closeErr: wgs.closeErr} 352 break 353 } 354 // retry; may have dropped stream from no ctxs 355 continue 356 } 357 358 // receive channel 359 if ok { 360 select { 361 case ret := <-wr.retc: 362 return ret 363 case <-ctx.Done(): 364 case <-donec: 365 if wgs.closeErr != nil { 366 closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} 367 break 368 } 369 // retry; may have dropped stream from no ctxs 370 continue 371 } 372 } 373 break 374 } 375 376 close(closeCh) 377 return closeCh 378} 379 380func (w *watcher) Close() (err error) { 381 w.mu.Lock() 382 streams := w.streams 383 w.streams = nil 384 w.mu.Unlock() 385 for _, wgs := range streams { 386 if werr := wgs.close(); werr != nil { 387 err = werr 388 } 389 } 390 // Consider context.Canceled as a successful close 391 if err == context.Canceled { 392 err = nil 393 } 394 return err 395} 396 397// RequestProgress requests a progress notify response be sent in all watch channels. 398func (w *watcher) RequestProgress(ctx context.Context) (err error) { 399 ctxKey := streamKeyFromCtx(ctx) 400 401 w.mu.Lock() 402 if w.streams == nil { 403 w.mu.Unlock() 404 return fmt.Errorf("no stream found for context") 405 } 406 wgs := w.streams[ctxKey] 407 if wgs == nil { 408 wgs = w.newWatcherGrpcStream(ctx) 409 w.streams[ctxKey] = wgs 410 } 411 donec := wgs.donec 412 reqc := wgs.reqc 413 w.mu.Unlock() 414 415 pr := &progressRequest{} 416 417 select { 418 case reqc <- pr: 419 return nil 420 case <-ctx.Done(): 421 return ctx.Err() 422 case <-donec: 423 if wgs.closeErr != nil { 424 return wgs.closeErr 425 } 426 // retry; may have dropped stream from no ctxs 427 return w.RequestProgress(ctx) 428 } 429} 430 431func (w *watchGrpcStream) close() (err error) { 432 w.cancel() 433 <-w.donec 434 select { 435 case err = <-w.errc: 436 default: 437 } 438 return toErr(w.ctx, err) 439} 440 441func (w *watcher) closeStream(wgs *watchGrpcStream) { 442 w.mu.Lock() 443 close(wgs.donec) 444 wgs.cancel() 445 if w.streams != nil { 446 
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}

// addSubstream registers a newly created substream under the watch ID
// assigned by the server; on a create failure it records the close error
// and shuts the substream's receive channel instead.
func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
	// check watch ID for backward compatibility (<= v3.3)
	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
		// failed; no channel
		close(ws.recvc)
		return
	}
	ws.id = resp.WatchId
	w.substreams[ws.id] = ws
}

// sendCloseSubstream tries to deliver a final response to the subscriber,
// giving up after closeSendErrTimeout, then closes the subscriber channel.
func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
	select {
	case ws.outc <- *resp:
	case <-ws.initReq.ctx.Done():
	case <-time.After(closeSendErrTimeout):
	}
	close(ws.outc)
}

// closeSubstream tears down a single substream: hands the subscriber its
// channel if it never got one, closes (or error-closes) that channel, and
// removes the substream from either the registered map or the resume queue.
func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
	// send channel response in case stream was never established
	select {
	case ws.initReq.retc <- ws.outc:
	default:
	}
	// close subscriber's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	if ws.id != -1 {
		delete(w.substreams, ws.id)
		return
	}
	// not yet registered; nil it out of the resume queue (the head slot must
	// be preserved because an in-flight registration may still answer to it)
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}

// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		// drain one closingc notification per substream marked closing above
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	// watch IDs whose cancellation has already been requested on the server
	cancelSet := make(map[int64]struct{})

	// cur accumulates fragmented responses until the last fragment arrives
	var cur *pb.WatchResponse
	for {
		select {
		// Watch() requested
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				// TODO: pass custom watch ID?
				ws := &watcherStream{
					initReq: *wreq,
					id:      -1,
					outc:    outc,
					// unbuffered so resumes won't cause repeat events
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// queue up for watcher creation/resume
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of resume queue, can register a new watcher
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}
			case *progressRequest:
				if err := wc.Send(wreq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}

		// new events from the watch client
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur != nil && cur.WatchId == pbresp.WatchId {
				// merge new events
				cur.Events = append(cur.Events, pbresp.Events...)
				// update "Fragment" field; last response with "Fragment" == false
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// response to head of queue creation
				if len(w.resuming) != 0 {
					if ws := w.resuming[0]; ws != nil {
						w.addSubstream(pbresp, ws)
						w.dispatchEvent(pbresp)
						w.resuming[0] = nil
					}
				}

				if ws := w.nextResume(); ws != nil {
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}

				// reset for next iteration
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// reset for next iteration
				cur = nil

			case cur.Fragment:
				// watch response events are still fragmented
				// continue to fetch next fragmented event arrival
				continue

			default:
				// dispatch to appropriate watch stream
				ok := w.dispatchEvent(cur)

				// reset for next iteration
				cur = nil

				if ok {
					break
				}

				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
				}
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				if err := wc.Send(ws.initReq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// no more watchers on this stream, shutdown, skip cancellation
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
			if ws.id != -1 {
				// client is closing an established watch; close it on the server proactively instead of waiting
				// to close when the next message arrives
				cancelSet[ws.id] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: ws.id,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
				}
			}
		}
	}
}

// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:len(w.resuming)]
	}
	return nil
}

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	// TODO: return watch ID?
	wr := &WatchResponse{
		Header:          *pbresp.Header,
		Events:          events,
		CompactRevision: pbresp.CompactRevision,
		Created:         pbresp.Created,
		Canceled:        pbresp.Canceled,
		cancelReason:    pbresp.CancelReason,
	}

	// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
	// indicate they should be broadcast.
	if wr.IsProgressNotify() && pbresp.WatchId == -1 {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)

}

// broadcastResponse send a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}

// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}

// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		// on resume the substream stays alive under a new goroutine, so only
		// report closure to run() when actually shutting down
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		// only enable the send case when there is something buffered to send
		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision is only bound on the first observed event,
					// if wch is disconnected before the Put is issued, then reconnects
					// after it is committed, it'll miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}

// newWatchClient opens a fresh grpc watch stream and moves every current
// substream onto the resume queue so it re-registers on the new stream.
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error
	// so that the teardown path can shutdown the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}

// waitCancelSubstreams watches the resuming substreams' request contexts and
// marks any canceled substream as closing; the returned channel closes once
// every waiter has observed either a cancellation or a close of stopc.
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGrpcStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		if ws != nil {
			<-ws.donec
		}
	}
}

// maxBackoff caps the retry delay used by openWatchClient.
var maxBackoff = 100 * time.Millisecond

// openWatchClient retries opening a watch client until success or halt.
967// manually retry in case "ws==nil && err==nil" 968// TODO: remove FailFast=false 969func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { 970 backoff := time.Millisecond 971 for { 972 select { 973 case <-w.ctx.Done(): 974 if err == nil { 975 return nil, w.ctx.Err() 976 } 977 return nil, err 978 default: 979 } 980 if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil { 981 break 982 } 983 if isHaltErr(w.ctx, err) { 984 return nil, v3rpc.Error(err) 985 } 986 if isUnavailableErr(w.ctx, err) { 987 // retry, but backoff 988 if backoff < maxBackoff { 989 // 25% backoff factor 990 backoff = backoff + backoff/4 991 if backoff > maxBackoff { 992 backoff = maxBackoff 993 } 994 } 995 time.Sleep(backoff) 996 } 997 } 998 return ws, nil 999} 1000 1001// toPB converts an internal watch request structure to its protobuf WatchRequest structure. 1002func (wr *watchRequest) toPB() *pb.WatchRequest { 1003 req := &pb.WatchCreateRequest{ 1004 StartRevision: wr.rev, 1005 Key: []byte(wr.key), 1006 RangeEnd: []byte(wr.end), 1007 ProgressNotify: wr.progressNotify, 1008 Filters: wr.filters, 1009 PrevKv: wr.prevKV, 1010 Fragment: wr.fragment, 1011 } 1012 cr := &pb.WatchRequest_CreateRequest{CreateRequest: req} 1013 return &pb.WatchRequest{RequestUnion: cr} 1014} 1015 1016// toPB converts an internal progress request structure to its protobuf WatchRequest structure. 1017func (pr *progressRequest) toPB() *pb.WatchRequest { 1018 req := &pb.WatchProgressRequest{} 1019 cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req} 1020 return &pb.WatchRequest{RequestUnion: cr} 1021} 1022 1023func streamKeyFromCtx(ctx context.Context) string { 1024 if md, ok := metadata.FromOutgoingContext(ctx); ok { 1025 return fmt.Sprintf("%+v", md) 1026 } 1027 return "" 1028} 1029