// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
	"go.etcd.io/etcd/mvcc/mvccpb"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	closeSendErrTimeout = 250 * time.Millisecond
)

// Event represents a single watch event.
type Event mvccpb.Event

// WatchChan is the channel over which watch responses are delivered.
type WatchChan <-chan WatchResponse

type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the requested revision is 0 or unspecified, the returned channel will
	// return watch events that happen after the server receives the watch request.
	// If the context "ctx" is canceled or timed out, the returned "WatchChan" is closed,
	// and "WatchResponse" from this closed channel has zero events and a nil "Err()".
	// The context "ctx" MUST be canceled as soon as the watcher is no longer used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", the returned "WatchChan" will
	// not be closed and will block until an event is triggered, except when the
	// server returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when the context is wrapped with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to network partition),
	// the error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with a non-nil "Err()".
	// To prevent a watch stream from getting stuck in a partitioned node,
	// make sure to wrap the context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}
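
// exampleWatchLoop is a hypothetical usage sketch and not part of the
// original file: it watches a prefix with "WithRequireLeader" so a
// partitioned server surfaces ErrNoLeader, and relies on cancel() to
// release the watcher's resources.
func exampleWatchLoop(cli *Client) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // MUST cancel to release watcher resources

	// WithRequireLeader closes the channel with a non-nil Err() if the
	// connected server loses its leader.
	wch := cli.Watch(WithRequireLeader(ctx), "foo/", WithPrefix())
	for wr := range wch {
		if err := wr.Err(); err != nil {
			return // non-recoverable error (e.g. ErrCompacted, ErrNoLeader)
		}
		for _, ev := range wr.Events {
			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}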

type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream was about to close, the channel sends a
	// final response with Canceled set to true and a non-nil Err() before it closes.
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	closeErr error

	// cancelReason is the reason for canceling the watch
	cancelReason string
}

// IsCreate returns true if the event indicates that the key is newly created.
func (e *Event) IsCreate() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
}

// IsModify returns true if the event indicates that a new value was put on an existing key.
func (e *Event) IsModify() bool {
	return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
}
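
// describeEvent is a hypothetical helper (illustration only, not part of the
// original API) showing how IsCreate/IsModify and the event type classify an event.
func describeEvent(ev *Event) string {
	switch {
	case ev.Type == EventTypeDelete:
		return "delete"
	case ev.IsCreate():
		return "create" // PUT where CreateRevision == ModRevision
	case ev.IsModify():
		return "modify" // PUT on an already existing key
	default:
		return "unknown"
	}
}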

// Err is the error value if this WatchResponse holds an error.
func (wr *WatchResponse) Err() error {
	switch {
	case wr.closeErr != nil:
		return v3rpc.Error(wr.closeErr)
	case wr.CompactRevision != 0:
		return v3rpc.ErrCompacted
	case wr.Canceled:
		if len(wr.cancelReason) != 0 {
			return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
		}
		return v3rpc.ErrFutureRev
	}
	return nil
}

// IsProgressNotify returns true if the WatchResponse is a progress notification.
func (wr *WatchResponse) IsProgressNotify() bool {
	return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
}
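
// rewatchOnCompaction is a minimal sketch (hypothetical helper, assuming the
// standard WithRev option): when a watch is canceled because its revision was
// compacted, Err() reports ErrCompacted and CompactRevision carries the oldest
// revision the server still has, so the caller can resume from there.
func rewatchOnCompaction(ctx context.Context, cli *Client, key string, rev int64) WatchChan {
	for wr := range cli.Watch(ctx, key, WithRev(rev)) {
		if wr.Err() == v3rpc.ErrCompacted {
			// resume from the oldest revision still available on the server
			return cli.Watch(ctx, key, WithRev(wr.CompactRevision))
		}
		// ... consume wr.Events ...
	}
	return nil
}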

// watcher implements the Watcher interface
type watcher struct {
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.RWMutex

	// streams holds all the active grpc streams keyed by ctx value.
	streams map[string]*watchGrpcStream
	lg      *zap.Logger
}

// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
	owner    *watcher
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	ctx context.Context
	key string
	end string
	rev int64

	// send created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragment is disabled by default; if true, split watch events when the
	// total size exceeds the "--max-request-bytes" flag value plus 512 bytes
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// prevKV fetches the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct{}

// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this watcher
	initReq watchRequest

	// outc publishes watch responses to the subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when the stream should be scheduled to shut down.
	closing bool
	// id is the registered watch id on the grpc stream
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}

func NewWatcher(c *Client) Watcher {
	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
	w := &watcher{
		remote:  wc,
		streams: make(map[string]*watchGrpcStream),
	}
	if c != nil {
		w.callOpts = c.callOpts
		w.lg = c.lg
	}
	return w
}

// never closes
var valCtxCh = make(chan struct{})
var zeroTime = time.Unix(0, 0)

// ctx with only the values; never Done
type valCtx struct{ context.Context }

func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
func (vc *valCtx) Err() error                  { return nil }
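
// valCtxIgnoresCancel is an illustration only (hypothetical, unused by the
// client): valCtx keeps the parent's values (e.g. gRPC metadata) but never
// reports cancellation, so canceling a subscriber's context does not tear
// down the shared gRPC stream.
func valCtxIgnoresCancel() bool {
	parent, cancel := context.WithCancel(context.Background())
	cancel() // parent is canceled immediately
	vc := &valCtx{parent}
	select {
	case <-vc.Done():
		return false // never taken: Done() returns a channel that never closes
	default:
		return true
	}
}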

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGrpcStream{
		owner:      w,
		remote:     w.remote,
		callOpts:   w.callOpts,
		ctx:        ctx,
		ctxKey:     streamKeyFromCtx(inctx),
		cancel:     cancel,
		substreams: make(map[int64]*watcherStream),
		respc:      make(chan *pb.WatchResponse),
		reqc:       make(chan watchStreamRequest),
		donec:      make(chan struct{}),
		errc:       make(chan error, 1),
		closingc:   make(chan *watcherStream),
		resumec:    make(chan struct{}),
	}
	go wgs.run()
	return wgs
}

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		retc:           make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	// find or allocate appropriate grpc watch stream
	w.mu.Lock()
	if w.streams == nil {
		// closed
		w.mu.Unlock()
		ch := make(chan WatchResponse)
		close(ch)
		return ch
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	// closeCh is returned as a closed channel when the watcher cannot be registered
	closeCh := make(chan WatchResponse, 1)

	// submit request
	select {
	case reqc <- wr:
		ok = true
	case <-wr.ctx.Done():
	case <-donec:
		if wgs.closeErr != nil {
			closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
			break
		}
		// retry; may have dropped stream from no ctxs
		return w.Watch(ctx, key, opts...)
	}

	// receive channel
	if ok {
		select {
		case ret := <-wr.retc:
			return ret
		case <-ctx.Done():
		case <-donec:
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				break
			}
			// retry; may have dropped stream from no ctxs
			return w.Watch(ctx, key, opts...)
		}
	}

	close(closeCh)
	return closeCh
}

func (w *watcher) Close() (err error) {
	w.mu.Lock()
	streams := w.streams
	w.streams = nil
	w.mu.Unlock()
	for _, wgs := range streams {
		if werr := wgs.close(); werr != nil {
			err = werr
		}
	}
	// Consider context.Canceled as a successful close
	if err == context.Canceled {
		err = nil
	}
	return err
}

// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
	ctxKey := streamKeyFromCtx(ctx)

	w.mu.Lock()
	if w.streams == nil {
		w.mu.Unlock()
		return fmt.Errorf("no stream found for context")
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	pr := &progressRequest{}

	select {
	case reqc <- pr:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-donec:
		if wgs.closeErr != nil {
			return wgs.closeErr
		}
		// retry; may have dropped stream from no ctxs
		return w.RequestProgress(ctx)
	}
}
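
// requestProgressEvery is a hypothetical sketch (not part of the original
// API): it periodically requests progress notifications so idle watchers
// still observe the current revision. The ctx must carry the same outgoing
// metadata as the one passed to Watch, because streams are keyed by
// streamKeyFromCtx and the request reaches only the matching stream.
func requestProgressEvery(ctx context.Context, w Watcher, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := w.RequestProgress(ctx); err != nil {
				return
			}
		case <-ctx.Done():
			return
		}
	}
}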

func (w *watchGrpcStream) close() (err error) {
	w.cancel()
	<-w.donec
	select {
	case err = <-w.errc:
	default:
	}
	return toErr(w.ctx, err)
}

func (w *watcher) closeStream(wgs *watchGrpcStream) {
	w.mu.Lock()
	close(wgs.donec)
	wgs.cancel()
	if w.streams != nil {
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}

func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
	// check watch ID for backward compatibility (<= v3.3)
	if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") {
		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
		// failed; no channel
		close(ws.recvc)
		return
	}
	ws.id = resp.WatchId
	w.substreams[ws.id] = ws
}

func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
	select {
	case ws.outc <- *resp:
	case <-ws.initReq.ctx.Done():
	case <-time.After(closeSendErrTimeout):
	}
	close(ws.outc)
}

func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
	// send channel response in case stream was never established
	select {
	case ws.initReq.retc <- ws.outc:
	default:
	}
	// close subscriber's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	if ws.id != -1 {
		delete(w.substreams, ws.id)
		return
	}
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}

// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	cancelSet := make(map[int64]struct{})

	var cur *pb.WatchResponse
	for {
		select {
		// Watch() requested
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				// TODO: pass custom watch ID?
				ws := &watcherStream{
					initReq: *wreq,
					id:      -1,
					outc:    outc,
					// unbuffered so resumes won't cause repeat events
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// queue up for watcher creation/resume
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of resume queue, can register a new watcher
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						lg.Warningf("error when sending request: %v", err)
					}
				}
			case *progressRequest:
				if err := wc.Send(wreq.toPB()); err != nil {
					lg.Warningf("error when sending request: %v", err)
				}
			}

		// new events from the watch client
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur.WatchId == pbresp.WatchId {
				// merge new events
				cur.Events = append(cur.Events, pbresp.Events...)
				// update "Fragment" field; the last response has "Fragment" == false
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// response to head of queue creation
				if ws := w.resuming[0]; ws != nil {
					w.addSubstream(pbresp, ws)
					w.dispatchEvent(pbresp)
					w.resuming[0] = nil
				}

				if ws := w.nextResume(); ws != nil {
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						lg.Warningf("error when sending request: %v", err)
					}
				}

				// reset for next iteration
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// reset for next iteration
				cur = nil

			case cur.Fragment:
				// watch response events are still fragmented
				// continue to fetch next fragmented event arrival
				continue

			default:
				// dispatch to appropriate watch stream
				ok := w.dispatchEvent(cur)

				// reset for next iteration
				cur = nil

				if ok {
					break
				}

				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				if err := wc.Send(req); err != nil {
					lg.Warningf("error when sending request: %v", err)
				}
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				if err := wc.Send(ws.initReq.toPB()); err != nil {
					lg.Warningf("error when sending request: %v", err)
				}
			}
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// no more watchers on this stream, shutdown
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
		}
	}
}

// nextResume chooses the next resuming watcher to register with the grpc stream. Abandoned
// streams are marked as nil in the queue since the head must wait for its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:]
	}
	return nil
}

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	// TODO: return watch ID?
	wr := &WatchResponse{
		Header:          *pbresp.Header,
		Events:          events,
		CompactRevision: pbresp.CompactRevision,
		Created:         pbresp.Created,
		Canceled:        pbresp.Canceled,
		cancelReason:    pbresp.CancelReason,
	}

	// watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to
	// indicate they should be broadcast.
	if wr.IsProgressNotify() && pbresp.WatchId == -1 {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)
}

// broadcastResponse sends a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}

// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}

// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision were bound only to the first observed event and
					// wch disconnected before the Put was issued, then reconnected
					// after it was committed, it would miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = -1
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancelation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error,
	// so that the teardown path can shut down the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}

func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGrpcStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		if ws != nil {
			<-ws.donec
		}
	}
}

var maxBackoff = 100 * time.Millisecond

// openWatchClient retries opening a watch client until success or halt.
// manually retry in case "ws==nil && err==nil"
// TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
	backoff := time.Millisecond
	for {
		select {
		case <-w.ctx.Done():
			if err == nil {
				return nil, w.ctx.Err()
			}
			return nil, err
		default:
		}
		if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
			break
		}
		if isHaltErr(w.ctx, err) {
			return nil, v3rpc.Error(err)
		}
		if isUnavailableErr(w.ctx, err) {
			// retry, but backoff
			if backoff < maxBackoff {
				// 25% backoff factor
				backoff = backoff + backoff/4
				if backoff > maxBackoff {
					backoff = maxBackoff
				}
			}
			time.Sleep(backoff)
		}
	}
	return ws, nil
}
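
// backoffSchedule is a hypothetical helper (illustration only, unused by the
// client) mirroring the retry backoff above: grow by 25% per attempt from
// 1ms, capped at maxBackoff (100ms).
func backoffSchedule(attempts int) []time.Duration {
	out := make([]time.Duration, 0, attempts)
	backoff := time.Millisecond
	for i := 0; i < attempts; i++ {
		out = append(out, backoff)
		if backoff < maxBackoff {
			backoff += backoff / 4 // 25% backoff factor
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
	}
	return out
}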

// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
func (wr *watchRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchCreateRequest{
		StartRevision:  wr.rev,
		Key:            []byte(wr.key),
		RangeEnd:       []byte(wr.end),
		ProgressNotify: wr.progressNotify,
		Filters:        wr.filters,
		PrevKv:         wr.prevKV,
		Fragment:       wr.fragment,
	}
	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchProgressRequest{}
	cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

func streamKeyFromCtx(ctx context.Context) string {
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
		return fmt.Sprintf("%+v", md)
	}
	return ""
}
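
// exampleStreamKeys is an illustration (hypothetical, not part of the original
// file): contexts carrying different outgoing gRPC metadata map to different
// stream keys and therefore separate gRPC watch streams, while contexts with
// no metadata all share the "" key.
func exampleStreamKeys() {
	base := context.Background()
	authed := metadata.AppendToOutgoingContext(base, "token", "abc")
	fmt.Println(streamKeyFromCtx(base))   // "" (no outgoing metadata)
	fmt.Println(streamKeyFromCtx(authed)) // a "map[token:[abc]]"-style key
}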