1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package integration
16
17import (
18	"bytes"
19	"context"
20	"fmt"
21	"io/ioutil"
22	"math/rand"
23	"os"
24	"reflect"
25	"testing"
26	"time"
27
28	"github.com/coreos/etcd/clientv3"
29	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
30	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
31	"github.com/coreos/etcd/pkg/testutil"
32	"github.com/coreos/etcd/pkg/transport"
33
34	"google.golang.org/grpc"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/metadata"
37	"google.golang.org/grpc/status"
38)
39
40// TestV3PutOverwrite puts a key with the v3 api to a random cluster member,
41// overwrites it, then checks that the change was applied.
42func TestV3PutOverwrite(t *testing.T) {
43	defer testutil.AfterTest(t)
44	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
45	defer clus.Terminate(t)
46
47	kvc := toGRPC(clus.RandClient()).KV
48	key := []byte("foo")
49	reqput := &pb.PutRequest{Key: key, Value: []byte("bar"), PrevKv: true}
50
51	respput, err := kvc.Put(context.TODO(), reqput)
52	if err != nil {
53		t.Fatalf("couldn't put key (%v)", err)
54	}
55
56	// overwrite
57	reqput.Value = []byte("baz")
58	respput2, err := kvc.Put(context.TODO(), reqput)
59	if err != nil {
60		t.Fatalf("couldn't put key (%v)", err)
61	}
62	if respput2.Header.Revision <= respput.Header.Revision {
63		t.Fatalf("expected newer revision on overwrite, got %v <= %v",
64			respput2.Header.Revision, respput.Header.Revision)
65	}
66	if pkv := respput2.PrevKv; pkv == nil || string(pkv.Value) != "bar" {
67		t.Fatalf("expected PrevKv=bar, got response %+v", respput2)
68	}
69
70	reqrange := &pb.RangeRequest{Key: key}
71	resprange, err := kvc.Range(context.TODO(), reqrange)
72	if err != nil {
73		t.Fatalf("couldn't get key (%v)", err)
74	}
75	if len(resprange.Kvs) != 1 {
76		t.Fatalf("expected 1 key, got %v", len(resprange.Kvs))
77	}
78
79	kv := resprange.Kvs[0]
80	if kv.ModRevision <= kv.CreateRevision {
81		t.Errorf("expected modRev > createRev, got %d <= %d",
82			kv.ModRevision, kv.CreateRevision)
83	}
84	if !reflect.DeepEqual(reqput.Value, kv.Value) {
85		t.Errorf("expected value %v, got %v", reqput.Value, kv.Value)
86	}
87}
88
89// TestPutRestart checks if a put after an unrelated member restart succeeds
90func TestV3PutRestart(t *testing.T) {
91	defer testutil.AfterTest(t)
92	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
93	defer clus.Terminate(t)
94
95	kvIdx := rand.Intn(3)
96	kvc := toGRPC(clus.Client(kvIdx)).KV
97
98	stopIdx := kvIdx
99	for stopIdx == kvIdx {
100		stopIdx = rand.Intn(3)
101	}
102
103	clus.clients[stopIdx].Close()
104	clus.Members[stopIdx].Stop(t)
105	clus.Members[stopIdx].Restart(t)
106	c, cerr := NewClientV3(clus.Members[stopIdx])
107	if cerr != nil {
108		t.Fatalf("cannot create client: %v", cerr)
109	}
110	clus.clients[stopIdx] = c
111
112	ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
113	defer cancel()
114	reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
115	_, err := kvc.Put(ctx, reqput)
116	if err != nil && err == ctx.Err() {
117		t.Fatalf("expected grpc error, got local ctx error (%v)", err)
118	}
119}
120
121// TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
122func TestV3CompactCurrentRev(t *testing.T) {
123	defer testutil.AfterTest(t)
124	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
125	defer clus.Terminate(t)
126
127	kvc := toGRPC(clus.RandClient()).KV
128	preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
129	for i := 0; i < 3; i++ {
130		if _, err := kvc.Put(context.Background(), preq); err != nil {
131			t.Fatalf("couldn't put key (%v)", err)
132		}
133	}
134	// get key to add to proxy cache, if any
135	if _, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}); err != nil {
136		t.Fatal(err)
137	}
138	// compact on current revision
139	_, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 4})
140	if err != nil {
141		t.Fatalf("couldn't compact kv space (%v)", err)
142	}
143	// key still exists when linearized?
144	_, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo")})
145	if err != nil {
146		t.Fatalf("couldn't get key after compaction (%v)", err)
147	}
148	// key still exists when serialized?
149	_, err = kvc.Range(context.Background(), &pb.RangeRequest{Key: []byte("foo"), Serializable: true})
150	if err != nil {
151		t.Fatalf("couldn't get serialized key after compaction (%v)", err)
152	}
153}
154
155// TestV3HashKV ensures that multiple calls of HashKV on same node return same hash and compact rev.
156func TestV3HashKV(t *testing.T) {
157	defer testutil.AfterTest(t)
158	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
159	defer clus.Terminate(t)
160
161	kvc := toGRPC(clus.RandClient()).KV
162	mvc := toGRPC(clus.RandClient()).Maintenance
163
164	for i := 0; i < 10; i++ {
165		resp, err := kvc.Put(context.Background(), &pb.PutRequest{Key: []byte("foo"), Value: []byte(fmt.Sprintf("bar%d", i))})
166		if err != nil {
167			t.Fatal(err)
168		}
169
170		rev := resp.Header.Revision
171		hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{})
172		if err != nil {
173			t.Fatal(err)
174		}
175		if rev != hresp.Header.Revision {
176			t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
177		}
178
179		prevHash := hresp.Hash
180		prevCompactRev := hresp.CompactRevision
181		for i := 0; i < 10; i++ {
182			hresp, err := mvc.HashKV(context.Background(), &pb.HashKVRequest{})
183			if err != nil {
184				t.Fatal(err)
185			}
186			if rev != hresp.Header.Revision {
187				t.Fatalf("Put rev %v != HashKV rev %v", rev, hresp.Header.Revision)
188			}
189
190			if prevHash != hresp.Hash {
191				t.Fatalf("prevHash %v != Hash %v", prevHash, hresp.Hash)
192			}
193
194			if prevCompactRev != hresp.CompactRevision {
195				t.Fatalf("prevCompactRev %v != CompactRevision %v", prevHash, hresp.Hash)
196			}
197
198			prevHash = hresp.Hash
199			prevCompactRev = hresp.CompactRevision
200		}
201	}
202}
203
204func TestV3TxnTooManyOps(t *testing.T) {
205	defer testutil.AfterTest(t)
206	maxTxnOps := uint(128)
207	clus := NewClusterV3(t, &ClusterConfig{Size: 3, MaxTxnOps: maxTxnOps})
208	defer clus.Terminate(t)
209
210	kvc := toGRPC(clus.RandClient()).KV
211
212	// unique keys
213	i := new(int)
214	keyf := func() []byte {
215		*i++
216		return []byte(fmt.Sprintf("key-%d", i))
217	}
218
219	addCompareOps := func(txn *pb.TxnRequest) {
220		txn.Compare = append(txn.Compare,
221			&pb.Compare{
222				Result: pb.Compare_GREATER,
223				Target: pb.Compare_CREATE,
224				Key:    keyf(),
225			})
226	}
227	addSuccessOps := func(txn *pb.TxnRequest) {
228		txn.Success = append(txn.Success,
229			&pb.RequestOp{
230				Request: &pb.RequestOp_RequestPut{
231					RequestPut: &pb.PutRequest{
232						Key:   keyf(),
233						Value: []byte("bar"),
234					},
235				},
236			})
237	}
238	addFailureOps := func(txn *pb.TxnRequest) {
239		txn.Failure = append(txn.Failure,
240			&pb.RequestOp{
241				Request: &pb.RequestOp_RequestPut{
242					RequestPut: &pb.PutRequest{
243						Key:   keyf(),
244						Value: []byte("bar"),
245					},
246				},
247			})
248	}
249	addTxnOps := func(txn *pb.TxnRequest) {
250		newTxn := &pb.TxnRequest{}
251		addSuccessOps(newTxn)
252		txn.Success = append(txn.Success,
253			&pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
254				RequestTxn: newTxn,
255			},
256			},
257		)
258	}
259
260	tests := []func(txn *pb.TxnRequest){
261		addCompareOps,
262		addSuccessOps,
263		addFailureOps,
264		addTxnOps,
265	}
266
267	for i, tt := range tests {
268		txn := &pb.TxnRequest{}
269		for j := 0; j < int(maxTxnOps+1); j++ {
270			tt(txn)
271		}
272
273		_, err := kvc.Txn(context.Background(), txn)
274		if !eqErrGRPC(err, rpctypes.ErrGRPCTooManyOps) {
275			t.Errorf("#%d: err = %v, want %v", i, err, rpctypes.ErrGRPCTooManyOps)
276		}
277	}
278}
279
280func TestV3TxnDuplicateKeys(t *testing.T) {
281	defer testutil.AfterTest(t)
282	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
283	defer clus.Terminate(t)
284
285	putreq := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}}}
286	delKeyReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
287		RequestDeleteRange: &pb.DeleteRangeRequest{
288			Key: []byte("abc"),
289		},
290	},
291	}
292	delInRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
293		RequestDeleteRange: &pb.DeleteRangeRequest{
294			Key: []byte("a"), RangeEnd: []byte("b"),
295		},
296	},
297	}
298	delOutOfRangeReq := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{
299		RequestDeleteRange: &pb.DeleteRangeRequest{
300			Key: []byte("abb"), RangeEnd: []byte("abc"),
301		},
302	},
303	}
304	txnDelReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
305		RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{delInRangeReq}},
306	},
307	}
308	txnDelReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
309		RequestTxn: &pb.TxnRequest{
310			Success: []*pb.RequestOp{delInRangeReq},
311			Failure: []*pb.RequestOp{delInRangeReq}},
312	},
313	}
314
315	txnPutReq := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
316		RequestTxn: &pb.TxnRequest{Success: []*pb.RequestOp{putreq}},
317	},
318	}
319	txnPutReqTwoSide := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{
320		RequestTxn: &pb.TxnRequest{
321			Success: []*pb.RequestOp{putreq},
322			Failure: []*pb.RequestOp{putreq}},
323	},
324	}
325
326	kvc := toGRPC(clus.RandClient()).KV
327	tests := []struct {
328		txnSuccess []*pb.RequestOp
329
330		werr error
331	}{
332		{
333			txnSuccess: []*pb.RequestOp{putreq, putreq},
334
335			werr: rpctypes.ErrGRPCDuplicateKey,
336		},
337		{
338			txnSuccess: []*pb.RequestOp{putreq, delKeyReq},
339
340			werr: rpctypes.ErrGRPCDuplicateKey,
341		},
342		{
343			txnSuccess: []*pb.RequestOp{putreq, delInRangeReq},
344
345			werr: rpctypes.ErrGRPCDuplicateKey,
346		},
347		// Then(Put(a), Then(Del(a)))
348		{
349			txnSuccess: []*pb.RequestOp{putreq, txnDelReq},
350
351			werr: rpctypes.ErrGRPCDuplicateKey,
352		},
353		// Then(Del(a), Then(Put(a)))
354		{
355			txnSuccess: []*pb.RequestOp{delInRangeReq, txnPutReq},
356
357			werr: rpctypes.ErrGRPCDuplicateKey,
358		},
359		// Then((Then(Put(a)), Else(Put(a))), (Then(Put(a)), Else(Put(a)))
360		{
361			txnSuccess: []*pb.RequestOp{txnPutReqTwoSide, txnPutReqTwoSide},
362
363			werr: rpctypes.ErrGRPCDuplicateKey,
364		},
365		// Then(Del(x), (Then(Put(a)), Else(Put(a))))
366		{
367			txnSuccess: []*pb.RequestOp{delOutOfRangeReq, txnPutReqTwoSide},
368
369			werr: nil,
370		},
371		// Then(Then(Del(a)), (Then(Del(a)), Else(Del(a))))
372		{
373			txnSuccess: []*pb.RequestOp{txnDelReq, txnDelReqTwoSide},
374
375			werr: nil,
376		},
377		{
378			txnSuccess: []*pb.RequestOp{delKeyReq, delInRangeReq, delKeyReq, delInRangeReq},
379
380			werr: nil,
381		},
382		{
383			txnSuccess: []*pb.RequestOp{putreq, delOutOfRangeReq},
384
385			werr: nil,
386		},
387	}
388	for i, tt := range tests {
389		txn := &pb.TxnRequest{Success: tt.txnSuccess}
390		_, err := kvc.Txn(context.Background(), txn)
391		if !eqErrGRPC(err, tt.werr) {
392			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
393		}
394	}
395}
396
397// Testv3TxnRevision tests that the transaction header revision is set as expected.
398func TestV3TxnRevision(t *testing.T) {
399	defer testutil.AfterTest(t)
400	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
401	defer clus.Terminate(t)
402
403	kvc := toGRPC(clus.RandClient()).KV
404	pr := &pb.PutRequest{Key: []byte("abc"), Value: []byte("def")}
405	presp, err := kvc.Put(context.TODO(), pr)
406	if err != nil {
407		t.Fatal(err)
408	}
409
410	txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte("abc")}}}
411	txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
412	tresp, err := kvc.Txn(context.TODO(), txn)
413	if err != nil {
414		t.Fatal(err)
415	}
416
417	// did not update revision
418	if presp.Header.Revision != tresp.Header.Revision {
419		t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
420	}
421
422	txndr := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte("def")}}}
423	txn = &pb.TxnRequest{Success: []*pb.RequestOp{txndr}}
424	tresp, err = kvc.Txn(context.TODO(), txn)
425	if err != nil {
426		t.Fatal(err)
427	}
428
429	// did not update revision
430	if presp.Header.Revision != tresp.Header.Revision {
431		t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision)
432	}
433
434	txnput := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte("abc"), Value: []byte("123")}}}
435	txn = &pb.TxnRequest{Success: []*pb.RequestOp{txnput}}
436	tresp, err = kvc.Txn(context.TODO(), txn)
437	if err != nil {
438		t.Fatal(err)
439	}
440
441	// updated revision
442	if tresp.Header.Revision != presp.Header.Revision+1 {
443		t.Fatalf("got rev %d, wanted rev %d", tresp.Header.Revision, presp.Header.Revision+1)
444	}
445}
446
447// Testv3TxnCmpHeaderRev tests that the txn header revision is set as expected
448// when compared to the Succeeded field in the txn response.
449func TestV3TxnCmpHeaderRev(t *testing.T) {
450	defer testutil.AfterTest(t)
451	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
452	defer clus.Terminate(t)
453
454	kvc := toGRPC(clus.RandClient()).KV
455
456	for i := 0; i < 10; i++ {
457		// Concurrently put a key with a txn comparing on it.
458		revc := make(chan int64, 1)
459		go func() {
460			defer close(revc)
461			pr := &pb.PutRequest{Key: []byte("k"), Value: []byte("v")}
462			presp, err := kvc.Put(context.TODO(), pr)
463			if err != nil {
464				t.Fatal(err)
465			}
466			revc <- presp.Header.Revision
467		}()
468
469		// The read-only txn uses the optimized readindex server path.
470		txnget := &pb.RequestOp{Request: &pb.RequestOp_RequestRange{
471			RequestRange: &pb.RangeRequest{Key: []byte("k")}}}
472		txn := &pb.TxnRequest{Success: []*pb.RequestOp{txnget}}
473		// i = 0 /\ Succeeded => put followed txn
474		cmp := &pb.Compare{
475			Result:      pb.Compare_EQUAL,
476			Target:      pb.Compare_VERSION,
477			Key:         []byte("k"),
478			TargetUnion: &pb.Compare_Version{Version: int64(i)},
479		}
480		txn.Compare = append(txn.Compare, cmp)
481
482		tresp, err := kvc.Txn(context.TODO(), txn)
483		if err != nil {
484			t.Fatal(err)
485		}
486
487		prev := <-revc
488		// put followed txn; should eval to false
489		if prev > tresp.Header.Revision && !tresp.Succeeded {
490			t.Errorf("#%d: got else but put rev %d followed txn rev (%+v)", i, prev, tresp)
491		}
492		// txn follows put; should eval to true
493		if tresp.Header.Revision >= prev && tresp.Succeeded {
494			t.Errorf("#%d: got then but put rev %d preceded txn (%+v)", i, prev, tresp)
495		}
496	}
497}
498
499// TestV3TxnRangeCompare tests range comparisons in txns
500func TestV3TxnRangeCompare(t *testing.T) {
501	defer testutil.AfterTest(t)
502	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
503	defer clus.Terminate(t)
504
505	// put keys, named by expected revision
506	for _, k := range []string{"/a/2", "/a/3", "/a/4", "/f/5"} {
507		if _, err := clus.Client(0).Put(context.TODO(), k, "x"); err != nil {
508			t.Fatal(err)
509		}
510	}
511
512	tests := []struct {
513		cmp pb.Compare
514
515		wSuccess bool
516	}{
517		{
518			// >= /a/; all create revs fit
519			pb.Compare{
520				Key:         []byte("/a/"),
521				RangeEnd:    []byte{0},
522				Target:      pb.Compare_CREATE,
523				Result:      pb.Compare_LESS,
524				TargetUnion: &pb.Compare_CreateRevision{6},
525			},
526			true,
527		},
528		{
529			// >= /a/; one create rev doesn't fit
530			pb.Compare{
531				Key:         []byte("/a/"),
532				RangeEnd:    []byte{0},
533				Target:      pb.Compare_CREATE,
534				Result:      pb.Compare_LESS,
535				TargetUnion: &pb.Compare_CreateRevision{5},
536			},
537			false,
538		},
539		{
540			// prefix /a/*; all create revs fit
541			pb.Compare{
542				Key:         []byte("/a/"),
543				RangeEnd:    []byte("/a0"),
544				Target:      pb.Compare_CREATE,
545				Result:      pb.Compare_LESS,
546				TargetUnion: &pb.Compare_CreateRevision{5},
547			},
548			true,
549		},
550		{
551			// prefix /a/*; one create rev doesn't fit
552			pb.Compare{
553				Key:         []byte("/a/"),
554				RangeEnd:    []byte("/a0"),
555				Target:      pb.Compare_CREATE,
556				Result:      pb.Compare_LESS,
557				TargetUnion: &pb.Compare_CreateRevision{4},
558			},
559			false,
560		},
561		{
562			// does not exist, does not succeed
563			pb.Compare{
564				Key:         []byte("/b/"),
565				RangeEnd:    []byte("/b0"),
566				Target:      pb.Compare_VALUE,
567				Result:      pb.Compare_EQUAL,
568				TargetUnion: &pb.Compare_Value{[]byte("x")},
569			},
570			false,
571		},
572		{
573			// all keys are leased
574			pb.Compare{
575				Key:         []byte("/a/"),
576				RangeEnd:    []byte("/a0"),
577				Target:      pb.Compare_LEASE,
578				Result:      pb.Compare_GREATER,
579				TargetUnion: &pb.Compare_Lease{0},
580			},
581			false,
582		},
583		{
584			// no keys are leased
585			pb.Compare{
586				Key:         []byte("/a/"),
587				RangeEnd:    []byte("/a0"),
588				Target:      pb.Compare_LEASE,
589				Result:      pb.Compare_EQUAL,
590				TargetUnion: &pb.Compare_Lease{0},
591			},
592			true,
593		},
594	}
595
596	kvc := toGRPC(clus.Client(0)).KV
597	for i, tt := range tests {
598		txn := &pb.TxnRequest{}
599		txn.Compare = append(txn.Compare, &tt.cmp)
600		tresp, err := kvc.Txn(context.TODO(), txn)
601		if err != nil {
602			t.Fatal(err)
603		}
604		if tt.wSuccess != tresp.Succeeded {
605			t.Errorf("#%d: expected %v, got %v", i, tt.wSuccess, tresp.Succeeded)
606		}
607	}
608}
609
610// TestV3TxnNested tests nested txns follow paths as expected.
611func TestV3TxnNestedPath(t *testing.T) {
612	defer testutil.AfterTest(t)
613	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
614	defer clus.Terminate(t)
615
616	kvc := toGRPC(clus.RandClient()).KV
617
618	cmpTrue := &pb.Compare{
619		Result:      pb.Compare_EQUAL,
620		Target:      pb.Compare_VERSION,
621		Key:         []byte("k"),
622		TargetUnion: &pb.Compare_Version{Version: int64(0)},
623	}
624	cmpFalse := &pb.Compare{
625		Result:      pb.Compare_EQUAL,
626		Target:      pb.Compare_VERSION,
627		Key:         []byte("k"),
628		TargetUnion: &pb.Compare_Version{Version: int64(1)},
629	}
630
631	// generate random path to eval txns
632	topTxn := &pb.TxnRequest{}
633	txn := topTxn
634	txnPath := make([]bool, 10)
635	for i := range txnPath {
636		nextTxn := &pb.TxnRequest{}
637		op := &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: nextTxn}}
638		txnPath[i] = rand.Intn(2) == 0
639		if txnPath[i] {
640			txn.Compare = append(txn.Compare, cmpTrue)
641			txn.Success = append(txn.Success, op)
642		} else {
643			txn.Compare = append(txn.Compare, cmpFalse)
644			txn.Failure = append(txn.Failure, op)
645		}
646		txn = nextTxn
647	}
648
649	tresp, err := kvc.Txn(context.TODO(), topTxn)
650	if err != nil {
651		t.Fatal(err)
652	}
653
654	curTxnResp := tresp
655	for i := range txnPath {
656		if curTxnResp.Succeeded != txnPath[i] {
657			t.Fatalf("expected path %+v, got response %+v", txnPath, *tresp)
658		}
659		curTxnResp = curTxnResp.Responses[0].Response.(*pb.ResponseOp_ResponseTxn).ResponseTxn
660	}
661}
662
663// TestV3PutIgnoreValue ensures that writes with ignore_value overwrites with previous key-value pair.
664func TestV3PutIgnoreValue(t *testing.T) {
665	defer testutil.AfterTest(t)
666
667	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
668	defer clus.Terminate(t)
669
670	kvc := toGRPC(clus.RandClient()).KV
671	key, val := []byte("foo"), []byte("bar")
672	putReq := pb.PutRequest{Key: key, Value: val}
673
674	// create lease
675	lc := toGRPC(clus.RandClient()).Lease
676	lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
677	if err != nil {
678		t.Fatal(err)
679	}
680	if lresp.Error != "" {
681		t.Fatal(lresp.Error)
682	}
683
684	tests := []struct {
685		putFunc  func() error
686		putErr   error
687		wleaseID int64
688	}{
689		{ // put failure for non-existent key
690			func() error {
691				preq := putReq
692				preq.IgnoreValue = true
693				_, err := kvc.Put(context.TODO(), &preq)
694				return err
695			},
696			rpctypes.ErrGRPCKeyNotFound,
697			0,
698		},
699		{ // txn failure for non-existent key
700			func() error {
701				preq := putReq
702				preq.Value = nil
703				preq.IgnoreValue = true
704				txn := &pb.TxnRequest{}
705				txn.Success = append(txn.Success, &pb.RequestOp{
706					Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
707				_, err := kvc.Txn(context.TODO(), txn)
708				return err
709			},
710			rpctypes.ErrGRPCKeyNotFound,
711			0,
712		},
713		{ // put success
714			func() error {
715				_, err := kvc.Put(context.TODO(), &putReq)
716				return err
717			},
718			nil,
719			0,
720		},
721		{ // txn success, attach lease
722			func() error {
723				preq := putReq
724				preq.Value = nil
725				preq.Lease = lresp.ID
726				preq.IgnoreValue = true
727				txn := &pb.TxnRequest{}
728				txn.Success = append(txn.Success, &pb.RequestOp{
729					Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
730				_, err := kvc.Txn(context.TODO(), txn)
731				return err
732			},
733			nil,
734			lresp.ID,
735		},
736		{ // non-empty value with ignore_value should error
737			func() error {
738				preq := putReq
739				preq.IgnoreValue = true
740				_, err := kvc.Put(context.TODO(), &preq)
741				return err
742			},
743			rpctypes.ErrGRPCValueProvided,
744			0,
745		},
746		{ // overwrite with previous value, ensure no prev-kv is returned and lease is detached
747			func() error {
748				preq := putReq
749				preq.Value = nil
750				preq.IgnoreValue = true
751				presp, err := kvc.Put(context.TODO(), &preq)
752				if err != nil {
753					return err
754				}
755				if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
756					return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
757				}
758				return nil
759			},
760			nil,
761			0,
762		},
763		{ // revoke lease, ensure detached key doesn't get deleted
764			func() error {
765				_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
766				return err
767			},
768			nil,
769			0,
770		},
771	}
772
773	for i, tt := range tests {
774		if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
775			t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
776		}
777		if tt.putErr != nil {
778			continue
779		}
780		rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
781		if err != nil {
782			t.Fatalf("#%d: %v", i, err)
783		}
784		if len(rr.Kvs) != 1 {
785			t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
786		}
787		if !bytes.Equal(rr.Kvs[0].Value, val) {
788			t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
789		}
790		if rr.Kvs[0].Lease != tt.wleaseID {
791			t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
792		}
793	}
794}
795
796// TestV3PutIgnoreLease ensures that writes with ignore_lease uses previous lease for the key overwrites.
797func TestV3PutIgnoreLease(t *testing.T) {
798	defer testutil.AfterTest(t)
799
800	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
801	defer clus.Terminate(t)
802
803	kvc := toGRPC(clus.RandClient()).KV
804
805	// create lease
806	lc := toGRPC(clus.RandClient()).Lease
807	lresp, err := lc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: 30})
808	if err != nil {
809		t.Fatal(err)
810	}
811	if lresp.Error != "" {
812		t.Fatal(lresp.Error)
813	}
814
815	key, val, val1 := []byte("zoo"), []byte("bar"), []byte("bar1")
816	putReq := pb.PutRequest{Key: key, Value: val}
817
818	tests := []struct {
819		putFunc  func() error
820		putErr   error
821		wleaseID int64
822		wvalue   []byte
823	}{
824		{ // put failure for non-existent key
825			func() error {
826				preq := putReq
827				preq.IgnoreLease = true
828				_, err := kvc.Put(context.TODO(), &preq)
829				return err
830			},
831			rpctypes.ErrGRPCKeyNotFound,
832			0,
833			nil,
834		},
835		{ // txn failure for non-existent key
836			func() error {
837				preq := putReq
838				preq.IgnoreLease = true
839				txn := &pb.TxnRequest{}
840				txn.Success = append(txn.Success, &pb.RequestOp{
841					Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
842				_, err := kvc.Txn(context.TODO(), txn)
843				return err
844			},
845			rpctypes.ErrGRPCKeyNotFound,
846			0,
847			nil,
848		},
849		{ // put success
850			func() error {
851				preq := putReq
852				preq.Lease = lresp.ID
853				_, err := kvc.Put(context.TODO(), &preq)
854				return err
855			},
856			nil,
857			lresp.ID,
858			val,
859		},
860		{ // txn success, modify value using 'ignore_lease' and ensure lease is not detached
861			func() error {
862				preq := putReq
863				preq.Value = val1
864				preq.IgnoreLease = true
865				txn := &pb.TxnRequest{}
866				txn.Success = append(txn.Success, &pb.RequestOp{
867					Request: &pb.RequestOp_RequestPut{RequestPut: &preq}})
868				_, err := kvc.Txn(context.TODO(), txn)
869				return err
870			},
871			nil,
872			lresp.ID,
873			val1,
874		},
875		{ // non-empty lease with ignore_lease should error
876			func() error {
877				preq := putReq
878				preq.Lease = lresp.ID
879				preq.IgnoreLease = true
880				_, err := kvc.Put(context.TODO(), &preq)
881				return err
882			},
883			rpctypes.ErrGRPCLeaseProvided,
884			0,
885			nil,
886		},
887		{ // overwrite with previous value, ensure no prev-kv is returned and lease is detached
888			func() error {
889				presp, err := kvc.Put(context.TODO(), &putReq)
890				if err != nil {
891					return err
892				}
893				if presp.PrevKv != nil && len(presp.PrevKv.Key) != 0 {
894					return fmt.Errorf("unexexpected previous key-value %v", presp.PrevKv)
895				}
896				return nil
897			},
898			nil,
899			0,
900			val,
901		},
902		{ // revoke lease, ensure detached key doesn't get deleted
903			func() error {
904				_, err := lc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID})
905				return err
906			},
907			nil,
908			0,
909			val,
910		},
911	}
912
913	for i, tt := range tests {
914		if err := tt.putFunc(); !eqErrGRPC(err, tt.putErr) {
915			t.Fatalf("#%d: err expected %v, got %v", i, tt.putErr, err)
916		}
917		if tt.putErr != nil {
918			continue
919		}
920		rr, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: key})
921		if err != nil {
922			t.Fatalf("#%d: %v", i, err)
923		}
924		if len(rr.Kvs) != 1 {
925			t.Fatalf("#%d: len(rr.KVs) expected 1, got %d", i, len(rr.Kvs))
926		}
927		if !bytes.Equal(rr.Kvs[0].Value, tt.wvalue) {
928			t.Fatalf("#%d: value expected %q, got %q", i, val, rr.Kvs[0].Value)
929		}
930		if rr.Kvs[0].Lease != tt.wleaseID {
931			t.Fatalf("#%d: lease ID expected %d, got %d", i, tt.wleaseID, rr.Kvs[0].Lease)
932		}
933	}
934}
935
936// TestV3PutMissingLease ensures that a Put on a key with a bogus lease fails.
937func TestV3PutMissingLease(t *testing.T) {
938	defer testutil.AfterTest(t)
939	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
940	defer clus.Terminate(t)
941
942	kvc := toGRPC(clus.RandClient()).KV
943	key := []byte("foo")
944	preq := &pb.PutRequest{Key: key, Lease: 123456}
945	tests := []func(){
946		// put case
947		func() {
948			if presp, err := kvc.Put(context.TODO(), preq); err == nil {
949				t.Errorf("succeeded put key. req: %v. resp: %v", preq, presp)
950			}
951		},
952		// txn success case
953		func() {
954			txn := &pb.TxnRequest{}
955			txn.Success = append(txn.Success, &pb.RequestOp{
956				Request: &pb.RequestOp_RequestPut{
957					RequestPut: preq}})
958			if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
959				t.Errorf("succeeded txn success. req: %v. resp: %v", txn, tresp)
960			}
961		},
962		// txn failure case
963		func() {
964			txn := &pb.TxnRequest{}
965			txn.Failure = append(txn.Failure, &pb.RequestOp{
966				Request: &pb.RequestOp_RequestPut{
967					RequestPut: preq}})
968			cmp := &pb.Compare{
969				Result: pb.Compare_GREATER,
970				Target: pb.Compare_CREATE,
971				Key:    []byte("bar"),
972			}
973			txn.Compare = append(txn.Compare, cmp)
974			if tresp, err := kvc.Txn(context.TODO(), txn); err == nil {
975				t.Errorf("succeeded txn failure. req: %v. resp: %v", txn, tresp)
976			}
977		},
978		// ignore bad lease in failure on success txn
979		func() {
980			txn := &pb.TxnRequest{}
981			rreq := &pb.RangeRequest{Key: []byte("bar")}
982			txn.Success = append(txn.Success, &pb.RequestOp{
983				Request: &pb.RequestOp_RequestRange{
984					RequestRange: rreq}})
985			txn.Failure = append(txn.Failure, &pb.RequestOp{
986				Request: &pb.RequestOp_RequestPut{
987					RequestPut: preq}})
988			if tresp, err := kvc.Txn(context.TODO(), txn); err != nil {
989				t.Errorf("failed good txn. req: %v. resp: %v", txn, tresp)
990			}
991		},
992	}
993
994	for i, f := range tests {
995		f()
996		// key shouldn't have been stored
997		rreq := &pb.RangeRequest{Key: key}
998		rresp, err := kvc.Range(context.TODO(), rreq)
999		if err != nil {
1000			t.Errorf("#%d. could not rangereq (%v)", i, err)
1001		} else if len(rresp.Kvs) != 0 {
1002			t.Errorf("#%d. expected no keys, got %v", i, rresp)
1003		}
1004	}
1005}
1006
1007// TestV3DeleteRange tests various edge cases in the DeleteRange API.
1008func TestV3DeleteRange(t *testing.T) {
1009	defer testutil.AfterTest(t)
1010	tests := []struct {
1011		keySet []string
1012		begin  string
1013		end    string
1014		prevKV bool
1015
1016		wantSet [][]byte
1017		deleted int64
1018	}{
1019		// delete middle
1020		{
1021			[]string{"foo", "foo/abc", "fop"},
1022			"foo/", "fop", false,
1023			[][]byte{[]byte("foo"), []byte("fop")}, 1,
1024		},
1025		// no delete
1026		{
1027			[]string{"foo", "foo/abc", "fop"},
1028			"foo/", "foo/", false,
1029			[][]byte{[]byte("foo"), []byte("foo/abc"), []byte("fop")}, 0,
1030		},
1031		// delete first
1032		{
1033			[]string{"foo", "foo/abc", "fop"},
1034			"fo", "fop", false,
1035			[][]byte{[]byte("fop")}, 2,
1036		},
1037		// delete tail
1038		{
1039			[]string{"foo", "foo/abc", "fop"},
1040			"foo/", "fos", false,
1041			[][]byte{[]byte("foo")}, 2,
1042		},
1043		// delete exact
1044		{
1045			[]string{"foo", "foo/abc", "fop"},
1046			"foo/abc", "", false,
1047			[][]byte{[]byte("foo"), []byte("fop")}, 1,
1048		},
1049		// delete none, [x,x)
1050		{
1051			[]string{"foo"},
1052			"foo", "foo", false,
1053			[][]byte{[]byte("foo")}, 0,
1054		},
1055		// delete middle with preserveKVs set
1056		{
1057			[]string{"foo", "foo/abc", "fop"},
1058			"foo/", "fop", true,
1059			[][]byte{[]byte("foo"), []byte("fop")}, 1,
1060		},
1061	}
1062
1063	for i, tt := range tests {
1064		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
1065		kvc := toGRPC(clus.RandClient()).KV
1066
1067		ks := tt.keySet
1068		for j := range ks {
1069			reqput := &pb.PutRequest{Key: []byte(ks[j]), Value: []byte{}}
1070			_, err := kvc.Put(context.TODO(), reqput)
1071			if err != nil {
1072				t.Fatalf("couldn't put key (%v)", err)
1073			}
1074		}
1075
1076		dreq := &pb.DeleteRangeRequest{
1077			Key:      []byte(tt.begin),
1078			RangeEnd: []byte(tt.end),
1079			PrevKv:   tt.prevKV,
1080		}
1081		dresp, err := kvc.DeleteRange(context.TODO(), dreq)
1082		if err != nil {
1083			t.Fatalf("couldn't delete range on test %d (%v)", i, err)
1084		}
1085		if tt.deleted != dresp.Deleted {
1086			t.Errorf("expected %d on test %v, got %d", tt.deleted, i, dresp.Deleted)
1087		}
1088		if tt.prevKV {
1089			if len(dresp.PrevKvs) != int(dresp.Deleted) {
1090				t.Errorf("preserve %d keys, want %d", len(dresp.PrevKvs), dresp.Deleted)
1091			}
1092		}
1093
1094		rreq := &pb.RangeRequest{Key: []byte{0x0}, RangeEnd: []byte{0xff}}
1095		rresp, err := kvc.Range(context.TODO(), rreq)
1096		if err != nil {
1097			t.Errorf("couldn't get range on test %v (%v)", i, err)
1098		}
1099		if dresp.Header.Revision != rresp.Header.Revision {
1100			t.Errorf("expected revision %v, got %v",
1101				dresp.Header.Revision, rresp.Header.Revision)
1102		}
1103
1104		keys := [][]byte{}
1105		for j := range rresp.Kvs {
1106			keys = append(keys, rresp.Kvs[j].Key)
1107		}
1108		if !reflect.DeepEqual(tt.wantSet, keys) {
1109			t.Errorf("expected %v on test %v, got %v", tt.wantSet, i, keys)
1110		}
1111		// can't defer because tcp ports will be in use
1112		clus.Terminate(t)
1113	}
1114}
1115
1116// TestV3TxnInvalidRange tests that invalid ranges are rejected in txns.
1117func TestV3TxnInvalidRange(t *testing.T) {
1118	defer testutil.AfterTest(t)
1119	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
1120	defer clus.Terminate(t)
1121
1122	kvc := toGRPC(clus.RandClient()).KV
1123	preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
1124
1125	for i := 0; i < 3; i++ {
1126		_, err := kvc.Put(context.Background(), preq)
1127		if err != nil {
1128			t.Fatalf("couldn't put key (%v)", err)
1129		}
1130	}
1131
1132	_, err := kvc.Compact(context.Background(), &pb.CompactionRequest{Revision: 2})
1133	if err != nil {
1134		t.Fatalf("couldn't compact kv space (%v)", err)
1135	}
1136
1137	// future rev
1138	txn := &pb.TxnRequest{}
1139	txn.Success = append(txn.Success, &pb.RequestOp{
1140		Request: &pb.RequestOp_RequestPut{
1141			RequestPut: preq}})
1142
1143	rreq := &pb.RangeRequest{Key: []byte("foo"), Revision: 100}
1144	txn.Success = append(txn.Success, &pb.RequestOp{
1145		Request: &pb.RequestOp_RequestRange{
1146			RequestRange: rreq}})
1147
1148	if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCFutureRev) {
1149		t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCFutureRev)
1150	}
1151
1152	// compacted rev
1153	tv, _ := txn.Success[1].Request.(*pb.RequestOp_RequestRange)
1154	tv.RequestRange.Revision = 1
1155	if _, err := kvc.Txn(context.TODO(), txn); !eqErrGRPC(err, rpctypes.ErrGRPCCompacted) {
1156		t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCCompacted)
1157	}
1158}
1159
1160func TestV3TooLargeRequest(t *testing.T) {
1161	defer testutil.AfterTest(t)
1162
1163	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
1164	defer clus.Terminate(t)
1165
1166	kvc := toGRPC(clus.RandClient()).KV
1167
1168	// 2MB request value
1169	largeV := make([]byte, 2*1024*1024)
1170	preq := &pb.PutRequest{Key: []byte("foo"), Value: largeV}
1171
1172	_, err := kvc.Put(context.Background(), preq)
1173	if !eqErrGRPC(err, rpctypes.ErrGRPCRequestTooLarge) {
1174		t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCRequestTooLarge)
1175	}
1176}
1177
1178// TestV3Hash tests hash.
1179func TestV3Hash(t *testing.T) {
1180	defer testutil.AfterTest(t)
1181	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
1182	defer clus.Terminate(t)
1183
1184	cli := clus.RandClient()
1185	kvc := toGRPC(cli).KV
1186	m := toGRPC(cli).Maintenance
1187
1188	preq := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
1189
1190	for i := 0; i < 3; i++ {
1191		_, err := kvc.Put(context.Background(), preq)
1192		if err != nil {
1193			t.Fatalf("couldn't put key (%v)", err)
1194		}
1195	}
1196
1197	resp, err := m.Hash(context.Background(), &pb.HashRequest{})
1198	if err != nil || resp.Hash == 0 {
1199		t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
1200	}
1201}
1202
1203// TestV3HashRestart ensures that hash stays the same after restart.
1204func TestV3HashRestart(t *testing.T) {
1205	defer testutil.AfterTest(t)
1206	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
1207	defer clus.Terminate(t)
1208
1209	cli := clus.RandClient()
1210	resp, err := toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
1211	if err != nil || resp.Hash == 0 {
1212		t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
1213	}
1214	hash1 := resp.Hash
1215
1216	clus.Members[0].Stop(t)
1217	clus.Members[0].Restart(t)
1218	clus.waitLeader(t, clus.Members)
1219	kvc := toGRPC(clus.Client(0)).KV
1220	waitForRestart(t, kvc)
1221
1222	cli = clus.RandClient()
1223	resp, err = toGRPC(cli).Maintenance.Hash(context.Background(), &pb.HashRequest{})
1224	if err != nil || resp.Hash == 0 {
1225		t.Fatalf("couldn't hash (%v, hash %d)", err, resp.Hash)
1226	}
1227	hash2 := resp.Hash
1228
1229	if hash1 != hash2 {
1230		t.Fatalf("hash expected %d, got %d", hash1, hash2)
1231	}
1232}
1233
1234// TestV3StorageQuotaAPI tests the V3 server respects quotas at the API layer
1235func TestV3StorageQuotaAPI(t *testing.T) {
1236	defer testutil.AfterTest(t)
1237	quotasize := int64(16 * os.Getpagesize())
1238
1239	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
1240
1241	// Set a quota on one node
1242	clus.Members[0].QuotaBackendBytes = quotasize
1243	clus.Members[0].Stop(t)
1244	clus.Members[0].Restart(t)
1245
1246	defer clus.Terminate(t)
1247	kvc := toGRPC(clus.Client(0)).KV
1248	waitForRestart(t, kvc)
1249
1250	key := []byte("abc")
1251
1252	// test small put that fits in quota
1253	smallbuf := make([]byte, 512)
1254	if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
1255		t.Fatal(err)
1256	}
1257
1258	// test big put
1259	bigbuf := make([]byte, quotasize)
1260	_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
1261	if !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
1262		t.Fatalf("big put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
1263	}
1264
1265	// test big txn
1266	puttxn := &pb.RequestOp{
1267		Request: &pb.RequestOp_RequestPut{
1268			RequestPut: &pb.PutRequest{
1269				Key:   key,
1270				Value: bigbuf,
1271			},
1272		},
1273	}
1274	txnreq := &pb.TxnRequest{}
1275	txnreq.Success = append(txnreq.Success, puttxn)
1276	_, txnerr := kvc.Txn(context.TODO(), txnreq)
1277	if !eqErrGRPC(txnerr, rpctypes.ErrGRPCNoSpace) {
1278		t.Fatalf("big txn got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
1279	}
1280}
1281
1282func TestV3RangeRequest(t *testing.T) {
1283	defer testutil.AfterTest(t)
1284	tests := []struct {
1285		putKeys []string
1286		reqs    []pb.RangeRequest
1287
1288		wresps [][]string
1289		wmores []bool
1290	}{
1291		// single key
1292		{
1293			[]string{"foo", "bar"},
1294			[]pb.RangeRequest{
1295				// exists
1296				{Key: []byte("foo")},
1297				// doesn't exist
1298				{Key: []byte("baz")},
1299			},
1300
1301			[][]string{
1302				{"foo"},
1303				{},
1304			},
1305			[]bool{false, false},
1306		},
1307		// multi-key
1308		{
1309			[]string{"a", "b", "c", "d", "e"},
1310			[]pb.RangeRequest{
1311				// all in range
1312				{Key: []byte("a"), RangeEnd: []byte("z")},
1313				// [b, d)
1314				{Key: []byte("b"), RangeEnd: []byte("d")},
1315				// out of range
1316				{Key: []byte("f"), RangeEnd: []byte("z")},
1317				// [c,c) = empty
1318				{Key: []byte("c"), RangeEnd: []byte("c")},
1319				// [d, b) = empty
1320				{Key: []byte("d"), RangeEnd: []byte("b")},
1321				// ["\0", "\0") => all in range
1322				{Key: []byte{0}, RangeEnd: []byte{0}},
1323			},
1324
1325			[][]string{
1326				{"a", "b", "c", "d", "e"},
1327				{"b", "c"},
1328				{},
1329				{},
1330				{},
1331				{"a", "b", "c", "d", "e"},
1332			},
1333			[]bool{false, false, false, false, false, false},
1334		},
1335		// revision
1336		{
1337			[]string{"a", "b", "c", "d", "e"},
1338			[]pb.RangeRequest{
1339				{Key: []byte("a"), RangeEnd: []byte("z"), Revision: 0},
1340				{Key: []byte("a"), RangeEnd: []byte("z"), Revision: 1},
1341				{Key: []byte("a"), RangeEnd: []byte("z"), Revision: 2},
1342				{Key: []byte("a"), RangeEnd: []byte("z"), Revision: 3},
1343			},
1344
1345			[][]string{
1346				{"a", "b", "c", "d", "e"},
1347				{},
1348				{"a"},
1349				{"a", "b"},
1350			},
1351			[]bool{false, false, false, false},
1352		},
1353		// limit
1354		{
1355			[]string{"foo", "bar"},
1356			[]pb.RangeRequest{
1357				// more
1358				{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 1},
1359				// no more
1360				{Key: []byte("a"), RangeEnd: []byte("z"), Limit: 2},
1361			},
1362
1363			[][]string{
1364				{"bar"},
1365				{"bar", "foo"},
1366			},
1367			[]bool{true, false},
1368		},
1369		// sort
1370		{
1371			[]string{"b", "a", "c", "d", "c"},
1372			[]pb.RangeRequest{
1373				{
1374					Key: []byte("a"), RangeEnd: []byte("z"),
1375					Limit:      1,
1376					SortOrder:  pb.RangeRequest_ASCEND,
1377					SortTarget: pb.RangeRequest_KEY,
1378				},
1379				{
1380					Key: []byte("a"), RangeEnd: []byte("z"),
1381					Limit:      1,
1382					SortOrder:  pb.RangeRequest_DESCEND,
1383					SortTarget: pb.RangeRequest_KEY,
1384				},
1385				{
1386					Key: []byte("a"), RangeEnd: []byte("z"),
1387					Limit:      1,
1388					SortOrder:  pb.RangeRequest_ASCEND,
1389					SortTarget: pb.RangeRequest_CREATE,
1390				},
1391				{
1392					Key: []byte("a"), RangeEnd: []byte("z"),
1393					Limit:      1,
1394					SortOrder:  pb.RangeRequest_DESCEND,
1395					SortTarget: pb.RangeRequest_MOD,
1396				},
1397				{
1398					Key: []byte("z"), RangeEnd: []byte("z"),
1399					Limit:      1,
1400					SortOrder:  pb.RangeRequest_DESCEND,
1401					SortTarget: pb.RangeRequest_CREATE,
1402				},
1403				{ // sort ASCEND by default
1404					Key: []byte("a"), RangeEnd: []byte("z"),
1405					Limit:      10,
1406					SortOrder:  pb.RangeRequest_NONE,
1407					SortTarget: pb.RangeRequest_CREATE,
1408				},
1409			},
1410
1411			[][]string{
1412				{"a"},
1413				{"d"},
1414				{"b"},
1415				{"c"},
1416				{},
1417				{"b", "a", "c", "d"},
1418			},
1419			[]bool{true, true, true, true, false, false},
1420		},
1421		// min/max mod rev
1422		{
1423			[]string{"rev2", "rev3", "rev4", "rev5", "rev6"},
1424			[]pb.RangeRequest{
1425				{
1426					Key: []byte{0}, RangeEnd: []byte{0},
1427					MinModRevision: 3,
1428				},
1429				{
1430					Key: []byte{0}, RangeEnd: []byte{0},
1431					MaxModRevision: 3,
1432				},
1433				{
1434					Key: []byte{0}, RangeEnd: []byte{0},
1435					MinModRevision: 3,
1436					MaxModRevision: 5,
1437				},
1438				{
1439					Key: []byte{0}, RangeEnd: []byte{0},
1440					MaxModRevision: 10,
1441				},
1442			},
1443
1444			[][]string{
1445				{"rev3", "rev4", "rev5", "rev6"},
1446				{"rev2", "rev3"},
1447				{"rev3", "rev4", "rev5"},
1448				{"rev2", "rev3", "rev4", "rev5", "rev6"},
1449			},
1450			[]bool{false, false, false, false},
1451		},
1452		// min/max create rev
1453		{
1454			[]string{"rev2", "rev3", "rev2", "rev2", "rev6", "rev3"},
1455			[]pb.RangeRequest{
1456				{
1457					Key: []byte{0}, RangeEnd: []byte{0},
1458					MinCreateRevision: 3,
1459				},
1460				{
1461					Key: []byte{0}, RangeEnd: []byte{0},
1462					MaxCreateRevision: 3,
1463				},
1464				{
1465					Key: []byte{0}, RangeEnd: []byte{0},
1466					MinCreateRevision: 3,
1467					MaxCreateRevision: 5,
1468				},
1469				{
1470					Key: []byte{0}, RangeEnd: []byte{0},
1471					MaxCreateRevision: 10,
1472				},
1473			},
1474
1475			[][]string{
1476				{"rev3", "rev6"},
1477				{"rev2", "rev3"},
1478				{"rev3"},
1479				{"rev2", "rev3", "rev6"},
1480			},
1481			[]bool{false, false, false, false},
1482		},
1483	}
1484
1485	for i, tt := range tests {
1486		clus := NewClusterV3(t, &ClusterConfig{Size: 3})
1487		for _, k := range tt.putKeys {
1488			kvc := toGRPC(clus.RandClient()).KV
1489			req := &pb.PutRequest{Key: []byte(k), Value: []byte("bar")}
1490			if _, err := kvc.Put(context.TODO(), req); err != nil {
1491				t.Fatalf("#%d: couldn't put key (%v)", i, err)
1492			}
1493		}
1494
1495		for j, req := range tt.reqs {
1496			kvc := toGRPC(clus.RandClient()).KV
1497			resp, err := kvc.Range(context.TODO(), &req)
1498			if err != nil {
1499				t.Errorf("#%d.%d: Range error: %v", i, j, err)
1500				continue
1501			}
1502			if len(resp.Kvs) != len(tt.wresps[j]) {
1503				t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
1504				continue
1505			}
1506			for k, wKey := range tt.wresps[j] {
1507				respKey := string(resp.Kvs[k].Key)
1508				if respKey != wKey {
1509					t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
1510				}
1511			}
1512			if resp.More != tt.wmores[j] {
1513				t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
1514			}
1515			wrev := int64(len(tt.putKeys) + 1)
1516			if resp.Header.Revision != wrev {
1517				t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
1518			}
1519		}
1520		clus.Terminate(t)
1521	}
1522}
1523
1524func newClusterV3NoClients(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
1525	cfg.UseGRPC = true
1526	clus := &ClusterV3{cluster: NewClusterByConfig(t, cfg)}
1527	clus.Launch(t)
1528	return clus
1529}
1530
1531// TestTLSGRPCRejectInsecureClient checks that connection is rejected if server is TLS but not client.
1532func TestTLSGRPCRejectInsecureClient(t *testing.T) {
1533	defer testutil.AfterTest(t)
1534
1535	cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
1536	clus := newClusterV3NoClients(t, &cfg)
1537	defer clus.Terminate(t)
1538
1539	// nil out TLS field so client will use an insecure connection
1540	clus.Members[0].ClientTLSInfo = nil
1541	client, err := NewClientV3(clus.Members[0])
1542	if err != nil && err != context.DeadlineExceeded {
1543		t.Fatalf("unexpected error (%v)", err)
1544	} else if client == nil {
1545		// Ideally, no client would be returned. However, grpc will
1546		// return a connection without trying to handshake first so
1547		// the connection appears OK.
1548		return
1549	}
1550	defer client.Close()
1551
1552	donec := make(chan error, 1)
1553	go func() {
1554		ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
1555		reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
1556		_, perr := toGRPC(client).KV.Put(ctx, reqput)
1557		cancel()
1558		donec <- perr
1559	}()
1560
1561	if perr := <-donec; perr == nil {
1562		t.Fatalf("expected client error on put")
1563	}
1564}
1565
1566// TestTLSGRPCRejectSecureClient checks that connection is rejected if client is TLS but not server.
1567func TestTLSGRPCRejectSecureClient(t *testing.T) {
1568	defer testutil.AfterTest(t)
1569
1570	cfg := ClusterConfig{Size: 3}
1571	clus := newClusterV3NoClients(t, &cfg)
1572	defer clus.Terminate(t)
1573
1574	clus.Members[0].ClientTLSInfo = &testTLSInfo
1575	client, err := NewClientV3(clus.Members[0])
1576	if client != nil || err == nil {
1577		t.Fatalf("expected no client")
1578	} else if err != context.DeadlineExceeded {
1579		t.Fatalf("unexpected error (%v)", err)
1580	}
1581}
1582
1583// TestTLSGRPCAcceptSecureAll checks that connection is accepted if both client and server are TLS
1584func TestTLSGRPCAcceptSecureAll(t *testing.T) {
1585	defer testutil.AfterTest(t)
1586
1587	cfg := ClusterConfig{Size: 3, ClientTLS: &testTLSInfo}
1588	clus := newClusterV3NoClients(t, &cfg)
1589	defer clus.Terminate(t)
1590
1591	client, err := NewClientV3(clus.Members[0])
1592	if err != nil {
1593		t.Fatalf("expected tls client (%v)", err)
1594	}
1595	defer client.Close()
1596
1597	reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
1598	if _, err := toGRPC(client).KV.Put(context.TODO(), reqput); err != nil {
1599		t.Fatalf("unexpected error on put over tls (%v)", err)
1600	}
1601}
1602
1603// TestTLSReloadAtomicReplace ensures server reloads expired/valid certs
1604// when all certs are atomically replaced by directory renaming.
1605// And expects server to reject client requests, and vice versa.
1606func TestTLSReloadAtomicReplace(t *testing.T) {
1607	tmpDir, err := ioutil.TempDir(os.TempDir(), "fixtures-tmp")
1608	if err != nil {
1609		t.Fatal(err)
1610	}
1611	os.RemoveAll(tmpDir)
1612	defer os.RemoveAll(tmpDir)
1613
1614	certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
1615	if err != nil {
1616		t.Fatal(err)
1617	}
1618	defer os.RemoveAll(certsDir)
1619
1620	certsDirExp, err := ioutil.TempDir(os.TempDir(), "fixtures-expired")
1621	if err != nil {
1622		t.Fatal(err)
1623	}
1624	defer os.RemoveAll(certsDirExp)
1625
1626	cloneFunc := func() transport.TLSInfo {
1627		tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
1628		if terr != nil {
1629			t.Fatal(terr)
1630		}
1631		if _, err = copyTLSFiles(testTLSInfoExpired, certsDirExp); err != nil {
1632			t.Fatal(err)
1633		}
1634		return tlsInfo
1635	}
1636	replaceFunc := func() {
1637		if err = os.Rename(certsDir, tmpDir); err != nil {
1638			t.Fatal(err)
1639		}
1640		if err = os.Rename(certsDirExp, certsDir); err != nil {
1641			t.Fatal(err)
1642		}
1643		// after rename,
1644		// 'certsDir' contains expired certs
1645		// 'tmpDir' contains valid certs
1646		// 'certsDirExp' does not exist
1647	}
1648	revertFunc := func() {
1649		if err = os.Rename(tmpDir, certsDirExp); err != nil {
1650			t.Fatal(err)
1651		}
1652		if err = os.Rename(certsDir, tmpDir); err != nil {
1653			t.Fatal(err)
1654		}
1655		if err = os.Rename(certsDirExp, certsDir); err != nil {
1656			t.Fatal(err)
1657		}
1658	}
1659	testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
1660}
1661
1662// TestTLSReloadCopy ensures server reloads expired/valid certs
1663// when new certs are copied over, one by one. And expects server
1664// to reject client requests, and vice versa.
1665func TestTLSReloadCopy(t *testing.T) {
1666	certsDir, err := ioutil.TempDir(os.TempDir(), "fixtures-to-load")
1667	if err != nil {
1668		t.Fatal(err)
1669	}
1670	defer os.RemoveAll(certsDir)
1671
1672	cloneFunc := func() transport.TLSInfo {
1673		tlsInfo, terr := copyTLSFiles(testTLSInfo, certsDir)
1674		if terr != nil {
1675			t.Fatal(terr)
1676		}
1677		return tlsInfo
1678	}
1679	replaceFunc := func() {
1680		if _, err = copyTLSFiles(testTLSInfoExpired, certsDir); err != nil {
1681			t.Fatal(err)
1682		}
1683	}
1684	revertFunc := func() {
1685		if _, err = copyTLSFiles(testTLSInfo, certsDir); err != nil {
1686			t.Fatal(err)
1687		}
1688	}
1689	testTLSReload(t, cloneFunc, replaceFunc, revertFunc)
1690}
1691
1692func testTLSReload(t *testing.T, cloneFunc func() transport.TLSInfo, replaceFunc func(), revertFunc func()) {
1693	defer testutil.AfterTest(t)
1694
1695	// 1. separate copies for TLS assets modification
1696	tlsInfo := cloneFunc()
1697
1698	// 2. start cluster with valid certs
1699	clus := NewClusterV3(t, &ClusterConfig{Size: 1, PeerTLS: &tlsInfo, ClientTLS: &tlsInfo})
1700	defer clus.Terminate(t)
1701
1702	// 3. concurrent client dialing while certs become expired
1703	errc := make(chan error, 1)
1704	go func() {
1705		for {
1706			cc, err := tlsInfo.ClientConfig()
1707			if err != nil {
1708				// errors in 'go/src/crypto/tls/tls.go'
1709				// tls: private key does not match public key
1710				// tls: failed to find any PEM data in key input
1711				// tls: failed to find any PEM data in certificate input
1712				// Or 'does not exist', 'not found', etc
1713				t.Log(err)
1714				continue
1715			}
1716			cli, cerr := clientv3.New(clientv3.Config{
1717				Endpoints:   []string{clus.Members[0].GRPCAddr()},
1718				DialTimeout: time.Second,
1719				DialOptions: []grpc.DialOption{grpc.WithBlock()},
1720				TLS:         cc,
1721			})
1722			if cerr != nil {
1723				errc <- cerr
1724				return
1725			}
1726			cli.Close()
1727		}
1728	}()
1729
1730	// 4. replace certs with expired ones
1731	replaceFunc()
1732
1733	// 5. expect dial time-out when loading expired certs
1734	select {
1735	case gerr := <-errc:
1736		if gerr != context.DeadlineExceeded {
1737			t.Fatalf("expected %v, got %v", context.DeadlineExceeded, gerr)
1738		}
1739	case <-time.After(5 * time.Second):
1740		t.Fatal("failed to receive dial timeout error")
1741	}
1742
1743	// 6. replace expired certs back with valid ones
1744	revertFunc()
1745
1746	// 7. new requests should trigger listener to reload valid certs
1747	tls, terr := tlsInfo.ClientConfig()
1748	if terr != nil {
1749		t.Fatal(terr)
1750	}
1751	cl, cerr := clientv3.New(clientv3.Config{
1752		Endpoints:   []string{clus.Members[0].GRPCAddr()},
1753		DialTimeout: 5 * time.Second,
1754		DialOptions: []grpc.DialOption{grpc.WithBlock()},
1755		TLS:         tls,
1756	})
1757	if cerr != nil {
1758		t.Fatalf("expected no error, got %v", cerr)
1759	}
1760	cl.Close()
1761}
1762
1763func TestGRPCRequireLeader(t *testing.T) {
1764	defer testutil.AfterTest(t)
1765
1766	cfg := ClusterConfig{Size: 3}
1767	clus := newClusterV3NoClients(t, &cfg)
1768	defer clus.Terminate(t)
1769
1770	clus.Members[1].Stop(t)
1771	clus.Members[2].Stop(t)
1772
1773	client, err := NewClientV3(clus.Members[0])
1774	if err != nil {
1775		t.Fatalf("cannot create client: %v", err)
1776	}
1777	defer client.Close()
1778
1779	// wait for election timeout, then member[0] will not have a leader.
1780	time.Sleep(time.Duration(3*electionTicks) * tickDuration)
1781
1782	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
1783	ctx := metadata.NewOutgoingContext(context.Background(), md)
1784	reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
1785	if _, err := toGRPC(client).KV.Put(ctx, reqput); rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
1786		t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
1787	}
1788}
1789
1790func TestGRPCStreamRequireLeader(t *testing.T) {
1791	defer testutil.AfterTest(t)
1792
1793	cfg := ClusterConfig{Size: 3}
1794	clus := newClusterV3NoClients(t, &cfg)
1795	defer clus.Terminate(t)
1796
1797	client, err := NewClientV3(clus.Members[0])
1798	if err != nil {
1799		t.Fatalf("failed to create client (%v)", err)
1800	}
1801	defer client.Close()
1802
1803	wAPI := toGRPC(client).Watch
1804	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
1805	ctx := metadata.NewOutgoingContext(context.Background(), md)
1806	wStream, err := wAPI.Watch(ctx)
1807	if err != nil {
1808		t.Fatalf("wAPI.Watch error: %v", err)
1809	}
1810
1811	clus.Members[1].Stop(t)
1812	clus.Members[2].Stop(t)
1813
1814	// existing stream should be rejected
1815	_, err = wStream.Recv()
1816	if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
1817		t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
1818	}
1819
1820	// new stream should also be rejected
1821	wStream, err = wAPI.Watch(ctx)
1822	if err != nil {
1823		t.Fatalf("wAPI.Watch error: %v", err)
1824	}
1825	_, err = wStream.Recv()
1826	if rpctypes.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
1827		t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
1828	}
1829
1830	clus.Members[1].Restart(t)
1831	clus.Members[2].Restart(t)
1832
1833	clus.waitLeader(t, clus.Members)
1834	time.Sleep(time.Duration(2*electionTicks) * tickDuration)
1835
1836	// new stream should also be OK now after we restarted the other members
1837	wStream, err = wAPI.Watch(ctx)
1838	if err != nil {
1839		t.Fatalf("wAPI.Watch error: %v", err)
1840	}
1841	wreq := &pb.WatchRequest{
1842		RequestUnion: &pb.WatchRequest_CreateRequest{
1843			CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")},
1844		},
1845	}
1846	err = wStream.Send(wreq)
1847	if err != nil {
1848		t.Errorf("err = %v, want nil", err)
1849	}
1850}
1851
1852// TestV3LargeRequests ensures that configurable MaxRequestBytes works as intended.
1853func TestV3LargeRequests(t *testing.T) {
1854	defer testutil.AfterTest(t)
1855	tests := []struct {
1856		maxRequestBytes uint
1857		valueSize       int
1858		expectError     error
1859	}{
1860		// don't set to 0. use 0 as the default.
1861		{1, 1024, rpctypes.ErrGRPCRequestTooLarge},
1862		{10 * 1024 * 1024, 9 * 1024 * 1024, nil},
1863		{10 * 1024 * 1024, 10 * 1024 * 1024, rpctypes.ErrGRPCRequestTooLarge},
1864		{10 * 1024 * 1024, 10*1024*1024 + 5, rpctypes.ErrGRPCRequestTooLarge},
1865	}
1866	for i, test := range tests {
1867		clus := NewClusterV3(t, &ClusterConfig{Size: 1, MaxRequestBytes: test.maxRequestBytes})
1868		kvcli := toGRPC(clus.Client(0)).KV
1869		reqput := &pb.PutRequest{Key: []byte("foo"), Value: make([]byte, test.valueSize)}
1870		_, err := kvcli.Put(context.TODO(), reqput)
1871		if !eqErrGRPC(err, test.expectError) {
1872			t.Errorf("#%d: expected error %v, got %v", i, test.expectError, err)
1873		}
1874
1875		// request went through, expect large response back from server
1876		if test.expectError == nil {
1877			reqget := &pb.RangeRequest{Key: []byte("foo")}
1878			// limit receive call size with original value + gRPC overhead bytes
1879			_, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024))
1880			if err != nil {
1881				t.Errorf("#%d: range expected no error, got %v", i, err)
1882			}
1883		}
1884
1885		clus.Terminate(t)
1886	}
1887}
1888
1889func eqErrGRPC(err1 error, err2 error) bool {
1890	return !(err1 == nil && err2 != nil) || err1.Error() == err2.Error()
1891}
1892
1893// waitForRestart tries a range request until the client's server responds.
1894// This is mainly a stop-gap function until grpcproxy's KVClient adapter
1895// (and by extension, clientv3) supports grpc.CallOption pass-through so
1896// FailFast=false works with Put.
1897func waitForRestart(t *testing.T, kvc pb.KVClient) {
1898	req := &pb.RangeRequest{Key: []byte("_"), Serializable: true}
1899	// TODO: Remove retry loop once the new grpc load balancer provides retry.
1900	var err error
1901	for i := 0; i < 10; i++ {
1902		if _, err = kvc.Range(context.TODO(), req, grpc.FailFast(false)); err != nil {
1903			if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
1904				time.Sleep(time.Millisecond * 250)
1905			} else {
1906				t.Fatal(err)
1907			}
1908		}
1909	}
1910	if err != nil {
1911		t.Fatalf("timed out waiting for restart: %v", err)
1912	}
1913}
1914