1// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package integration
16
17import (
18	"context"
19	"os"
20	"path/filepath"
21	"sync"
22	"testing"
23	"time"
24
25	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
26	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
27	"github.com/coreos/etcd/mvcc"
28	"github.com/coreos/etcd/mvcc/backend"
29	"github.com/coreos/etcd/pkg/testutil"
30)
31
32// TestV3StorageQuotaApply tests the V3 server respects quotas during apply
33func TestV3StorageQuotaApply(t *testing.T) {
34	testutil.AfterTest(t)
35	quotasize := int64(16 * os.Getpagesize())
36
37	clus := NewClusterV3(t, &ClusterConfig{Size: 2})
38	defer clus.Terminate(t)
39	kvc0 := toGRPC(clus.Client(0)).KV
40	kvc1 := toGRPC(clus.Client(1)).KV
41
42	// Set a quota on one node
43	clus.Members[0].QuotaBackendBytes = quotasize
44	clus.Members[0].Stop(t)
45	clus.Members[0].Restart(t)
46	clus.waitLeader(t, clus.Members)
47	waitForRestart(t, kvc0)
48
49	key := []byte("abc")
50
51	// test small put still works
52	smallbuf := make([]byte, 1024)
53	_, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
54	if serr != nil {
55		t.Fatal(serr)
56	}
57
58	// test big put
59	bigbuf := make([]byte, quotasize)
60	_, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
61	if err != nil {
62		t.Fatal(err)
63	}
64
65	// quorum get should work regardless of whether alarm is raised
66	_, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
67	if err != nil {
68		t.Fatal(err)
69	}
70
71	// wait until alarm is raised for sure-- poll the alarms
72	stopc := time.After(5 * time.Second)
73	for {
74		req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
75		resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
76		if aerr != nil {
77			t.Fatal(aerr)
78		}
79		if len(resp.Alarms) != 0 {
80			break
81		}
82		select {
83		case <-stopc:
84			t.Fatalf("timed out waiting for alarm")
85		case <-time.After(10 * time.Millisecond):
86		}
87	}
88
89	// small quota machine should reject put
90	if _, err := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
91		t.Fatalf("past-quota instance should reject put")
92	}
93
94	// large quota machine should reject put
95	if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
96		t.Fatalf("past-quota instance should reject put")
97	}
98
99	// reset large quota node to ensure alarm persisted
100	clus.Members[1].Stop(t)
101	clus.Members[1].Restart(t)
102	clus.waitLeader(t, clus.Members)
103
104	if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
105		t.Fatalf("alarmed instance should reject put after reset")
106	}
107}
108
109// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
110func TestV3AlarmDeactivate(t *testing.T) {
111	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
112	defer clus.Terminate(t)
113	kvc := toGRPC(clus.RandClient()).KV
114	mt := toGRPC(clus.RandClient()).Maintenance
115
116	alarmReq := &pb.AlarmRequest{
117		MemberID: 123,
118		Action:   pb.AlarmRequest_ACTIVATE,
119		Alarm:    pb.AlarmType_NOSPACE,
120	}
121	if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
122		t.Fatal(err)
123	}
124
125	key := []byte("abc")
126	smallbuf := make([]byte, 512)
127	_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
128	if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
129		t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
130	}
131
132	alarmReq.Action = pb.AlarmRequest_DEACTIVATE
133	if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
134		t.Fatal(err)
135	}
136
137	if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
138		t.Fatal(err)
139	}
140}
141
// fakeConsistentIndex is a test stub whose ConsistentIndex method always
// reports the fixed value it was constructed with.
type fakeConsistentIndex struct {
	rev uint64 // value reported by ConsistentIndex
}

// ConsistentIndex returns the stub's fixed index value.
func (fc *fakeConsistentIndex) ConsistentIndex() uint64 {
	return fc.rev
}
145
146func TestV3CorruptAlarm(t *testing.T) {
147	defer testutil.AfterTest(t)
148	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
149	defer clus.Terminate(t)
150
151	var wg sync.WaitGroup
152	wg.Add(10)
153	for i := 0; i < 10; i++ {
154		go func() {
155			defer wg.Done()
156			if _, err := clus.Client(0).Put(context.TODO(), "k", "v"); err != nil {
157				t.Fatal(err)
158			}
159		}()
160	}
161	wg.Wait()
162
163	// Corrupt member 0 by modifying backend offline.
164	clus.Members[0].Stop(t)
165	fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
166	be := backend.NewDefaultBackend(fp)
167	s := mvcc.NewStore(be, nil, &fakeConsistentIndex{13})
168	// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
169	s.Put([]byte("abc"), []byte("def"), 0)
170	s.Put([]byte("xyz"), []byte("123"), 0)
171	s.Compact(5)
172	s.Commit()
173	s.Close()
174	be.Close()
175
176	// Wait for cluster so Puts succeed in case member 0 was the leader.
177	if _, err := clus.Client(1).Get(context.TODO(), "k"); err != nil {
178		t.Fatal(err)
179	}
180	clus.Client(1).Put(context.TODO(), "xyz", "321")
181	clus.Client(1).Put(context.TODO(), "abc", "fed")
182
183	// Restart with corruption checking enabled.
184	clus.Members[1].Stop(t)
185	clus.Members[2].Stop(t)
186	for _, m := range clus.Members {
187		m.CorruptCheckTime = time.Second
188		m.Restart(t)
189	}
190	// Member 0 restarts into split brain.
191
192	resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
193	if err0 != nil {
194		t.Fatal(err0)
195	}
196	resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
197	if err1 != nil {
198		t.Fatal(err1)
199	}
200
201	if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision {
202		t.Fatalf("matching ModRevision values")
203	}
204
205	for i := 0; i < 5; i++ {
206		presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
207		if perr != nil {
208			if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
209				t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
210			} else {
211				return
212			}
213		}
214		time.Sleep(time.Second)
215	}
216	t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
217}
218