1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package lease
16
17import (
18	"encoding/binary"
19	"errors"
20	"math"
21	"sort"
22	"sync"
23	"time"
24
25	"github.com/coreos/etcd/lease/leasepb"
26	"github.com/coreos/etcd/mvcc/backend"
27)
28
// NoLease is a special LeaseID representing the absence of a lease.
// Grant refuses to create a lease with this ID.
const NoLease = LeaseID(0)

// MaxLeaseTTL is the maximum lease TTL value, in seconds. Grant rejects
// any larger TTL with ErrLeaseTTLTooLarge.
const MaxLeaseTTL = 9000000000
34
var (
	// forever is the zero time.Time. A lease whose expiry equals it never
	// expires (see Lease.Remaining).
	forever = time.Time{}

	// leaseBucketName is the backend bucket in which leases are persisted.
	leaseBucketName = []byte("lease")

	// maximum number of leases to revoke per second; configurable for tests
	leaseRevokeRate = 1000

	// Sentinel errors returned by the lessor:
	//   - ErrNotPrimary: Renew was called on a lessor that is not primary.
	//   - ErrLeaseNotFound: no lease with the given ID (Grant also returns
	//     it when asked to create NoLease).
	//   - ErrLeaseExists: Grant was called with an ID already in use.
	//   - ErrLeaseTTLTooLarge: Grant was called with a TTL above MaxLeaseTTL.
	ErrNotPrimary       = errors.New("not a primary lessor")
	ErrLeaseNotFound    = errors.New("lease not found")
	ErrLeaseExists      = errors.New("lease already exists")
	ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
)
48
// TxnDelete is a TxnWrite that only permits deletes. Defined here
// to avoid circular dependency with mvcc.
type TxnDelete interface {
	// DeleteRange deletes the given key or range. Revoke calls it with a
	// single key and a nil end.
	// NOTE(review): n and rev are presumably the number of deleted keys and
	// the resulting revision; the lessor ignores both — confirm against mvcc.
	DeleteRange(key, end []byte) (n, rev int64)
	// End finishes the transaction. Revoke calls it after removing the
	// lease from the backend.
	End()
}
55
// RangeDeleter is a TxnDelete constructor. The lessor invokes it once per
// Revoke to obtain the transaction used to delete the lease's keys.
type RangeDeleter func() TxnDelete
58
// LeaseID identifies a lease. The value NoLease (0) means "no lease".
type LeaseID int64
60
// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
type Lessor interface {
	// SetRangeDeleter lets the lessor create TxnDeletes to the store.
	// Lessor deletes the items in the revoked or expired lease by creating
	// new TxnDeletes.
	SetRangeDeleter(rd RangeDeleter)

	// Grant grants a lease that expires at least after TTL seconds.
	Grant(id LeaseID, ttl int64) (*Lease, error)
	// Revoke revokes a lease with given ID. The items attached to the
	// given lease will be removed. If the ID does not exist, an error
	// will be returned.
	Revoke(id LeaseID) error

	// Attach attaches the given lease items to the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Attach(id LeaseID, items []LeaseItem) error

	// GetLease returns the LeaseID for the given item.
	// If no lease is found, the NoLease value will be returned.
	GetLease(item LeaseItem) LeaseID

	// Detach detaches the given lease items from the lease with given LeaseID.
	// If the lease does not exist, an error will be returned.
	Detach(id LeaseID, items []LeaseItem) error

	// Promote promotes the lessor to be the primary lessor. The primary lessor
	// manages the expiration and renewal of leases.
	// A newly promoted lessor renews the TTL of all leases to extend + previous TTL.
	Promote(extend time.Duration)

	// Demote demotes the lessor from being the primary lessor.
	Demote()

	// Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
	// an error will be returned.
	Renew(id LeaseID) (int64, error)

	// Lookup gives the lease at a given lease id, if any.
	Lookup(id LeaseID) *Lease

	// Leases lists all leases.
	Leases() []*Lease

	// ExpiredLeasesC returns a chan that is used to receive expired leases.
	ExpiredLeasesC() <-chan []*Lease

	// Recover recovers the lessor state from the given backend and RangeDeleter.
	Recover(b backend.Backend, rd RangeDeleter)

	// Stop stops the lessor from managing leases. The behavior of calling Stop multiple
	// times is undefined.
	Stop()
}
115
// lessor implements Lessor interface.
// TODO: use clockwork for testability.
type lessor struct {
	// mu is held by the lessor's methods while they read or modify
	// demotec, leaseMap and itemMap.
	mu sync.Mutex

	// demotec is set when the lessor is the primary.
	// demotec will be closed if the lessor is demoted.
	demotec chan struct{}

	// TODO: probably this should be a heap with a secondary
	// id index.
	// Now it is O(N) to loop over the leases to find expired ones.
	// We want to make Grant, Revoke, and findExpiredLeases all O(logN) and
	// Renew O(1).
	// findExpiredLeases and Renew should be the most frequent operations.
	leaseMap map[LeaseID]*Lease

	// itemMap maps an attached item to the ID of the lease it is attached
	// to; it is maintained by Attach and Detach.
	itemMap map[LeaseItem]LeaseID

	// When a lease expires, the lessor will delete the
	// leased range (or key) by the RangeDeleter.
	rd RangeDeleter

	// backend to persist leases. We only persist lease ID and expiry for now.
	// The leased items can be recovered by iterating all the keys in kv.
	b backend.Backend

	// minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
	// requests for shorter TTLs are extended to the minimum TTL.
	minLeaseTTL int64

	// expiredC carries batches of expired leases found by runLoop to the
	// consumer obtained through ExpiredLeasesC.
	expiredC chan []*Lease
	// stopC is a channel whose closure indicates that the lessor should be stopped.
	stopC chan struct{}
	// doneC is a channel whose closure indicates that the lessor is stopped.
	doneC chan struct{}
}
153
// NewLessor creates a Lessor backed by b. Leases already stored in b are
// loaded, and a background loop that looks for expired leases is started;
// call Stop to terminate it. minLeaseTTL is the smallest TTL (in seconds)
// any lease will be given.
func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
	return newLessor(b, minLeaseTTL)
}
157
158func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
159	l := &lessor{
160		leaseMap:    make(map[LeaseID]*Lease),
161		itemMap:     make(map[LeaseItem]LeaseID),
162		b:           b,
163		minLeaseTTL: minLeaseTTL,
164		// expiredC is a small buffered chan to avoid unnecessary blocking.
165		expiredC: make(chan []*Lease, 16),
166		stopC:    make(chan struct{}),
167		doneC:    make(chan struct{}),
168	}
169	l.initAndRecover()
170
171	go l.runLoop()
172
173	return l
174}
175
// isPrimary indicates if this lessor is the primary lessor. The primary
// lessor manages lease expiration and renewal. It is primary exactly while
// demotec is non-nil (set by Promote, cleared by Demote).
//
// In etcd, the raft leader is the primary. Thus there might be two primary
// leaders at the same time (raft allows concurrent leaders but with different terms)
// for at most a leader election timeout.
// The old primary leader cannot affect the correctness since its proposal has a
// smaller term and will not be committed.
//
// TODO: raft followers do not forward lease management proposals. There might be a
// very small window (normally within a second, depending on go scheduling) in which
// a raft follower is the primary, between the raft leader demotion and lessor demotion.
// Usually this should not be a problem. Lease should not be that sensitive to timing.
func (le *lessor) isPrimary() bool {
	return le.demotec != nil
}
192
193func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
194	le.mu.Lock()
195	defer le.mu.Unlock()
196
197	le.rd = rd
198}
199
200func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
201	if id == NoLease {
202		return nil, ErrLeaseNotFound
203	}
204
205	if ttl > MaxLeaseTTL {
206		return nil, ErrLeaseTTLTooLarge
207	}
208
209	// TODO: when lessor is under high load, it should give out lease
210	// with longer TTL to reduce renew load.
211	l := &Lease{
212		ID:      id,
213		ttl:     ttl,
214		itemSet: make(map[LeaseItem]struct{}),
215		revokec: make(chan struct{}),
216	}
217
218	le.mu.Lock()
219	defer le.mu.Unlock()
220
221	if _, ok := le.leaseMap[id]; ok {
222		return nil, ErrLeaseExists
223	}
224
225	if l.ttl < le.minLeaseTTL {
226		l.ttl = le.minLeaseTTL
227	}
228
229	if le.isPrimary() {
230		l.refresh(0)
231	} else {
232		l.forever()
233	}
234
235	le.leaseMap[id] = l
236	l.persistTo(le.b)
237
238	return l, nil
239}
240
241func (le *lessor) Revoke(id LeaseID) error {
242	le.mu.Lock()
243
244	l := le.leaseMap[id]
245	if l == nil {
246		le.mu.Unlock()
247		return ErrLeaseNotFound
248	}
249	defer close(l.revokec)
250	// unlock before doing external work
251	le.mu.Unlock()
252
253	if le.rd == nil {
254		return nil
255	}
256
257	txn := le.rd()
258
259	// sort keys so deletes are in same order among all members,
260	// otherwise the backened hashes will be different
261	keys := l.Keys()
262	sort.StringSlice(keys).Sort()
263	for _, key := range keys {
264		txn.DeleteRange([]byte(key), nil)
265	}
266
267	le.mu.Lock()
268	defer le.mu.Unlock()
269	delete(le.leaseMap, l.ID)
270	// lease deletion needs to be in the same backend transaction with the
271	// kv deletion. Or we might end up with not executing the revoke or not
272	// deleting the keys if etcdserver fails in between.
273	le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
274
275	txn.End()
276	return nil
277}
278
279// Renew renews an existing lease. If the given lease does not exist or
280// has expired, an error will be returned.
281func (le *lessor) Renew(id LeaseID) (int64, error) {
282	le.mu.Lock()
283
284	unlock := func() { le.mu.Unlock() }
285	defer func() { unlock() }()
286
287	if !le.isPrimary() {
288		// forward renew request to primary instead of returning error.
289		return -1, ErrNotPrimary
290	}
291
292	demotec := le.demotec
293
294	l := le.leaseMap[id]
295	if l == nil {
296		return -1, ErrLeaseNotFound
297	}
298
299	if l.expired() {
300		le.mu.Unlock()
301		unlock = func() {}
302		select {
303		// A expired lease might be pending for revoking or going through
304		// quorum to be revoked. To be accurate, renew request must wait for the
305		// deletion to complete.
306		case <-l.revokec:
307			return -1, ErrLeaseNotFound
308		// The expired lease might fail to be revoked if the primary changes.
309		// The caller will retry on ErrNotPrimary.
310		case <-demotec:
311			return -1, ErrNotPrimary
312		case <-le.stopC:
313			return -1, ErrNotPrimary
314		}
315	}
316
317	l.refresh(0)
318	return l.ttl, nil
319}
320
321func (le *lessor) Lookup(id LeaseID) *Lease {
322	le.mu.Lock()
323	defer le.mu.Unlock()
324	return le.leaseMap[id]
325}
326
327func (le *lessor) unsafeLeases() []*Lease {
328	leases := make([]*Lease, 0, len(le.leaseMap))
329	for _, l := range le.leaseMap {
330		leases = append(leases, l)
331	}
332	sort.Sort(leasesByExpiry(leases))
333	return leases
334}
335
336func (le *lessor) Leases() []*Lease {
337	le.mu.Lock()
338	ls := le.unsafeLeases()
339	le.mu.Unlock()
340	return ls
341}
342
// Promote makes this lessor the primary: it installs a fresh demotec channel
// and refreshes every lease so it expires after extend plus its own TTL.
//
// When the number of leases reaches leaseRevokeRate, the expiries are then
// spread out so that no more than three quarters of leaseRevokeRate leases
// fall into the same one-second window, to avoid a pile-up of revocations.
func (le *lessor) Promote(extend time.Duration) {
	le.mu.Lock()
	defer le.mu.Unlock()

	// A non-nil demotec is what marks the lessor as primary (see isPrimary).
	le.demotec = make(chan struct{})

	// refresh the expiries of all leases.
	for _, l := range le.leaseMap {
		l.refresh(extend)
	}

	if len(le.leaseMap) < leaseRevokeRate {
		// no possibility of lease pile-up
		return
	}

	// adjust expiries in case of overlap
	leases := le.unsafeLeases()

	// baseWindow is the remaining time at the start of the current window;
	// nextWindow is where that window ends.
	baseWindow := leases[0].Remaining()
	nextWindow := baseWindow + time.Second
	// expires counts the leases seen so far in the current window.
	expires := 0
	// have fewer expires than the total revoke rate so piled up leases
	// don't consume the entire revoke limit
	targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
	for _, l := range leases {
		remaining := l.Remaining()
		if remaining > nextWindow {
			// This lease lies beyond the current window: start a new
			// window at it and count it as that window's first expiry.
			baseWindow = remaining
			nextWindow = baseWindow + time.Second
			expires = 1
			continue
		}
		expires++
		if expires <= targetExpiresPerSecond {
			// Still under the per-window target; leave the expiry alone.
			continue
		}
		// Over the target: push this lease back in proportion to how far
		// the window is over-subscribed.
		rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
		// If leases are extended by n seconds, leases n seconds ahead of the
		// base window should be extended by only one second.
		rateDelay -= float64(remaining - baseWindow)
		delay := time.Duration(rateDelay)
		// Stretch the current window so it covers the delayed lease.
		nextWindow = baseWindow + delay
		l.refresh(delay + extend)
	}
}
389
390type leasesByExpiry []*Lease
391
392func (le leasesByExpiry) Len() int           { return len(le) }
393func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() }
394func (le leasesByExpiry) Swap(i, j int)      { le[i], le[j] = le[j], le[i] }
395
396func (le *lessor) Demote() {
397	le.mu.Lock()
398	defer le.mu.Unlock()
399
400	// set the expiries of all leases to forever
401	for _, l := range le.leaseMap {
402		l.forever()
403	}
404
405	if le.demotec != nil {
406		close(le.demotec)
407		le.demotec = nil
408	}
409}
410
411// Attach attaches items to the lease with given ID. When the lease
412// expires, the attached items will be automatically removed.
413// If the given lease does not exist, an error will be returned.
414func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
415	le.mu.Lock()
416	defer le.mu.Unlock()
417
418	l := le.leaseMap[id]
419	if l == nil {
420		return ErrLeaseNotFound
421	}
422
423	l.mu.Lock()
424	for _, it := range items {
425		l.itemSet[it] = struct{}{}
426		le.itemMap[it] = id
427	}
428	l.mu.Unlock()
429	return nil
430}
431
432func (le *lessor) GetLease(item LeaseItem) LeaseID {
433	le.mu.Lock()
434	id := le.itemMap[item]
435	le.mu.Unlock()
436	return id
437}
438
439// Detach detaches items from the lease with given ID.
440// If the given lease does not exist, an error will be returned.
441func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
442	le.mu.Lock()
443	defer le.mu.Unlock()
444
445	l := le.leaseMap[id]
446	if l == nil {
447		return ErrLeaseNotFound
448	}
449
450	l.mu.Lock()
451	for _, it := range items {
452		delete(l.itemSet, it)
453		delete(le.itemMap, it)
454	}
455	l.mu.Unlock()
456	return nil
457}
458
459func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) {
460	le.mu.Lock()
461	defer le.mu.Unlock()
462
463	le.b = b
464	le.rd = rd
465	le.leaseMap = make(map[LeaseID]*Lease)
466	le.itemMap = make(map[LeaseItem]LeaseID)
467	le.initAndRecover()
468}
469
// ExpiredLeasesC returns the receive side of the channel on which runLoop
// publishes batches of expired leases.
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
	return le.expiredC
}
473
// Stop signals the expiry loop to exit by closing stopC and then blocks
// until the loop has closed doneC. Because stopC is closed unconditionally,
// a second call would panic on the double close.
func (le *lessor) Stop() {
	close(le.stopC)
	<-le.doneC
}
478
479func (le *lessor) runLoop() {
480	defer close(le.doneC)
481
482	for {
483		var ls []*Lease
484
485		// rate limit
486		revokeLimit := leaseRevokeRate / 2
487
488		le.mu.Lock()
489		if le.isPrimary() {
490			ls = le.findExpiredLeases(revokeLimit)
491		}
492		le.mu.Unlock()
493
494		if len(ls) != 0 {
495			select {
496			case <-le.stopC:
497				return
498			case le.expiredC <- ls:
499			default:
500				// the receiver of expiredC is probably busy handling
501				// other stuff
502				// let's try this next time after 500ms
503			}
504		}
505
506		select {
507		case <-time.After(500 * time.Millisecond):
508		case <-le.stopC:
509			return
510		}
511	}
512}
513
514// findExpiredLeases loops leases in the leaseMap until reaching expired limit
515// and returns the expired leases that needed to be revoked.
516func (le *lessor) findExpiredLeases(limit int) []*Lease {
517	leases := make([]*Lease, 0, 16)
518
519	for _, l := range le.leaseMap {
520		// TODO: probably should change to <= 100-500 millisecond to
521		// make up committing latency.
522		if l.expired() {
523			leases = append(leases, l)
524
525			// reach expired limit
526			if len(leases) == limit {
527				break
528			}
529		}
530	}
531
532	return leases
533}
534
535func (le *lessor) initAndRecover() {
536	tx := le.b.BatchTx()
537	tx.Lock()
538
539	tx.UnsafeCreateBucket(leaseBucketName)
540	_, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
541	// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
542	for i := range vs {
543		var lpb leasepb.Lease
544		err := lpb.Unmarshal(vs[i])
545		if err != nil {
546			tx.Unlock()
547			panic("failed to unmarshal lease proto item")
548		}
549		ID := LeaseID(lpb.ID)
550		if lpb.TTL < le.minLeaseTTL {
551			lpb.TTL = le.minLeaseTTL
552		}
553		le.leaseMap[ID] = &Lease{
554			ID:  ID,
555			ttl: lpb.TTL,
556			// itemSet will be filled in when recover key-value pairs
557			// set expiry to forever, refresh when promoted
558			itemSet: make(map[LeaseItem]struct{}),
559			expiry:  forever,
560			revokec: make(chan struct{}),
561		}
562	}
563	tx.Unlock()
564
565	le.b.ForceCommit()
566}
567
// Lease is a single lease tracked by the lessor: an ID, a TTL, the current
// expiry, and the set of items attached to it.
type Lease struct {
	ID  LeaseID
	ttl int64 // time to live in seconds
	// expiryMu protects concurrent accesses to expiry
	expiryMu sync.RWMutex
	// expiry is time when lease should expire. no expiration when expiry.IsZero() is true
	expiry time.Time

	// mu protects concurrent accesses to itemSet
	mu      sync.RWMutex
	itemSet map[LeaseItem]struct{}
	// revokec is closed when Revoke returns for this lease; Renew waits on
	// it when asked to renew an already expired lease.
	revokec chan struct{}
}
581
// expired reports whether the lease has no time remaining. A lease whose
// expiry is forever is never expired.
func (l *Lease) expired() bool {
	return l.Remaining() <= 0
}
585
586func (l *Lease) persistTo(b backend.Backend) {
587	key := int64ToBytes(int64(l.ID))
588
589	lpb := leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl)}
590	val, err := lpb.Marshal()
591	if err != nil {
592		panic("failed to marshal lease proto item")
593	}
594
595	b.BatchTx().Lock()
596	b.BatchTx().UnsafePut(leaseBucketName, key, val)
597	b.BatchTx().Unlock()
598}
599
// TTL returns the TTL of the Lease, in seconds.
func (l *Lease) TTL() int64 {
	return l.ttl
}
604
605// refresh refreshes the expiry of the lease.
606func (l *Lease) refresh(extend time.Duration) {
607	newExpiry := time.Now().Add(extend + time.Duration(l.ttl)*time.Second)
608	l.expiryMu.Lock()
609	defer l.expiryMu.Unlock()
610	l.expiry = newExpiry
611}
612
613// forever sets the expiry of lease to be forever.
614func (l *Lease) forever() {
615	l.expiryMu.Lock()
616	defer l.expiryMu.Unlock()
617	l.expiry = forever
618}
619
620// Keys returns all the keys attached to the lease.
621func (l *Lease) Keys() []string {
622	l.mu.RLock()
623	keys := make([]string, 0, len(l.itemSet))
624	for k := range l.itemSet {
625		keys = append(keys, k.Key)
626	}
627	l.mu.RUnlock()
628	return keys
629}
630
631// Remaining returns the remaining time of the lease.
632func (l *Lease) Remaining() time.Duration {
633	l.expiryMu.RLock()
634	defer l.expiryMu.RUnlock()
635	if l.expiry.IsZero() {
636		return time.Duration(math.MaxInt64)
637	}
638	return time.Until(l.expiry)
639}
640
// LeaseItem identifies something attached to a lease by its key.
type LeaseItem struct {
	Key string
}
644
// int64ToBytes encodes n as an 8-byte big-endian slice.
func int64ToBytes(n int64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(n))
	return buf[:]
}
650
// FakeLessor is a fake implementation of Lessor interface.
// Used for testing only. Every method is a no-op that returns zero values,
// except Renew, which always reports a TTL of 10.
type FakeLessor struct{}

// SetRangeDeleter does nothing.
func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}

// Grant does nothing and returns a nil lease and a nil error.
func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil }

// Revoke does nothing and returns nil.
func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }

// Attach does nothing and returns nil.
func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }

// GetLease always returns 0 (NoLease); Detach does nothing and returns nil.
func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID            { return 0 }
func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil }

// Promote does nothing.
func (fl *FakeLessor) Promote(extend time.Duration) {}

// Demote does nothing.
func (fl *FakeLessor) Demote() {}

// Renew always returns a TTL of 10 and a nil error.
func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }

// Lookup always returns nil.
func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil }

// Leases always returns nil.
func (fl *FakeLessor) Leases() []*Lease { return nil }

// ExpiredLeasesC returns a nil channel; receiving from it blocks forever.
func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }

// Recover does nothing.
func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {}

// Stop does nothing.
func (fl *FakeLessor) Stop() {}
681