// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	v3rpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
	mvccpb "github.com/coreos/etcd/mvcc/mvccpb"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	closeSendErrTimeout = 250 * time.Millisecond
)

type Event mvccpb.Event

type WatchChan <-chan WatchResponse

type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the context "ctx" is canceled or times out, the returned "WatchChan" is closed,
	// and the "WatchResponse" from this closed channel has zero events and a nil "Err()".
	// The context "ctx" MUST be canceled as soon as the watcher is no longer used,
	// in order to release the associated resources.
	//
	// If the context is "context.Background/TODO", the returned "WatchChan" will
	// not be closed and will block until an event is triggered, except when the server
	// returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when the context is passed with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to a network partition), the
	// error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with a non-nil "Err()".
	// To prevent a watch stream from being stuck on a partitioned node,
	// make sure to wrap the context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
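	//
	// A minimal usage sketch (illustrative only; assumes an established
	// *Client named "cli" and that the caller imports "fmt"):
	//
	//	ctx, cancel := context.WithCancel(context.Background())
	//	defer cancel()
	//	wch := cli.Watch(clientv3.WithRequireLeader(ctx), "foo", clientv3.WithPrefix())
	//	for wresp := range wch {
	//		if err := wresp.Err(); err != nil {
	//			break // e.g. ErrCompacted; the channel closes after this response
	//		}
	//		for _, ev := range wresp.Events {
	//			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
	//		}
	//	}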
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}

type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream is about to close, the channel sends,
	// just before closing, a final response that has Canceled set to true and a non-nil Err().
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	closeErr error

	// cancelReason is the reason the watch was canceled
	cancelReason string
}

// IsCreate returns true if the event indicates that the key is newly created.
func (e *Event) IsCreate() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
}

// IsModify returns true if the event indicates that a new value was put on an existing key.
func (e *Event) IsModify() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
}

// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
	switch {
	case wr.closeErr != nil:
		return v3rpc.Error(wr.closeErr)
	case wr.CompactRevision != 0:
		return v3rpc.ErrCompacted
	case wr.Canceled:
		if len(wr.cancelReason) != 0 {
			return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
		}
		return v3rpc.ErrFutureRev
	}
	return nil
}

// IsProgressNotify returns true if the WatchResponse is a progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
	return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}
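
// A sketch of classifying responses on the consumer side (illustrative only;
// "wresp" stands for a WatchResponse received from a WatchChan):
//
//	switch {
//	case wresp.Created:
//		// watcher registered; no events yet
//	case wresp.IsProgressNotify():
//		// no new events; wresp.Header.Revision is the current store revision
//	case wresp.CompactRevision != 0:
//		// needed revisions were compacted; re-watch from wresp.CompactRevision
//	case wresp.Canceled:
//		// server canceled the watch; wresp.Err() reports why
//	}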

// watcher implements the Watcher interface
type watcher struct {
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.RWMutex

	// streams holds all the active grpc streams keyed by ctx value.
	streams map[string]*watchGrpcStream
}

// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
	owner    *watcher
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error
}
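
// The channels above wire the watch goroutines together roughly as follows
// (illustrative summary, not part of the original source):
//
//	Watch()          --reqc-->     run()
//	serveWatchClient --respc-->    run()            (forwards wc.Recv())
//	run()            --recvc-->    serveSubstream   (one per watcher)
//	serveSubstream   --outc-->     subscriber (WatchChan)
//	serveSubstream   --closingc--> run()            (on shutdown)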

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	ctx context.Context
	key string
	end string
	rev int64

	// createdNotify sends a created notification event if true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragment is disabled by default; if true, watch events are split when
	// their total size exceeds the "--max-request-bytes" flag value plus 512 bytes
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// prevKV fetches the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this watcher stream
	initReq watchRequest

	// outc publishes watch responses to the subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when the stream should be scheduled to shut down.
	closing bool
	// id is the registered watch id on the grpc stream
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}

func NewWatcher(c *Client) Watcher {
	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
	w := &watcher{
		remote:  wc,
		streams: make(map[string]*watchGrpcStream),
	}
	if c != nil {
		w.callOpts = c.callOpts
	}
	return w
}

// never closes
var valCtxCh = make(chan struct{})
var zeroTime = time.Unix(0, 0)

// ctx with only the values; never Done
type valCtx struct{ context.Context }

func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
func (vc *valCtx) Err() error                  { return nil }

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGrpcStream{
		owner:      w,
		remote:     w.remote,
		callOpts:   w.callOpts,
		ctx:        ctx,
		ctxKey:     streamKeyFromCtx(inctx),
		cancel:     cancel,
		substreams: make(map[int64]*watcherStream),
		respc:      make(chan *pb.WatchResponse),
		reqc:       make(chan watchStreamRequest),
		donec:      make(chan struct{}),
		errc:       make(chan error, 1),
		closingc:   make(chan *watcherStream),
		resumec:    make(chan struct{}),
	}
	go wgs.run()
	return wgs
}

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		retc:           make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	// find or allocate appropriate grpc watch stream
	w.mu.Lock()
	if w.streams == nil {
		// closed
		w.mu.Unlock()
		ch := make(chan WatchResponse)
		close(ch)
		return ch
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	// couldn't create channel; return closed channel
	closeCh := make(chan WatchResponse, 1)

	// submit request
	select {
	case reqc <- wr:
		ok = true
	case <-wr.ctx.Done():
	case <-donec:
		if wgs.closeErr != nil {
			closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
			break
		}
		// retry; may have dropped stream from no ctxs
		return w.Watch(ctx, key, opts...)
	}

	// receive channel
	if ok {
		select {
		case ret := <-wr.retc:
			return ret
		case <-ctx.Done():
		case <-donec:
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				break
			}
			// retry; may have dropped stream from no ctxs
			return w.Watch(ctx, key, opts...)
		}
	}

	close(closeCh)
	return closeCh
}

func (w *watcher) Close() (err error) {
	w.mu.Lock()
	streams := w.streams
	w.streams = nil
	w.mu.Unlock()
	for _, wgs := range streams {
		if werr := wgs.close(); werr != nil {
			err = werr
		}
	}
	// Consider context.Canceled as a successful close
	if err == context.Canceled {
		err = nil
	}
	return err
}

// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
	ctxKey := streamKeyFromCtx(ctx)

	w.mu.Lock()
	if w.streams == nil {
		w.mu.Unlock()
		return fmt.Errorf("no stream found for context")
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	pr := &progressRequest{}

	select {
	case reqc <- pr:
		return nil
	case <-ctx.Done():
		if err == nil {
			return ctx.Err()
		}
		return err
	case <-donec:
		if wgs.closeErr != nil {
			return wgs.closeErr
		}
		// retry; may have dropped stream from no ctxs
		return w.RequestProgress(ctx)
	}
}

func (w *watchGrpcStream) close() (err error) {
	w.cancel()
	<-w.donec
	select {
	case err = <-w.errc:
	default:
	}
	return toErr(w.ctx, err)
}

func (w *watcher) closeStream(wgs *watchGrpcStream) {
	w.mu.Lock()
	close(wgs.donec)
	wgs.cancel()
	if w.streams != nil {
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}

func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
	// check watch ID for backward compatibility (<= v3.3)
	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
		// failed; no channel
		close(ws.recvc)
		return
	}
	ws.id = resp.WatchId
	w.substreams[ws.id] = ws
}

func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
	select {
	case ws.outc <- *resp:
	case <-ws.initReq.ctx.Done():
	case <-time.After(closeSendErrTimeout):
	}
	close(ws.outc)
}

func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
	// send channel response in case stream was never established
	select {
	case ws.initReq.retc <- ws.outc:
	default:
	}
	// close subscriber's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	if ws.id != -1 {
		delete(w.substreams, ws.id)
		return
	}
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}

// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	cancelSet := make(map[int64]struct{})

	var cur *pb.WatchResponse
	for {
		select {
		// Watch() requested
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				// TODO: pass custom watch ID?
				ws := &watcherStream{
					initReq: *wreq,
					id:      -1,
					outc:    outc,
					// unbuffered so resumes won't cause repeat events
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// queue up for watcher creation/resume
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of resume queue, can register a new watcher
					wc.Send(ws.initReq.toPB())
				}
			case *progressRequest:
				wc.Send(wreq.toPB())
			}

		// new events from the watch client
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur != nil && cur.WatchId == pbresp.WatchId {
				// merge new events
				cur.Events = append(cur.Events, pbresp.Events...)
				// update "Fragment" field; last response with "Fragment" == false
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// response to head of queue creation
				if ws := w.resuming[0]; ws != nil {
					w.addSubstream(pbresp, ws)
					w.dispatchEvent(pbresp)
					w.resuming[0] = nil
				}

				if ws := w.nextResume(); ws != nil {
					wc.Send(ws.initReq.toPB())
				}

				// reset for next iteration
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// reset for next iteration
				cur = nil

			case cur.Fragment:
				// watch response events are still fragmented
				// continue to fetch next fragmented event arrival
				continue

			default:
				// dispatch to appropriate watch stream
				ok := w.dispatchEvent(cur)

				// reset for next iteration
				cur = nil

				if ok {
					break
				}

				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				wc.Send(req)
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				wc.Send(ws.initReq.toPB())
			}
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// no more watchers on this stream, shutdown
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
		}
	}
}

// nextResume chooses the next resuming to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:len(w.resuming)]
	}
	return nil
}

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	// TODO: return watch ID?
	wr := &WatchResponse{
		Header:          *pbresp.Header,
		Events:          events,
		CompactRevision: pbresp.CompactRevision,
		Created:         pbresp.Created,
		Canceled:        pbresp.Canceled,
		cancelReason:    pbresp.CancelReason,
	}

	// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
	// indicate they should be broadcast.
	if wr.IsProgressNotify() && pbresp.WatchId == -1 {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)
}

// broadcastResponse sends a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}

// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}

// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision were bound only on the first observed event, a
					// wch disconnected before the Put is issued and reconnected after
					// it is committed would miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error
	// so that the teardown path can shutdown the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}

func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGrpcStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		if ws != nil {
			<-ws.donec
		}
	}
}

var maxBackoff = 100 * time.Millisecond

// openWatchClient retries opening a watch client until success or halt.
// It retries manually because "ws == nil && err == nil" is possible.
// TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
	backoff := time.Millisecond
	for {
		select {
		case <-w.ctx.Done():
			if err == nil {
				return nil, w.ctx.Err()
			}
			return nil, err
		default:
		}
		if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
			break
		}
		if isHaltErr(w.ctx, err) {
			return nil, v3rpc.Error(err)
		}
		if isUnavailableErr(w.ctx, err) {
			// retry, but backoff
			if backoff < maxBackoff {
				// 25% backoff factor
				backoff = backoff + backoff/4
				if backoff > maxBackoff {
					backoff = maxBackoff
				}
			}
			time.Sleep(backoff)
		}
	}
	return ws, nil
}
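
// Backoff arithmetic for the loop above (illustrative, not from the original
// source): with the 25% growth factor, successive sleeps run roughly
// 1ms, 1.25ms, 1.56ms, ..., saturating at the 100ms maxBackoff cap after
// about 21 consecutive unavailable errors (since 1.25^21 ≈ 106).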

// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
func (wr *watchRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchCreateRequest{
		StartRevision:  wr.rev,
		Key:            []byte(wr.key),
		RangeEnd:       []byte(wr.end),
		ProgressNotify: wr.progressNotify,
		Filters:        wr.filters,
		PrevKv:         wr.prevKV,
		Fragment:       wr.fragment,
	}
	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchProgressRequest{}
	cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

func streamKeyFromCtx(ctx context.Context) string {
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
		return fmt.Sprintf("%+v", md)
	}
	return ""
}
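
// Illustrative note (not from the original source): because streams are keyed
// by the formatted outgoing gRPC metadata, two Watch calls whose contexts
// carry identical metadata share a single gRPC watch stream:
//
//	ctx := metadata.AppendToOutgoingContext(context.Background(), "k", "v")
//	wch1 := cli.Watch(ctx, "a") // same stream key as below
//	wch2 := cli.Watch(ctx, "b") // multiplexed onto the same watchGrpcStream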