1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package integration
16
17import (
18	"fmt"
19	"testing"
20	"time"
21
22	"golang.org/x/net/context"
23	"google.golang.org/grpc"
24	"google.golang.org/grpc/metadata"
25
26	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
27	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
28	"github.com/coreos/etcd/mvcc/mvccpb"
29	"github.com/coreos/etcd/pkg/testutil"
30)
31
32// TestV3LeasePrmote ensures the newly elected leader can promote itself
33// to the primary lessor, refresh the leases and start to manage leases.
34// TODO: use customized clock to make this test go faster?
35func TestV3LeasePrmote(t *testing.T) {
36	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
37	defer clus.Terminate(t)
38
39	// create lease
40	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 5})
41	if err != nil {
42		t.Fatal(err)
43	}
44	if lresp.Error != "" {
45		t.Fatal(lresp.Error)
46	}
47
48	// wait until the lease is going to expire.
49	time.Sleep(time.Duration(lresp.TTL-1) * time.Second)
50
51	// kill the current leader, all leases should be refreshed.
52	toStop := clus.waitLeader(t, clus.Members)
53	clus.Members[toStop].Stop(t)
54
55	var toWait []*member
56	for i, m := range clus.Members {
57		if i != toStop {
58			toWait = append(toWait, m)
59		}
60	}
61	clus.waitLeader(t, toWait)
62	clus.Members[toStop].Restart(t)
63	clus.waitLeader(t, clus.Members)
64
65	// ensure lease is refreshed by waiting for a "long" time.
66	// it was going to expire anyway.
67	time.Sleep(3 * time.Second)
68
69	if !leaseExist(t, clus, lresp.ID) {
70		t.Error("unexpected lease not exists")
71	}
72
73	// let lease expires. total lease = 5 seconds and we already
74	// waits for 3 seconds, so 3 seconds more is enough.
75	time.Sleep(3 * time.Second)
76	if leaseExist(t, clus, lresp.ID) {
77		t.Error("unexpected lease exists")
78	}
79}
80
81// TestV3LeaseRevoke ensures a key is deleted once its lease is revoked.
82func TestV3LeaseRevoke(t *testing.T) {
83	defer testutil.AfterTest(t)
84	testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
85		lc := toGRPC(clus.RandClient()).Lease
86		_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
87		return err
88	})
89}
90
91// TestV3LeaseGrantById ensures leases may be created by a given id.
92func TestV3LeaseGrantByID(t *testing.T) {
93	defer testutil.AfterTest(t)
94	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
95	defer clus.Terminate(t)
96
97	// create fixed lease
98	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
99		context.TODO(),
100		&pb.LeaseGrantRequest{ID: 1, TTL: 1})
101	if err != nil {
102		t.Errorf("could not create lease 1 (%v)", err)
103	}
104	if lresp.ID != 1 {
105		t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
106	}
107
108	// create duplicate fixed lease
109	_, err = toGRPC(clus.RandClient()).Lease.LeaseGrant(
110		context.TODO(),
111		&pb.LeaseGrantRequest{ID: 1, TTL: 1})
112	if !eqErrGRPC(err, rpctypes.ErrGRPCLeaseExist) {
113		t.Error(err)
114	}
115
116	// create fresh fixed lease
117	lresp, err = toGRPC(clus.RandClient()).Lease.LeaseGrant(
118		context.TODO(),
119		&pb.LeaseGrantRequest{ID: 2, TTL: 1})
120	if err != nil {
121		t.Errorf("could not create lease 2 (%v)", err)
122	}
123	if lresp.ID != 2 {
124		t.Errorf("got id %v, wanted id %v", lresp.ID, 2)
125	}
126}
127
128// TestV3LeaseExpire ensures a key is deleted once a key expires.
129func TestV3LeaseExpire(t *testing.T) {
130	defer testutil.AfterTest(t)
131	testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
132		// let lease lapse; wait for deleted key
133
134		ctx, cancel := context.WithCancel(context.Background())
135		defer cancel()
136		wStream, err := toGRPC(clus.RandClient()).Watch.Watch(ctx)
137		if err != nil {
138			return err
139		}
140
141		wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{
142			CreateRequest: &pb.WatchCreateRequest{
143				Key: []byte("foo"), StartRevision: 1}}}
144		if err := wStream.Send(wreq); err != nil {
145			return err
146		}
147		if _, err := wStream.Recv(); err != nil {
148			// the 'created' message
149			return err
150		}
151		if _, err := wStream.Recv(); err != nil {
152			// the 'put' message
153			return err
154		}
155
156		errc := make(chan error, 1)
157		go func() {
158			resp, err := wStream.Recv()
159			switch {
160			case err != nil:
161				errc <- err
162			case len(resp.Events) != 1:
163				fallthrough
164			case resp.Events[0].Type != mvccpb.DELETE:
165				errc <- fmt.Errorf("expected key delete, got %v", resp)
166			default:
167				errc <- nil
168			}
169		}()
170
171		select {
172		case <-time.After(15 * time.Second):
173			return fmt.Errorf("lease expiration too slow")
174		case err := <-errc:
175			return err
176		}
177	})
178}
179
180// TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
181func TestV3LeaseKeepAlive(t *testing.T) {
182	defer testutil.AfterTest(t)
183	testLeaseRemoveLeasedKey(t, func(clus *ClusterV3, leaseID int64) error {
184		lc := toGRPC(clus.RandClient()).Lease
185		lreq := &pb.LeaseKeepAliveRequest{ID: leaseID}
186		ctx, cancel := context.WithCancel(context.Background())
187		defer cancel()
188		lac, err := lc.LeaseKeepAlive(ctx)
189		if err != nil {
190			return err
191		}
192		defer lac.CloseSend()
193
194		// renew long enough so lease would've expired otherwise
195		for i := 0; i < 3; i++ {
196			if err = lac.Send(lreq); err != nil {
197				return err
198			}
199			lresp, rxerr := lac.Recv()
200			if rxerr != nil {
201				return rxerr
202			}
203			if lresp.ID != leaseID {
204				return fmt.Errorf("expected lease ID %v, got %v", leaseID, lresp.ID)
205			}
206			time.Sleep(time.Duration(lresp.TTL/2) * time.Second)
207		}
208		_, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseID})
209		return err
210	})
211}
212
213// TestV3LeaseExists creates a lease on a random client and confirms it exists in the cluster.
214func TestV3LeaseExists(t *testing.T) {
215	defer testutil.AfterTest(t)
216	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
217	defer clus.Terminate(t)
218
219	// create lease
220	ctx0, cancel0 := context.WithCancel(context.Background())
221	defer cancel0()
222	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
223		ctx0,
224		&pb.LeaseGrantRequest{TTL: 30})
225	if err != nil {
226		t.Fatal(err)
227	}
228	if lresp.Error != "" {
229		t.Fatal(lresp.Error)
230	}
231
232	if !leaseExist(t, clus, lresp.ID) {
233		t.Error("unexpected lease not exists")
234	}
235}
236
237// TestV3LeaseRenewStress keeps creating lease and renewing it immediately to ensure the renewal goes through.
238// it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
239// related issue https://github.com/coreos/etcd/issues/6978
240func TestV3LeaseRenewStress(t *testing.T) {
241	testLeaseStress(t, stressLeaseRenew)
242}
243
244// TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved.
245// it was oberserved that the immediate lease retrieval after granting a lease from follower resulted lease not found.
246// related issue https://github.com/coreos/etcd/issues/6978
247func TestV3LeaseTimeToLiveStress(t *testing.T) {
248	testLeaseStress(t, stressLeaseTimeToLive)
249}
250
251func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
252	defer testutil.AfterTest(t)
253	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
254	defer clus.Terminate(t)
255
256	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
257	defer cancel()
258	errc := make(chan error)
259
260	for i := 0; i < 30; i++ {
261		for j := 0; j < 3; j++ {
262			go func(i int) { errc <- stresser(ctx, toGRPC(clus.Client(i)).Lease) }(j)
263		}
264	}
265
266	for i := 0; i < 90; i++ {
267		if err := <-errc; err != nil {
268			t.Fatal(err)
269		}
270	}
271}
272
273func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
274	defer func() {
275		if tctx.Err() != nil {
276			reterr = nil
277		}
278	}()
279	lac, err := lc.LeaseKeepAlive(tctx)
280	if err != nil {
281		return err
282	}
283	for tctx.Err() == nil {
284		resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
285		if gerr != nil {
286			continue
287		}
288		err = lac.Send(&pb.LeaseKeepAliveRequest{ID: resp.ID})
289		if err != nil {
290			continue
291		}
292		rresp, rxerr := lac.Recv()
293		if rxerr != nil {
294			continue
295		}
296		if rresp.TTL == 0 {
297			return fmt.Errorf("TTL shouldn't be 0 so soon")
298		}
299	}
300	return nil
301}
302
303func stressLeaseTimeToLive(tctx context.Context, lc pb.LeaseClient) (reterr error) {
304	defer func() {
305		if tctx.Err() != nil {
306			reterr = nil
307		}
308	}()
309	for tctx.Err() == nil {
310		resp, gerr := lc.LeaseGrant(tctx, &pb.LeaseGrantRequest{TTL: 60})
311		if gerr != nil {
312			continue
313		}
314		_, kerr := lc.LeaseTimeToLive(tctx, &pb.LeaseTimeToLiveRequest{ID: resp.ID})
315		if rpctypes.Error(kerr) == rpctypes.ErrLeaseNotFound {
316			return kerr
317		}
318	}
319	return nil
320}
321
322func TestV3PutOnNonExistLease(t *testing.T) {
323	defer testutil.AfterTest(t)
324	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
325	defer clus.Terminate(t)
326
327	ctx, cancel := context.WithCancel(context.Background())
328	defer cancel()
329
330	badLeaseID := int64(0x12345678)
331	putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: badLeaseID}
332	_, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr)
333	if !eqErrGRPC(err, rpctypes.ErrGRPCLeaseNotFound) {
334		t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCLeaseNotFound)
335	}
336}
337
338// TestV3GetNonExistLease ensures client retrieving nonexistent lease on a follower doesn't result node panic
339// related issue https://github.com/coreos/etcd/issues/6537
340func TestV3GetNonExistLease(t *testing.T) {
341	defer testutil.AfterTest(t)
342	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
343	defer clus.Terminate(t)
344
345	ctx, cancel := context.WithCancel(context.Background())
346	defer cancel()
347	lc := toGRPC(clus.RandClient()).Lease
348	lresp, err := lc.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: 10})
349	if err != nil {
350		t.Errorf("failed to create lease %v", err)
351	}
352	_, err = lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
353	if err != nil {
354		t.Fatal(err)
355	}
356
357	leaseTTLr := &pb.LeaseTimeToLiveRequest{
358		ID:   lresp.ID,
359		Keys: true,
360	}
361
362	for _, client := range clus.clients {
363		// quorum-read to ensure revoke completes before TimeToLive
364		if _, err := toGRPC(client).KV.Range(ctx, &pb.RangeRequest{Key: []byte("_")}); err != nil {
365			t.Fatal(err)
366		}
367		resp, err := toGRPC(client).Lease.LeaseTimeToLive(ctx, leaseTTLr)
368		if err != nil {
369			t.Fatalf("expected non nil error, but go %v", err)
370		}
371		if resp.TTL != -1 {
372			t.Fatalf("expected TTL to be -1, but got %v", resp.TTL)
373		}
374	}
375}
376
377// TestV3LeaseSwitch tests a key can be switched from one lease to another.
378func TestV3LeaseSwitch(t *testing.T) {
379	defer testutil.AfterTest(t)
380	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
381	defer clus.Terminate(t)
382
383	key := "foo"
384
385	// create lease
386	ctx, cancel := context.WithCancel(context.Background())
387	defer cancel()
388	lresp1, err1 := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: 30})
389	if err1 != nil {
390		t.Fatal(err1)
391	}
392	lresp2, err2 := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: 30})
393	if err2 != nil {
394		t.Fatal(err2)
395	}
396
397	// attach key on lease1 then switch it to lease2
398	put1 := &pb.PutRequest{Key: []byte(key), Lease: lresp1.ID}
399	_, err := toGRPC(clus.RandClient()).KV.Put(ctx, put1)
400	if err != nil {
401		t.Fatal(err)
402	}
403	put2 := &pb.PutRequest{Key: []byte(key), Lease: lresp2.ID}
404	_, err = toGRPC(clus.RandClient()).KV.Put(ctx, put2)
405	if err != nil {
406		t.Fatal(err)
407	}
408
409	// revoke lease1 should not remove key
410	_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp1.ID})
411	if err != nil {
412		t.Fatal(err)
413	}
414	rreq := &pb.RangeRequest{Key: []byte("foo")}
415	rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
416	if err != nil {
417		t.Fatal(err)
418	}
419	if len(rresp.Kvs) != 1 {
420		t.Fatalf("unexpect removal of key")
421	}
422
423	// revoke lease2 should remove key
424	_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp2.ID})
425	if err != nil {
426		t.Fatal(err)
427	}
428	rresp, err = toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
429	if err != nil {
430		t.Fatal(err)
431	}
432	if len(rresp.Kvs) != 0 {
433		t.Fatalf("lease removed but key remains")
434	}
435}
436
437// TestV3LeaseFailover ensures the old leader drops lease keepalive requests within
438// election timeout after it loses its quorum. And the new leader extends the TTL of
439// the lease to at least TTL + election timeout.
440func TestV3LeaseFailover(t *testing.T) {
441	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
442	defer clus.Terminate(t)
443
444	toIsolate := clus.waitLeader(t, clus.Members)
445
446	lc := toGRPC(clus.Client(toIsolate)).Lease
447
448	// create lease
449	lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 5})
450	if err != nil {
451		t.Fatal(err)
452	}
453	if lresp.Error != "" {
454		t.Fatal(lresp.Error)
455	}
456
457	// isolate the current leader with its followers.
458	clus.Members[toIsolate].Pause()
459
460	lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}
461
462	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
463	mctx := metadata.NewOutgoingContext(context.Background(), md)
464	ctx, cancel := context.WithCancel(mctx)
465	defer cancel()
466	lac, err := lc.LeaseKeepAlive(ctx)
467	if err != nil {
468		t.Fatal(err)
469	}
470	defer lac.CloseSend()
471
472	// send keep alive to old leader until the old leader starts
473	// to drop lease request.
474	var expectedExp time.Time
475	for {
476		if err = lac.Send(lreq); err != nil {
477			break
478		}
479		lkresp, rxerr := lac.Recv()
480		if rxerr != nil {
481			break
482		}
483		expectedExp = time.Now().Add(time.Duration(lkresp.TTL) * time.Second)
484		time.Sleep(time.Duration(lkresp.TTL/2) * time.Second)
485	}
486
487	clus.Members[toIsolate].Resume()
488	clus.waitLeader(t, clus.Members)
489
490	// lease should not expire at the last received expire deadline.
491	time.Sleep(time.Until(expectedExp) - 500*time.Millisecond)
492
493	if !leaseExist(t, clus, lresp.ID) {
494		t.Error("unexpected lease not exists")
495	}
496}
497
498// TestV3LeaseRequireLeader ensures that a Recv will get a leader
499// loss error if there is no leader.
500func TestV3LeaseRequireLeader(t *testing.T) {
501	defer testutil.AfterTest(t)
502
503	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
504	defer clus.Terminate(t)
505
506	lc := toGRPC(clus.Client(0)).Lease
507	clus.Members[1].Stop(t)
508	clus.Members[2].Stop(t)
509
510	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
511	mctx := metadata.NewOutgoingContext(context.Background(), md)
512	ctx, cancel := context.WithCancel(mctx)
513	defer cancel()
514	lac, err := lc.LeaseKeepAlive(ctx)
515	if err != nil {
516		t.Fatal(err)
517	}
518
519	donec := make(chan struct{})
520	go func() {
521		defer close(donec)
522		resp, err := lac.Recv()
523		if err == nil {
524			t.Fatalf("got response %+v, expected error", resp)
525		}
526		if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
527			t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
528		}
529	}()
530	select {
531	case <-time.After(5 * time.Second):
532		t.Fatal("did not receive leader loss error (in 5-sec)")
533	case <-donec:
534	}
535}
536
// fiveMinTTL is a lease TTL of five minutes, in seconds. The restart/recover
// tests use it as the TTL of the leases they grant.
const fiveMinTTL int64 = 5 * 60
538
539// TestV3LeaseRecoverAndRevoke ensures that revoking a lease after restart deletes the attached key.
540func TestV3LeaseRecoverAndRevoke(t *testing.T) {
541	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
542	defer clus.Terminate(t)
543
544	kvc := toGRPC(clus.Client(0)).KV
545	lsc := toGRPC(clus.Client(0)).Lease
546
547	lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
548	if err != nil {
549		t.Fatal(err)
550	}
551	if lresp.Error != "" {
552		t.Fatal(lresp.Error)
553	}
554	_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
555	if err != nil {
556		t.Fatal(err)
557	}
558
559	// restart server and ensure lease still exists
560	clus.Members[0].Stop(t)
561	clus.Members[0].Restart(t)
562	clus.waitLeader(t, clus.Members)
563
564	// overwrite old client with newly dialed connection
565	// otherwise, error with "grpc: RPC failed fast due to transport failure"
566	nc, err := NewClientV3(clus.Members[0])
567	if err != nil {
568		t.Fatal(err)
569	}
570	kvc = toGRPC(nc).KV
571	lsc = toGRPC(nc).Lease
572	defer nc.Close()
573
574	// revoke should delete the key
575	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
576	if err != nil {
577		t.Fatal(err)
578	}
579	rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
580	if err != nil {
581		t.Fatal(err)
582	}
583	if len(rresp.Kvs) != 0 {
584		t.Fatalf("lease removed but key remains")
585	}
586}
587
588// TestV3LeaseRevokeAndRecover ensures that revoked key stays deleted after restart.
589func TestV3LeaseRevokeAndRecover(t *testing.T) {
590	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
591	defer clus.Terminate(t)
592
593	kvc := toGRPC(clus.Client(0)).KV
594	lsc := toGRPC(clus.Client(0)).Lease
595
596	lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
597	if err != nil {
598		t.Fatal(err)
599	}
600	if lresp.Error != "" {
601		t.Fatal(lresp.Error)
602	}
603	_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
604	if err != nil {
605		t.Fatal(err)
606	}
607
608	// revoke should delete the key
609	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
610	if err != nil {
611		t.Fatal(err)
612	}
613
614	// restart server and ensure revoked key doesn't exist
615	clus.Members[0].Stop(t)
616	clus.Members[0].Restart(t)
617	clus.waitLeader(t, clus.Members)
618
619	// overwrite old client with newly dialed connection
620	// otherwise, error with "grpc: RPC failed fast due to transport failure"
621	nc, err := NewClientV3(clus.Members[0])
622	if err != nil {
623		t.Fatal(err)
624	}
625	kvc = toGRPC(nc).KV
626	defer nc.Close()
627
628	rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
629	if err != nil {
630		t.Fatal(err)
631	}
632	if len(rresp.Kvs) != 0 {
633		t.Fatalf("lease removed but key remains")
634	}
635}
636
637// TestV3LeaseRecoverKeyWithDetachedLease ensures that revoking a detached lease after restart
638// does not delete the key.
639func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) {
640	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
641	defer clus.Terminate(t)
642
643	kvc := toGRPC(clus.Client(0)).KV
644	lsc := toGRPC(clus.Client(0)).Lease
645
646	lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
647	if err != nil {
648		t.Fatal(err)
649	}
650	if lresp.Error != "" {
651		t.Fatal(lresp.Error)
652	}
653	_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
654	if err != nil {
655		t.Fatal(err)
656	}
657
658	// overwrite lease with none
659	_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
660	if err != nil {
661		t.Fatal(err)
662	}
663
664	// restart server and ensure lease still exists
665	clus.Members[0].Stop(t)
666	clus.Members[0].Restart(t)
667	clus.waitLeader(t, clus.Members)
668
669	// overwrite old client with newly dialed connection
670	// otherwise, error with "grpc: RPC failed fast due to transport failure"
671	nc, err := NewClientV3(clus.Members[0])
672	if err != nil {
673		t.Fatal(err)
674	}
675	kvc = toGRPC(nc).KV
676	lsc = toGRPC(nc).Lease
677	defer nc.Close()
678
679	// revoke the detached lease
680	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
681	if err != nil {
682		t.Fatal(err)
683	}
684	rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
685	if err != nil {
686		t.Fatal(err)
687	}
688	if len(rresp.Kvs) != 1 {
689		t.Fatalf("only detached lease removed, key should remain")
690	}
691}
692
693func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
694	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
695	defer clus.Terminate(t)
696
697	kvc := toGRPC(clus.Client(0)).KV
698	lsc := toGRPC(clus.Client(0)).Lease
699
700	var leaseIDs []int64
701	for i := 0; i < 2; i++ {
702		lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL})
703		if err != nil {
704			t.Fatal(err)
705		}
706		if lresp.Error != "" {
707			t.Fatal(lresp.Error)
708		}
709		leaseIDs = append(leaseIDs, lresp.ID)
710
711		_, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID})
712		if err != nil {
713			t.Fatal(err)
714		}
715	}
716
717	// restart server and ensure lease still exists
718	clus.Members[0].Stop(t)
719	clus.Members[0].Restart(t)
720	clus.waitLeader(t, clus.Members)
721	for i, leaseID := range leaseIDs {
722		if !leaseExist(t, clus, leaseID) {
723			t.Errorf("#%d: unexpected lease not exists", i)
724		}
725	}
726
727	// overwrite old client with newly dialed connection
728	// otherwise, error with "grpc: RPC failed fast due to transport failure"
729	nc, err := NewClientV3(clus.Members[0])
730	if err != nil {
731		t.Fatal(err)
732	}
733	kvc = toGRPC(nc).KV
734	lsc = toGRPC(nc).Lease
735	defer nc.Close()
736
737	// revoke the old lease
738	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[0]})
739	if err != nil {
740		t.Fatal(err)
741	}
742	// key should still exist
743	rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
744	if err != nil {
745		t.Fatal(err)
746	}
747	if len(rresp.Kvs) != 1 {
748		t.Fatalf("only detached lease removed, key should remain")
749	}
750
751	// revoke the latest lease
752	_, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[1]})
753	if err != nil {
754		t.Fatal(err)
755	}
756	rresp, err = kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
757	if err != nil {
758		t.Fatal(err)
759	}
760	if len(rresp.Kvs) != 0 {
761		t.Fatalf("lease removed but key remains")
762	}
763}
764
765// acquireLeaseAndKey creates a new lease and creates an attached key.
766func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
767	// create lease
768	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(
769		context.TODO(),
770		&pb.LeaseGrantRequest{TTL: 1})
771	if err != nil {
772		return 0, err
773	}
774	if lresp.Error != "" {
775		return 0, fmt.Errorf(lresp.Error)
776	}
777	// attach to key
778	put := &pb.PutRequest{Key: []byte(key), Lease: lresp.ID}
779	if _, err := toGRPC(clus.RandClient()).KV.Put(context.TODO(), put); err != nil {
780		return 0, err
781	}
782	return lresp.ID, nil
783}
784
785// testLeaseRemoveLeasedKey performs some action while holding a lease with an
786// attached key "foo", then confirms the key is gone.
787func testLeaseRemoveLeasedKey(t *testing.T, act func(*ClusterV3, int64) error) {
788	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
789	defer clus.Terminate(t)
790
791	leaseID, err := acquireLeaseAndKey(clus, "foo")
792	if err != nil {
793		t.Fatal(err)
794	}
795
796	if err = act(clus, leaseID); err != nil {
797		t.Fatal(err)
798	}
799
800	// confirm no key
801	rreq := &pb.RangeRequest{Key: []byte("foo")}
802	rresp, err := toGRPC(clus.RandClient()).KV.Range(context.TODO(), rreq)
803	if err != nil {
804		t.Fatal(err)
805	}
806	if len(rresp.Kvs) != 0 {
807		t.Fatalf("lease removed but key remains")
808	}
809}
810
811func leaseExist(t *testing.T, clus *ClusterV3, leaseID int64) bool {
812	l := toGRPC(clus.RandClient()).Lease
813
814	_, err := l.LeaseGrant(context.Background(), &pb.LeaseGrantRequest{ID: leaseID, TTL: 5})
815	if err == nil {
816		_, err = l.LeaseRevoke(context.Background(), &pb.LeaseRevokeRequest{ID: leaseID})
817		if err != nil {
818			t.Fatalf("failed to check lease %v", err)
819		}
820		return false
821	}
822
823	if eqErrGRPC(err, rpctypes.ErrGRPCLeaseExist) {
824		return true
825	}
826	t.Fatalf("unexpecter error %v", err)
827
828	return true
829}
830