1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18	"context"
19	"sync"
20	"time"
21
22	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
23	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
24
25	"go.uber.org/zap"
26	"google.golang.org/grpc"
27	"google.golang.org/grpc/metadata"
28)
29
// LeaseRevokeResponse is the protobuf LeaseRevokeResponse under a client-side
// name; Revoke converts the server's response to it directly.
//
// LeaseID identifies a lease. NoLease (0) denotes the absence of a lease.
type (
	LeaseRevokeResponse pb.LeaseRevokeResponse
	LeaseID             int64
)
34
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
//
// Grant fills it from the server response: ID is the ID of the granted lease,
// TTL is the TTL reported back by the server (which may differ from the one
// requested — NOTE(review): confirm against server semantics), and Error is
// the server's Error string copied verbatim.
type LeaseGrantResponse struct {
	*pb.ResponseHeader
	ID    LeaseID
	TTL   int64
	Error string
}
42
// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
//
// TTL is the renewed lease TTL in seconds. A TTL <= 0 means the lease no
// longer exists: KeepAliveOnce reports it as rpctypes.ErrLeaseNotFound and
// the KeepAlive machinery closes the lease's listener channels.
type LeaseKeepAliveResponse struct {
	*pb.ResponseHeader
	ID  LeaseID
	TTL int64
}
49
// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
type LeaseTimeToLiveResponse struct {
	*pb.ResponseHeader
	// ID is the lease the information refers to.
	ID LeaseID `json:"id"`

	// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. An expired lease returns -1.
	TTL int64 `json:"ttl"`

	// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
	GrantedTTL int64 `json:"granted-ttl"`

	// Keys is the list of keys attached to this lease.
	Keys [][]byte `json:"keys"`
}
64
// LeaseStatus represents a lease status as listed by Leases. Currently only
// the lease ID is populated.
type LeaseStatus struct {
	ID LeaseID `json:"id"`
	// TODO: TTL int64
}
70
// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
// Leases holds one LeaseStatus per lease reported by the server, in the
// server's order.
type LeaseLeasesResponse struct {
	*pb.ResponseHeader
	Leases []LeaseStatus `json:"leases"`
}
76
const (
	// defaultTTL is the assumed lease TTL used for the first keepalive
	// deadline before the actual TTL is known to the client. It replaces a
	// first-keepalive timeout of exactly one second, which is what NewLease
	// passes when the client's DialTimeout is zero.
	defaultTTL = 5 * time.Second
	// NoLease is a lease ID for the absence of a lease.
	NoLease LeaseID = 0

	// retryConnWait is how long to wait before retrying a request after an
	// error. recvKeepAliveLoop waits this long before reopening the keep
	// alive stream, and sendKeepAliveLoop also uses it as the interval
	// between scans for leases that are due for a keep alive.
	retryConnWait = 500 * time.Millisecond
)
87
// LeaseResponseChSize is the size of buffer to store unsent lease responses,
// i.e. the capacity of each channel returned by KeepAlive. When a channel is
// full, further responses for it are dropped rather than blocking.
// WARNING: DO NOT UPDATE.
// Only for testing purposes.
var LeaseResponseChSize = 16
92
// ErrKeepAliveHalted is returned if client keep alive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
type ErrKeepAliveHalted struct {
	// Reason is the error the keep alive loop exited with. It may be nil,
	// e.g. when the lessor was closed before any KeepAlive call started the loop.
	Reason error
}
99
100func (e ErrKeepAliveHalted) Error() string {
101	s := "etcdclient: leases keep alive halted"
102	if e.Reason != nil {
103		s += ": " + e.Reason.Error()
104	}
105	return s
106}
107
// Lease is the client-side API for etcd leases: granting, revoking and
// inspecting leases, and keeping them alive.
type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

	// Leases retrieves all leases.
	Leases(ctx context.Context) (*LeaseLeasesResponse, error)

	// KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
	// to the channel are not consumed promptly the channel may become full. When full, the lease
	// client will continue sending keep alive requests to the etcd server, but will drop responses
	// until there is capacity on the channel to send more responses.
	//
	// If the client keep alive loop has halted, either with an unexpected error (e.g. "etcdserver:
	// no leader") or because it was canceled by the caller (e.g. context.Canceled), KeepAlive
	// returns an ErrKeepAliveHalted error containing the error reason.
	//
	// The returned "LeaseKeepAliveResponse" channel closes if the underlying keep
	// alive stream is interrupted in some way the client cannot handle itself,
	// or if the given context "ctx" is canceled or times out.
	//
	// TODO(v4.0): post errors to last keep alive message before closing
	// (see https://github.com/etcd-io/etcd/pull/7866)
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

	// KeepAliveOnce renews the lease once. The response corresponds to the
	// first message from calling KeepAlive. If the response has a recoverable
	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
	// A response whose TTL is <= 0 is returned together with
	// rpctypes.ErrLeaseNotFound.
	//
	// In most cases, KeepAlive should be used instead of KeepAliveOnce.
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}
149
// lessor is the Lease implementation returned by NewLeaseFromLeaseClient.
// All KeepAlive calls are multiplexed over one keep alive stream at a time,
// which recvKeepAliveLoop (re)opens through resetRecv.
type lessor struct {
	mu sync.Mutex // guards all fields

	// donec is closed and loopErr is set when recvKeepAliveLoop stops.
	// Close also closes donec itself if the loop was never started.
	donec   chan struct{}
	loopErr error

	// remote issues the lease RPCs.
	remote pb.LeaseClient

	// stream is the current keep alive stream installed by resetRecv;
	// streamCancel cancels it and is called when the stream is replaced.
	stream       pb.Lease_LeaseKeepAliveClient
	streamCancel context.CancelFunc

	// stopCtx is derived from WithRequireLeader(context.Background()) and is
	// the parent of every keep alive stream; Close calls stopCancel.
	stopCtx    context.Context
	stopCancel context.CancelFunc

	// keepAlives holds the keep alive state for every lease with at least
	// one registered KeepAlive listener.
	keepAlives map[LeaseID]*keepAlive

	// firstKeepAliveTimeout is the timeout for the first keepalive request
	// before the actual TTL is known to the lease client
	firstKeepAliveTimeout time.Duration

	// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
	firstKeepAliveOnce sync.Once

	// callOpts are appended to every lease RPC.
	callOpts []grpc.CallOption

	// lg may be nil; logging sites check for that.
	lg *zap.Logger
}
178
// keepAlive multiplexes a keepalive for a lease over multiple channels.
// One keepAlive exists per lease ID in lessor.keepAlives and is only accessed
// under lessor.mu.
type keepAlive struct {
	// chs and ctxs are parallel slices: chs[i] is the listener channel
	// registered by the KeepAlive call that passed ctxs[i].
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}
190
191func NewLease(c *Client) Lease {
192	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
193}
194
195func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
196	l := &lessor{
197		donec:                 make(chan struct{}),
198		keepAlives:            make(map[LeaseID]*keepAlive),
199		remote:                remote,
200		firstKeepAliveTimeout: keepAliveTimeout,
201		lg:                    c.lg,
202	}
203	if l.firstKeepAliveTimeout == time.Second {
204		l.firstKeepAliveTimeout = defaultTTL
205	}
206	if c != nil {
207		l.callOpts = c.callOpts
208	}
209	reqLeaderCtx := WithRequireLeader(context.Background())
210	l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
211	return l
212}
213
214func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
215	r := &pb.LeaseGrantRequest{TTL: ttl}
216	resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
217	if err == nil {
218		gresp := &LeaseGrantResponse{
219			ResponseHeader: resp.GetHeader(),
220			ID:             LeaseID(resp.ID),
221			TTL:            resp.TTL,
222			Error:          resp.Error,
223		}
224		return gresp, nil
225	}
226	return nil, toErr(ctx, err)
227}
228
229func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
230	r := &pb.LeaseRevokeRequest{ID: int64(id)}
231	resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
232	if err == nil {
233		return (*LeaseRevokeResponse)(resp), nil
234	}
235	return nil, toErr(ctx, err)
236}
237
238func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
239	r := toLeaseTimeToLiveRequest(id, opts...)
240	resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
241	if err == nil {
242		gresp := &LeaseTimeToLiveResponse{
243			ResponseHeader: resp.GetHeader(),
244			ID:             LeaseID(resp.ID),
245			TTL:            resp.TTL,
246			GrantedTTL:     resp.GrantedTTL,
247			Keys:           resp.Keys,
248		}
249		return gresp, nil
250	}
251	return nil, toErr(ctx, err)
252}
253
254func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
255	resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
256	if err == nil {
257		leases := make([]LeaseStatus, len(resp.Leases))
258		for i := range resp.Leases {
259			leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
260		}
261		return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
262	}
263	return nil, toErr(ctx, err)
264}
265
266func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
267	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
268
269	l.mu.Lock()
270	// ensure that recvKeepAliveLoop is still running
271	select {
272	case <-l.donec:
273		err := l.loopErr
274		l.mu.Unlock()
275		close(ch)
276		return ch, ErrKeepAliveHalted{Reason: err}
277	default:
278	}
279	ka, ok := l.keepAlives[id]
280	if !ok {
281		// create fresh keep alive
282		ka = &keepAlive{
283			chs:           []chan<- *LeaseKeepAliveResponse{ch},
284			ctxs:          []context.Context{ctx},
285			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
286			nextKeepAlive: time.Now(),
287			donec:         make(chan struct{}),
288		}
289		l.keepAlives[id] = ka
290	} else {
291		// add channel and context to existing keep alive
292		ka.ctxs = append(ka.ctxs, ctx)
293		ka.chs = append(ka.chs, ch)
294	}
295	l.mu.Unlock()
296
297	go l.keepAliveCtxCloser(ctx, id, ka.donec)
298	l.firstKeepAliveOnce.Do(func() {
299		go l.recvKeepAliveLoop()
300		go l.deadlineLoop()
301	})
302
303	return ch, nil
304}
305
306func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
307	for {
308		resp, err := l.keepAliveOnce(ctx, id)
309		if err == nil {
310			if resp.TTL <= 0 {
311				err = rpctypes.ErrLeaseNotFound
312			}
313			return resp, err
314		}
315		if isHaltErr(ctx, err) {
316			return nil, toErr(ctx, err)
317		}
318	}
319}
320
321func (l *lessor) Close() error {
322	l.stopCancel()
323	// close for synchronous teardown if stream goroutines never launched
324	l.firstKeepAliveOnce.Do(func() { close(l.donec) })
325	<-l.donec
326	return nil
327}
328
329func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
330	select {
331	case <-donec:
332		return
333	case <-l.donec:
334		return
335	case <-ctx.Done():
336	}
337
338	l.mu.Lock()
339	defer l.mu.Unlock()
340
341	ka, ok := l.keepAlives[id]
342	if !ok {
343		return
344	}
345
346	// close channel and remove context if still associated with keep alive
347	for i, c := range ka.ctxs {
348		if c == ctx {
349			close(ka.chs[i])
350			ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
351			ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
352			break
353		}
354	}
355	// remove if no one more listeners
356	if len(ka.chs) == 0 {
357		delete(l.keepAlives, id)
358	}
359}
360
361// closeRequireLeader scans keepAlives for ctxs that have require leader
362// and closes the associated channels.
363func (l *lessor) closeRequireLeader() {
364	l.mu.Lock()
365	defer l.mu.Unlock()
366	for _, ka := range l.keepAlives {
367		reqIdxs := 0
368		// find all required leader channels, close, mark as nil
369		for i, ctx := range ka.ctxs {
370			md, ok := metadata.FromOutgoingContext(ctx)
371			if !ok {
372				continue
373			}
374			ks := md[rpctypes.MetadataRequireLeaderKey]
375			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
376				continue
377			}
378			close(ka.chs[i])
379			ka.chs[i] = nil
380			reqIdxs++
381		}
382		if reqIdxs == 0 {
383			continue
384		}
385		// remove all channels that required a leader from keepalive
386		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
387		newCtxs := make([]context.Context, len(newChs))
388		newIdx := 0
389		for i := range ka.chs {
390			if ka.chs[i] == nil {
391				continue
392			}
393			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[newIdx]
394			newIdx++
395		}
396		ka.chs, ka.ctxs = newChs, newCtxs
397	}
398}
399
400func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
401	cctx, cancel := context.WithCancel(ctx)
402	defer cancel()
403
404	stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
405	if err != nil {
406		return nil, toErr(ctx, err)
407	}
408
409	err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
410	if err != nil {
411		return nil, toErr(ctx, err)
412	}
413
414	resp, rerr := stream.Recv()
415	if rerr != nil {
416		return nil, toErr(ctx, rerr)
417	}
418
419	karesp := &LeaseKeepAliveResponse{
420		ResponseHeader: resp.GetHeader(),
421		ID:             LeaseID(resp.ID),
422		TTL:            resp.TTL,
423	}
424	return karesp, nil
425}
426
// recvKeepAliveLoop is the lessor's long-running receive loop, started once by
// the first KeepAlive call. Each iteration opens a fresh keep alive stream via
// resetRecv (which also starts that stream's sendKeepAliveLoop) and hands every
// received response to recvKeepAlive until the stream fails. After a failure
// it waits retryConnWait and tries again. It stops when canceledByCaller
// attributes an error to l.stopCtx, or when l.stopCtx is done.
//
// On exit it closes l.donec, records the exit error in l.loopErr, and closes
// every registered keep alive (and therefore every listener channel).
func (l *lessor) recvKeepAliveLoop() (gerr error) {
	defer func() {
		l.mu.Lock()
		close(l.donec)
		l.loopErr = gerr
		for _, ka := range l.keepAlives {
			ka.close()
		}
		l.keepAlives = make(map[LeaseID]*keepAlive)
		l.mu.Unlock()
	}()

	for {
		stream, err := l.resetRecv()
		if err != nil {
			// presumably true only once l.stopCtx has been canceled (Close);
			// any other open failure falls through to the retry wait below
			if canceledByCaller(l.stopCtx, err) {
				return err
			}
		} else {
			for {
				resp, err := stream.Recv()
				if err != nil {
					if canceledByCaller(l.stopCtx, err) {
						return err
					}

					// losing the leader only closes listeners that asked to
					// require one; the stream itself is reopened below
					if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
						l.closeRequireLeader()
					}
					break
				}

				l.recvKeepAlive(resp)
			}
		}

		select {
		case <-time.After(retryConnWait):
		case <-l.stopCtx.Done():
			return l.stopCtx.Err()
		}
	}
}
470
471// resetRecv opens a new lease stream and starts sending keep alive requests.
472func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
473	sctx, cancel := context.WithCancel(l.stopCtx)
474	stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
475	if err != nil {
476		cancel()
477		return nil, err
478	}
479
480	l.mu.Lock()
481	defer l.mu.Unlock()
482	if l.stream != nil && l.streamCancel != nil {
483		l.streamCancel()
484	}
485
486	l.streamCancel = cancel
487	l.stream = stream
488
489	go l.sendKeepAliveLoop(stream)
490	return stream, nil
491}
492
493// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
494func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
495	karesp := &LeaseKeepAliveResponse{
496		ResponseHeader: resp.GetHeader(),
497		ID:             LeaseID(resp.ID),
498		TTL:            resp.TTL,
499	}
500
501	l.mu.Lock()
502	defer l.mu.Unlock()
503
504	ka, ok := l.keepAlives[karesp.ID]
505	if !ok {
506		return
507	}
508
509	if karesp.TTL <= 0 {
510		// lease expired; close all keep alive channels
511		delete(l.keepAlives, karesp.ID)
512		ka.close()
513		return
514	}
515
516	// send update to all channels
517	nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
518	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
519	for _, ch := range ka.chs {
520		select {
521		case ch <- karesp:
522		default:
523			if l.lg != nil {
524				l.lg.Warn("lease keepalive response queue is full; dropping response send",
525					zap.Int("queue-size", len(ch)),
526					zap.Int("queue-capacity", cap(ch)),
527				)
528			}
529		}
530		// still advance in order to rate-limit keep-alive sends
531		ka.nextKeepAlive = nextKeepAlive
532	}
533}
534
535// deadlineLoop reaps any keep alive channels that have not received a response
536// within the lease TTL
537func (l *lessor) deadlineLoop() {
538	for {
539		select {
540		case <-time.After(time.Second):
541		case <-l.donec:
542			return
543		}
544		now := time.Now()
545		l.mu.Lock()
546		for id, ka := range l.keepAlives {
547			if ka.deadline.Before(now) {
548				// waited too long for response; lease may be expired
549				ka.close()
550				delete(l.keepAlives, id)
551			}
552		}
553		l.mu.Unlock()
554	}
555}
556
557// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
558func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
559	for {
560		var tosend []LeaseID
561
562		now := time.Now()
563		l.mu.Lock()
564		for id, ka := range l.keepAlives {
565			if ka.nextKeepAlive.Before(now) {
566				tosend = append(tosend, id)
567			}
568		}
569		l.mu.Unlock()
570
571		for _, id := range tosend {
572			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
573			if err := stream.Send(r); err != nil {
574				// TODO do something with this error?
575				return
576			}
577		}
578
579		select {
580		case <-time.After(retryConnWait):
581		case <-stream.Context().Done():
582			return
583		case <-l.donec:
584			return
585		case <-l.stopCtx.Done():
586			return
587		}
588	}
589}
590
591func (ka *keepAlive) close() {
592	close(ka.donec)
593	for _, ch := range ka.chs {
594		close(ch)
595	}
596}
597