1// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package integration
16
17import (
18	"context"
19	"os"
20	"path/filepath"
21	"sync"
22	"testing"
23	"time"
24
25	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
26	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
27	"go.etcd.io/etcd/pkg/v3/traceutil"
28	"go.etcd.io/etcd/server/v3/mvcc"
29	"go.etcd.io/etcd/server/v3/mvcc/backend"
30	"go.uber.org/zap/zaptest"
31)
32
33// TestV3StorageQuotaApply tests the V3 server respects quotas during apply
34func TestV3StorageQuotaApply(t *testing.T) {
35	BeforeTest(t)
36	quotasize := int64(16 * os.Getpagesize())
37
38	clus := NewClusterV3(t, &ClusterConfig{Size: 2})
39	defer clus.Terminate(t)
40	kvc0 := toGRPC(clus.Client(0)).KV
41	kvc1 := toGRPC(clus.Client(1)).KV
42
43	// Set a quota on one node
44	clus.Members[0].QuotaBackendBytes = quotasize
45	clus.Members[0].Stop(t)
46	clus.Members[0].Restart(t)
47	clus.waitLeader(t, clus.Members)
48	waitForRestart(t, kvc0)
49
50	key := []byte("abc")
51
52	// test small put still works
53	smallbuf := make([]byte, 1024)
54	_, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
55	if serr != nil {
56		t.Fatal(serr)
57	}
58
59	// test big put
60	bigbuf := make([]byte, quotasize)
61	_, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf})
62	if err != nil {
63		t.Fatal(err)
64	}
65
66	// quorum get should work regardless of whether alarm is raised
67	_, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")})
68	if err != nil {
69		t.Fatal(err)
70	}
71
72	// wait until alarm is raised for sure-- poll the alarms
73	stopc := time.After(5 * time.Second)
74	for {
75		req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET}
76		resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req)
77		if aerr != nil {
78			t.Fatal(aerr)
79		}
80		if len(resp.Alarms) != 0 {
81			break
82		}
83		select {
84		case <-stopc:
85			t.Fatalf("timed out waiting for alarm")
86		case <-time.After(10 * time.Millisecond):
87		}
88	}
89
90	ctx, cancel := context.WithTimeout(context.TODO(), RequestWaitTimeout)
91	defer cancel()
92
93	// small quota machine should reject put
94	if _, err := kvc0.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
95		t.Fatalf("past-quota instance should reject put")
96	}
97
98	// large quota machine should reject put
99	if _, err := kvc1.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
100		t.Fatalf("past-quota instance should reject put")
101	}
102
103	// reset large quota node to ensure alarm persisted
104	clus.Members[1].Stop(t)
105	clus.Members[1].Restart(t)
106	clus.waitLeader(t, clus.Members)
107
108	if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil {
109		t.Fatalf("alarmed instance should reject put after reset")
110	}
111}
112
113// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through.
114func TestV3AlarmDeactivate(t *testing.T) {
115	BeforeTest(t)
116
117	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
118	defer clus.Terminate(t)
119	kvc := toGRPC(clus.RandClient()).KV
120	mt := toGRPC(clus.RandClient()).Maintenance
121
122	alarmReq := &pb.AlarmRequest{
123		MemberID: 123,
124		Action:   pb.AlarmRequest_ACTIVATE,
125		Alarm:    pb.AlarmType_NOSPACE,
126	}
127	if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil {
128		t.Fatal(err)
129	}
130
131	key := []byte("abc")
132	smallbuf := make([]byte, 512)
133	_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf})
134	if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) {
135		t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace)
136	}
137
138	alarmReq.Action = pb.AlarmRequest_DEACTIVATE
139	if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil {
140		t.Fatal(err)
141	}
142
143	if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil {
144		t.Fatal(err)
145	}
146}
147
148func TestV3CorruptAlarm(t *testing.T) {
149	BeforeTest(t)
150	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
151	defer clus.Terminate(t)
152
153	var wg sync.WaitGroup
154	wg.Add(10)
155	for i := 0; i < 10; i++ {
156		go func() {
157			defer wg.Done()
158			if _, err := clus.Client(0).Put(context.TODO(), "k", "v"); err != nil {
159				t.Error(err)
160			}
161		}()
162	}
163	wg.Wait()
164
165	// Corrupt member 0 by modifying backend offline.
166	clus.Members[0].Stop(t)
167	fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
168	be := backend.NewDefaultBackend(fp)
169	s := mvcc.NewStore(zaptest.NewLogger(t), be, nil, mvcc.StoreConfig{})
170	// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
171	s.Put([]byte("abc"), []byte("def"), 0)
172	s.Put([]byte("xyz"), []byte("123"), 0)
173	s.Compact(traceutil.TODO(), 5)
174	s.Commit()
175	s.Close()
176	be.Close()
177
178	clus.Members[1].WaitOK(t)
179	clus.Members[2].WaitOK(t)
180	time.Sleep(time.Second * 2)
181
182	// Wait for cluster so Puts succeed in case member 0 was the leader.
183	if _, err := clus.Client(1).Get(context.TODO(), "k"); err != nil {
184		t.Fatal(err)
185	}
186	if _, err := clus.Client(1).Put(context.TODO(), "xyz", "321"); err != nil {
187		t.Fatal(err)
188	}
189	if _, err := clus.Client(1).Put(context.TODO(), "abc", "fed"); err != nil {
190		t.Fatal(err)
191	}
192
193	// Restart with corruption checking enabled.
194	clus.Members[1].Stop(t)
195	clus.Members[2].Stop(t)
196	for _, m := range clus.Members {
197		m.CorruptCheckTime = time.Second
198		m.Restart(t)
199	}
200	clus.WaitLeader(t)
201	time.Sleep(time.Second * 2)
202
203	clus.Members[0].WaitStarted(t)
204	resp0, err0 := clus.Client(0).Get(context.TODO(), "abc")
205	if err0 != nil {
206		t.Fatal(err0)
207	}
208	clus.Members[1].WaitStarted(t)
209	resp1, err1 := clus.Client(1).Get(context.TODO(), "abc")
210	if err1 != nil {
211		t.Fatal(err1)
212	}
213
214	if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision {
215		t.Fatalf("matching ModRevision values")
216	}
217
218	for i := 0; i < 5; i++ {
219		presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
220		if perr != nil {
221			if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
222				t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
223			} else {
224				return
225			}
226		}
227		time.Sleep(time.Second)
228	}
229	t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
230}
231