1// Copyright 2017 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package integration 16 17import ( 18 "context" 19 "os" 20 "path/filepath" 21 "sync" 22 "testing" 23 "time" 24 25 pb "go.etcd.io/etcd/api/v3/etcdserverpb" 26 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" 27 "go.etcd.io/etcd/pkg/v3/traceutil" 28 "go.etcd.io/etcd/server/v3/mvcc" 29 "go.etcd.io/etcd/server/v3/mvcc/backend" 30 "go.uber.org/zap/zaptest" 31) 32 33// TestV3StorageQuotaApply tests the V3 server respects quotas during apply 34func TestV3StorageQuotaApply(t *testing.T) { 35 BeforeTest(t) 36 quotasize := int64(16 * os.Getpagesize()) 37 38 clus := NewClusterV3(t, &ClusterConfig{Size: 2}) 39 defer clus.Terminate(t) 40 kvc0 := toGRPC(clus.Client(0)).KV 41 kvc1 := toGRPC(clus.Client(1)).KV 42 43 // Set a quota on one node 44 clus.Members[0].QuotaBackendBytes = quotasize 45 clus.Members[0].Stop(t) 46 clus.Members[0].Restart(t) 47 clus.waitLeader(t, clus.Members) 48 waitForRestart(t, kvc0) 49 50 key := []byte("abc") 51 52 // test small put still works 53 smallbuf := make([]byte, 1024) 54 _, serr := kvc0.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) 55 if serr != nil { 56 t.Fatal(serr) 57 } 58 59 // test big put 60 bigbuf := make([]byte, quotasize) 61 _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: bigbuf}) 62 if err != nil { 63 t.Fatal(err) 64 } 65 66 // quorum get should work regardless of whether alarm is raised 67 _, err = kvc0.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) 68 if err != nil { 69 t.Fatal(err) 70 } 71 72 // wait until alarm is raised for sure-- poll the alarms 73 stopc := time.After(5 * time.Second) 74 for { 75 req := &pb.AlarmRequest{Action: pb.AlarmRequest_GET} 76 resp, aerr := clus.Members[0].s.Alarm(context.TODO(), req) 77 if aerr != nil { 78 t.Fatal(aerr) 79 } 80 if len(resp.Alarms) != 0 { 81 break 82 } 83 select { 84 case <-stopc: 85 t.Fatalf("timed out waiting for alarm") 86 case <-time.After(10 * time.Millisecond): 87 } 88 } 89 90 ctx, cancel := context.WithTimeout(context.TODO(), RequestWaitTimeout) 91 defer cancel() 92 93 // small quota machine should reject put 94 if _, err := kvc0.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { 95 t.Fatalf("past-quota instance should reject put") 96 } 97 98 // large quota machine should reject put 99 if _, err := kvc1.Put(ctx, &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { 100 t.Fatalf("past-quota instance should reject put") 101 } 102 103 // reset large quota node to ensure alarm persisted 104 clus.Members[1].Stop(t) 105 clus.Members[1].Restart(t) 106 clus.waitLeader(t, clus.Members) 107 108 if _, err := kvc1.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err == nil { 109 t.Fatalf("alarmed instance should reject put after reset") 110 } 111} 112 113// TestV3AlarmDeactivate ensures that space alarms can be deactivated so puts go through. 114func TestV3AlarmDeactivate(t *testing.T) { 115 BeforeTest(t) 116 117 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 118 defer clus.Terminate(t) 119 kvc := toGRPC(clus.RandClient()).KV 120 mt := toGRPC(clus.RandClient()).Maintenance 121 122 alarmReq := &pb.AlarmRequest{ 123 MemberID: 123, 124 Action: pb.AlarmRequest_ACTIVATE, 125 Alarm: pb.AlarmType_NOSPACE, 126 } 127 if _, err := mt.Alarm(context.TODO(), alarmReq); err != nil { 128 t.Fatal(err) 129 } 130 131 key := []byte("abc") 132 smallbuf := make([]byte, 512) 133 _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}) 134 if err == nil && !eqErrGRPC(err, rpctypes.ErrGRPCNoSpace) { 135 t.Fatalf("put got %v, expected %v", err, rpctypes.ErrGRPCNoSpace) 136 } 137 138 alarmReq.Action = pb.AlarmRequest_DEACTIVATE 139 if _, err = mt.Alarm(context.TODO(), alarmReq); err != nil { 140 t.Fatal(err) 141 } 142 143 if _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: key, Value: smallbuf}); err != nil { 144 t.Fatal(err) 145 } 146} 147 148func TestV3CorruptAlarm(t *testing.T) { 149 BeforeTest(t) 150 clus := NewClusterV3(t, &ClusterConfig{Size: 3}) 151 defer clus.Terminate(t) 152 153 var wg sync.WaitGroup 154 wg.Add(10) 155 for i := 0; i < 10; i++ { 156 go func() { 157 defer wg.Done() 158 if _, err := clus.Client(0).Put(context.TODO(), "k", "v"); err != nil { 159 t.Error(err) 160 } 161 }() 162 } 163 wg.Wait() 164 165 // Corrupt member 0 by modifying backend offline. 166 clus.Members[0].Stop(t) 167 fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db") 168 be := backend.NewDefaultBackend(fp) 169 s := mvcc.NewStore(zaptest.NewLogger(t), be, nil, mvcc.StoreConfig{}) 170 // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'. 171 s.Put([]byte("abc"), []byte("def"), 0) 172 s.Put([]byte("xyz"), []byte("123"), 0) 173 s.Compact(traceutil.TODO(), 5) 174 s.Commit() 175 s.Close() 176 be.Close() 177 178 clus.Members[1].WaitOK(t) 179 clus.Members[2].WaitOK(t) 180 time.Sleep(time.Second * 2) 181 182 // Wait for cluster so Puts succeed in case member 0 was the leader. 183 if _, err := clus.Client(1).Get(context.TODO(), "k"); err != nil { 184 t.Fatal(err) 185 } 186 if _, err := clus.Client(1).Put(context.TODO(), "xyz", "321"); err != nil { 187 t.Fatal(err) 188 } 189 if _, err := clus.Client(1).Put(context.TODO(), "abc", "fed"); err != nil { 190 t.Fatal(err) 191 } 192 193 // Restart with corruption checking enabled. 194 clus.Members[1].Stop(t) 195 clus.Members[2].Stop(t) 196 for _, m := range clus.Members { 197 m.CorruptCheckTime = time.Second 198 m.Restart(t) 199 } 200 clus.WaitLeader(t) 201 time.Sleep(time.Second * 2) 202 203 clus.Members[0].WaitStarted(t) 204 resp0, err0 := clus.Client(0).Get(context.TODO(), "abc") 205 if err0 != nil { 206 t.Fatal(err0) 207 } 208 clus.Members[1].WaitStarted(t) 209 resp1, err1 := clus.Client(1).Get(context.TODO(), "abc") 210 if err1 != nil { 211 t.Fatal(err1) 212 } 213 214 if resp0.Kvs[0].ModRevision == resp1.Kvs[0].ModRevision { 215 t.Fatalf("matching ModRevision values") 216 } 217 218 for i := 0; i < 5; i++ { 219 presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa") 220 if perr != nil { 221 if !eqErrGRPC(perr, rpctypes.ErrCorrupt) { 222 t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr) 223 } else { 224 return 225 } 226 } 227 time.Sleep(time.Second) 228 } 229 t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second) 230} 231