1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package integration 16 17import ( 18 "bytes" 19 "context" 20 "fmt" 21 "os" 22 "reflect" 23 "strconv" 24 "strings" 25 "testing" 26 "time" 27 28 "go.etcd.io/etcd/clientv3" 29 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" 30 "go.etcd.io/etcd/integration" 31 "go.etcd.io/etcd/mvcc/mvccpb" 32 "go.etcd.io/etcd/pkg/testutil" 33 "go.etcd.io/etcd/version" 34 35 "google.golang.org/grpc" 36 "google.golang.org/grpc/codes" 37) 38 39func TestKVPutError(t *testing.T) { 40 defer testutil.AfterTest(t) 41 42 var ( 43 maxReqBytes = 1.5 * 1024 * 1024 // hard coded max in v3_server.go 44 quota = int64(int(maxReqBytes*1.2) + 8*os.Getpagesize()) // make sure we have enough overhead in backend quota. See discussion in #6486. 45 ) 46 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, QuotaBackendBytes: quota, ClientMaxCallSendMsgSize: 100 * 1024 * 1024}) 47 defer clus.Terminate(t) 48 49 kv := clus.RandClient() 50 ctx := context.TODO() 51 52 _, err := kv.Put(ctx, "", "bar") 53 if err != rpctypes.ErrEmptyKey { 54 t.Fatalf("expected %v, got %v", rpctypes.ErrEmptyKey, err) 55 } 56 57 _, err = kv.Put(ctx, "key", strings.Repeat("a", int(maxReqBytes+100))) 58 if err != rpctypes.ErrRequestTooLarge { 59 t.Fatalf("expected %v, got %v", rpctypes.ErrRequestTooLarge, err) 60 } 61 62 _, err = kv.Put(ctx, "foo1", strings.Repeat("a", int(maxReqBytes-50))) 63 if err != nil { // below quota 64 t.Fatal(err) 65 } 66 67 time.Sleep(1 * time.Second) // give enough time for commit 68 69 _, err = kv.Put(ctx, "foo2", strings.Repeat("a", int(maxReqBytes-50))) 70 if err != rpctypes.ErrNoSpace { // over quota 71 t.Fatalf("expected %v, got %v", rpctypes.ErrNoSpace, err) 72 } 73} 74 75func TestKVPut(t *testing.T) { 76 defer testutil.AfterTest(t) 77 78 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 79 defer clus.Terminate(t) 80 81 lapi := clus.RandClient() 82 83 kv := clus.RandClient() 84 ctx := context.TODO() 85 86 resp, err := lapi.Grant(context.Background(), 10) 87 if err != nil { 88 t.Fatalf("failed to create lease %v", err) 89 } 90 91 tests := []struct { 92 key, val string 93 leaseID clientv3.LeaseID 94 }{ 95 {"foo", "bar", clientv3.NoLease}, 96 {"hello", "world", resp.ID}, 97 } 98 99 for i, tt := range tests { 100 if _, err := kv.Put(ctx, tt.key, tt.val, clientv3.WithLease(tt.leaseID)); err != nil { 101 t.Fatalf("#%d: couldn't put %q (%v)", i, tt.key, err) 102 } 103 resp, err := kv.Get(ctx, tt.key) 104 if err != nil { 105 t.Fatalf("#%d: couldn't get key (%v)", i, err) 106 } 107 if len(resp.Kvs) != 1 { 108 t.Fatalf("#%d: expected 1 key, got %d", i, len(resp.Kvs)) 109 } 110 if !bytes.Equal([]byte(tt.val), resp.Kvs[0].Value) { 111 t.Errorf("#%d: val = %s, want %s", i, tt.val, resp.Kvs[0].Value) 112 } 113 if tt.leaseID != clientv3.LeaseID(resp.Kvs[0].Lease) { 114 t.Errorf("#%d: val = %d, want %d", i, tt.leaseID, resp.Kvs[0].Lease) 115 } 116 } 117} 118 119// TestKVPutWithIgnoreValue ensures that Put with WithIgnoreValue does not clobber the old value. 120func TestKVPutWithIgnoreValue(t *testing.T) { 121 defer testutil.AfterTest(t) 122 123 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) 124 defer clus.Terminate(t) 125 126 kv := clus.RandClient() 127 128 _, err := kv.Put(context.TODO(), "foo", "", clientv3.WithIgnoreValue()) 129 if err != rpctypes.ErrKeyNotFound { 130 t.Fatalf("err expected %v, got %v", rpctypes.ErrKeyNotFound, err) 131 } 132 133 if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil { 134 t.Fatal(err) 135 } 136 137 if _, err := kv.Put(context.TODO(), "foo", "", clientv3.WithIgnoreValue()); err != nil { 138 t.Fatal(err) 139 } 140 rr, rerr := kv.Get(context.TODO(), "foo") 141 if rerr != nil { 142 t.Fatal(rerr) 143 } 144 if len(rr.Kvs) != 1 { 145 t.Fatalf("len(rr.Kvs) expected 1, got %d", len(rr.Kvs)) 146 } 147 if !bytes.Equal(rr.Kvs[0].Value, []byte("bar")) { 148 t.Fatalf("value expected 'bar', got %q", rr.Kvs[0].Value) 149 } 150} 151 152// TestKVPutWithIgnoreLease ensures that Put with WithIgnoreLease does not affect the existing lease for the key. 153func TestKVPutWithIgnoreLease(t *testing.T) { 154 defer testutil.AfterTest(t) 155 156 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) 157 defer clus.Terminate(t) 158 159 kv := clus.RandClient() 160 161 lapi := clus.RandClient() 162 163 resp, err := lapi.Grant(context.Background(), 10) 164 if err != nil { 165 t.Errorf("failed to create lease %v", err) 166 } 167 168 if _, err := kv.Put(context.TODO(), "zoo", "bar", clientv3.WithIgnoreLease()); err != rpctypes.ErrKeyNotFound { 169 t.Fatalf("err expected %v, got %v", rpctypes.ErrKeyNotFound, err) 170 } 171 172 if _, err := kv.Put(context.TODO(), "zoo", "bar", clientv3.WithLease(resp.ID)); err != nil { 173 t.Fatal(err) 174 } 175 176 if _, err := kv.Put(context.TODO(), "zoo", "bar1", clientv3.WithIgnoreLease()); err != nil { 177 t.Fatal(err) 178 } 179 180 rr, rerr := kv.Get(context.TODO(), "zoo") 181 if rerr != nil { 182 t.Fatal(rerr) 183 } 184 if len(rr.Kvs) != 1 { 185 t.Fatalf("len(rr.Kvs) expected 1, got %d", len(rr.Kvs)) 186 } 187 if rr.Kvs[0].Lease != int64(resp.ID) { 188 t.Fatalf("lease expected %v, got %v", resp.ID, rr.Kvs[0].Lease) 189 } 190} 191 192func TestKVPutWithRequireLeader(t *testing.T) { 193 defer testutil.AfterTest(t) 194 195 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 196 defer clus.Terminate(t) 197 198 clus.Members[1].Stop(t) 199 clus.Members[2].Stop(t) 200 201 // wait for election timeout, then member[0] will not have a leader. 202 var ( 203 electionTicks = 10 204 tickDuration = 10 * time.Millisecond 205 ) 206 time.Sleep(time.Duration(3*electionTicks) * tickDuration) 207 208 kv := clus.Client(0) 209 _, err := kv.Put(clientv3.WithRequireLeader(context.Background()), "foo", "bar") 210 if err != rpctypes.ErrNoLeader { 211 t.Fatal(err) 212 } 213 214 cnt, err := clus.Members[0].Metric( 215 "etcd_server_client_requests_total", 216 `type="unary"`, 217 fmt.Sprintf(`client_api_version="%v"`, version.APIVersion), 218 ) 219 if err != nil { 220 t.Fatal(err) 221 } 222 cv, err := strconv.ParseInt(cnt, 10, 32) 223 if err != nil { 224 t.Fatal(err) 225 } 226 if cv < 1 { // >1 when retried 227 t.Fatalf("expected at least 1, got %q", cnt) 228 } 229 230 // clients may give timeout errors since the members are stopped; take 231 // the clients so that terminating the cluster won't complain 232 clus.Client(1).Close() 233 clus.Client(2).Close() 234 clus.TakeClient(1) 235 clus.TakeClient(2) 236} 237 238func TestKVRange(t *testing.T) { 239 defer testutil.AfterTest(t) 240 241 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 242 defer clus.Terminate(t) 243 244 kv := clus.RandClient() 245 ctx := context.TODO() 246 247 keySet := []string{"a", "b", "c", "c", "c", "foo", "foo/abc", "fop"} 248 for i, key := range keySet { 249 if _, err := kv.Put(ctx, key, ""); err != nil { 250 t.Fatalf("#%d: couldn't put %q (%v)", i, key, err) 251 } 252 } 253 resp, err := kv.Get(ctx, keySet[0]) 254 if err != nil { 255 t.Fatalf("couldn't get key (%v)", err) 256 } 257 wheader := resp.Header 258 259 tests := []struct { 260 begin, end string 261 rev int64 262 opts []clientv3.OpOption 263 264 wantSet []*mvccpb.KeyValue 265 }{ 266 // range first two 267 { 268 "a", "c", 269 0, 270 nil, 271 272 []*mvccpb.KeyValue{ 273 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 274 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 275 }, 276 }, 277 // range first two with serializable 278 { 279 "a", "c", 280 0, 281 []clientv3.OpOption{clientv3.WithSerializable()}, 282 283 []*mvccpb.KeyValue{ 284 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 285 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 286 }, 287 }, 288 // range all with rev 289 { 290 "a", "x", 291 2, 292 nil, 293 294 []*mvccpb.KeyValue{ 295 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 296 }, 297 }, 298 // range all with countOnly 299 { 300 "a", "x", 301 2, 302 []clientv3.OpOption{clientv3.WithCountOnly()}, 303 304 nil, 305 }, 306 // range all with SortByKey, SortAscend 307 { 308 "a", "x", 309 0, 310 []clientv3.OpOption{clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)}, 311 312 []*mvccpb.KeyValue{ 313 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 314 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 315 {Key: []byte("c"), Value: nil, CreateRevision: 4, ModRevision: 6, Version: 3}, 316 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 317 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 318 {Key: []byte("fop"), Value: nil, CreateRevision: 9, ModRevision: 9, Version: 1}, 319 }, 320 }, 321 // range all with SortByKey, missing sorting order (ASCEND by default) 322 { 323 "a", "x", 324 0, 325 []clientv3.OpOption{clientv3.WithSort(clientv3.SortByKey, clientv3.SortNone)}, 326 327 []*mvccpb.KeyValue{ 328 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 329 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 330 {Key: []byte("c"), Value: nil, CreateRevision: 4, ModRevision: 6, Version: 3}, 331 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 332 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 333 {Key: []byte("fop"), Value: nil, CreateRevision: 9, ModRevision: 9, Version: 1}, 334 }, 335 }, 336 // range all with SortByCreateRevision, SortDescend 337 { 338 "a", "x", 339 0, 340 []clientv3.OpOption{clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortDescend)}, 341 342 []*mvccpb.KeyValue{ 343 {Key: []byte("fop"), Value: nil, CreateRevision: 9, ModRevision: 9, Version: 1}, 344 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 345 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 346 {Key: []byte("c"), Value: nil, CreateRevision: 4, ModRevision: 6, Version: 3}, 347 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 348 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 349 }, 350 }, 351 // range all with SortByCreateRevision, missing sorting order (ASCEND by default) 352 { 353 "a", "x", 354 0, 355 []clientv3.OpOption{clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortNone)}, 356 357 []*mvccpb.KeyValue{ 358 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 359 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 360 {Key: []byte("c"), Value: nil, CreateRevision: 4, ModRevision: 6, Version: 3}, 361 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 362 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 363 {Key: []byte("fop"), Value: nil, CreateRevision: 9, ModRevision: 9, Version: 1}, 364 }, 365 }, 366 // range all with SortByModRevision, SortDescend 367 { 368 "a", "x", 369 0, 370 []clientv3.OpOption{clientv3.WithSort(clientv3.SortByModRevision, clientv3.SortDescend)}, 371 372 []*mvccpb.KeyValue{ 373 {Key: []byte("fop"), Value: nil, CreateRevision: 9, ModRevision: 9, Version: 1}, 374 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 375 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 376 {Key: []byte("c"), Value: nil, CreateRevision: 4, ModRevision: 6, Version: 3}, 377 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 378 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 379 }, 380 }, 381 // WithPrefix 382 { 383 "foo", "", 384 0, 385 []clientv3.OpOption{clientv3.WithPrefix()}, 386 387 []*mvccpb.KeyValue{ 388 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 389 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 390 }, 391 }, 392 // WithFromKey 393 { 394 "fo", "", 395 0, 396 []clientv3.OpOption{clientv3.WithFromKey()}, 397 398 []*mvccpb.KeyValue{ 399 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 400 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 401 {Key: []byte("fop"), Value: nil, CreateRevision: 9, ModRevision: 9, Version: 1}, 402 }, 403 }, 404 // fetch entire keyspace using WithFromKey 405 { 406 "\x00", "", 407 0, 408 []clientv3.OpOption{clientv3.WithFromKey(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)}, 409 410 []*mvccpb.KeyValue{ 411 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 412 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 413 {Key: []byte("c"), Value: nil, CreateRevision: 4, ModRevision: 6, Version: 3}, 414 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 415 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 416 {Key: []byte("fop"), Value: nil, CreateRevision: 9, ModRevision: 9, Version: 1}, 417 }, 418 }, 419 // fetch entire keyspace using WithPrefix 420 { 421 "", "", 422 0, 423 []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)}, 424 425 []*mvccpb.KeyValue{ 426 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 427 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 428 {Key: []byte("c"), Value: nil, CreateRevision: 4, ModRevision: 6, Version: 3}, 429 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 430 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 431 {Key: []byte("fop"), Value: nil, CreateRevision: 9, ModRevision: 9, Version: 1}, 432 }, 433 }, 434 // fetch keyspace with empty key using WithFromKey 435 { 436 "", "", 437 0, 438 []clientv3.OpOption{clientv3.WithFromKey(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)}, 439 440 []*mvccpb.KeyValue{ 441 {Key: []byte("a"), Value: nil, CreateRevision: 2, ModRevision: 2, Version: 1}, 442 {Key: []byte("b"), Value: nil, CreateRevision: 3, ModRevision: 3, Version: 1}, 443 {Key: []byte("c"), Value: nil, CreateRevision: 4, ModRevision: 6, Version: 3}, 444 {Key: []byte("foo"), Value: nil, CreateRevision: 7, ModRevision: 7, Version: 1}, 445 {Key: []byte("foo/abc"), Value: nil, CreateRevision: 8, ModRevision: 8, Version: 1}, 446 {Key: []byte("fop"), Value: nil, CreateRevision: 9, ModRevision: 9, Version: 1}, 447 }, 448 }, 449 } 450 451 for i, tt := range tests { 452 opts := []clientv3.OpOption{clientv3.WithRange(tt.end), clientv3.WithRev(tt.rev)} 453 opts = append(opts, tt.opts...) 454 resp, err := kv.Get(ctx, tt.begin, opts...) 455 if err != nil { 456 t.Fatalf("#%d: couldn't range (%v)", i, err) 457 } 458 if !reflect.DeepEqual(wheader, resp.Header) { 459 t.Fatalf("#%d: wheader expected %+v, got %+v", i, wheader, resp.Header) 460 } 461 if !reflect.DeepEqual(tt.wantSet, resp.Kvs) { 462 t.Fatalf("#%d: resp.Kvs expected %+v, got %+v", i, tt.wantSet, resp.Kvs) 463 } 464 } 465} 466 467func TestKVGetErrConnClosed(t *testing.T) { 468 defer testutil.AfterTest(t) 469 470 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) 471 defer clus.Terminate(t) 472 473 cli := clus.Client(0) 474 475 donec := make(chan struct{}) 476 if err := cli.Close(); err != nil { 477 t.Fatal(err) 478 } 479 clus.TakeClient(0) 480 481 go func() { 482 defer close(donec) 483 _, err := cli.Get(context.TODO(), "foo") 484 if !clientv3.IsConnCanceled(err) { 485 t.Errorf("expected %v, got %v", context.Canceled, err) 486 } 487 }() 488 489 select { 490 case <-time.After(integration.RequestWaitTimeout): 491 t.Fatal("kv.Get took too long") 492 case <-donec: 493 } 494} 495 496func TestKVNewAfterClose(t *testing.T) { 497 defer testutil.AfterTest(t) 498 499 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) 500 defer clus.Terminate(t) 501 502 cli := clus.Client(0) 503 clus.TakeClient(0) 504 if err := cli.Close(); err != nil { 505 t.Fatal(err) 506 } 507 508 donec := make(chan struct{}) 509 go func() { 510 _, err := cli.Get(context.TODO(), "foo") 511 if !clientv3.IsConnCanceled(err) { 512 t.Errorf("expected %v, got %v", context.Canceled, err) 513 } 514 close(donec) 515 }() 516 select { 517 case <-time.After(integration.RequestWaitTimeout): 518 t.Fatal("kv.Get took too long") 519 case <-donec: 520 } 521} 522 523func TestKVDeleteRange(t *testing.T) { 524 defer testutil.AfterTest(t) 525 526 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 527 defer clus.Terminate(t) 528 529 kv := clus.RandClient() 530 ctx := context.TODO() 531 532 tests := []struct { 533 key string 534 opts []clientv3.OpOption 535 536 wkeys []string 537 }{ 538 // [a, c) 539 { 540 key: "a", 541 opts: []clientv3.OpOption{clientv3.WithRange("c")}, 542 543 wkeys: []string{"c", "c/abc", "d"}, 544 }, 545 // >= c 546 { 547 key: "c", 548 opts: []clientv3.OpOption{clientv3.WithFromKey()}, 549 550 wkeys: []string{"a", "b"}, 551 }, 552 // c* 553 { 554 key: "c", 555 opts: []clientv3.OpOption{clientv3.WithPrefix()}, 556 557 wkeys: []string{"a", "b", "d"}, 558 }, 559 // * 560 { 561 key: "\x00", 562 opts: []clientv3.OpOption{clientv3.WithFromKey()}, 563 564 wkeys: []string{}, 565 }, 566 } 567 568 for i, tt := range tests { 569 keySet := []string{"a", "b", "c", "c/abc", "d"} 570 for j, key := range keySet { 571 if _, err := kv.Put(ctx, key, ""); err != nil { 572 t.Fatalf("#%d: couldn't put %q (%v)", j, key, err) 573 } 574 } 575 576 _, err := kv.Delete(ctx, tt.key, tt.opts...) 577 if err != nil { 578 t.Fatalf("#%d: couldn't delete range (%v)", i, err) 579 } 580 581 resp, err := kv.Get(ctx, "a", clientv3.WithFromKey()) 582 if err != nil { 583 t.Fatalf("#%d: couldn't get keys (%v)", i, err) 584 } 585 keys := []string{} 586 for _, kv := range resp.Kvs { 587 keys = append(keys, string(kv.Key)) 588 } 589 if !reflect.DeepEqual(tt.wkeys, keys) { 590 t.Errorf("#%d: resp.Kvs got %v, expected %v", i, keys, tt.wkeys) 591 } 592 } 593} 594 595func TestKVDelete(t *testing.T) { 596 defer testutil.AfterTest(t) 597 598 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 599 defer clus.Terminate(t) 600 601 kv := clus.RandClient() 602 ctx := context.TODO() 603 604 presp, err := kv.Put(ctx, "foo", "") 605 if err != nil { 606 t.Fatalf("couldn't put 'foo' (%v)", err) 607 } 608 if presp.Header.Revision != 2 { 609 t.Fatalf("presp.Header.Revision got %d, want %d", presp.Header.Revision, 2) 610 } 611 resp, err := kv.Delete(ctx, "foo") 612 if err != nil { 613 t.Fatalf("couldn't delete key (%v)", err) 614 } 615 if resp.Header.Revision != 3 { 616 t.Fatalf("resp.Header.Revision got %d, want %d", resp.Header.Revision, 3) 617 } 618 gresp, err := kv.Get(ctx, "foo") 619 if err != nil { 620 t.Fatalf("couldn't get key (%v)", err) 621 } 622 if len(gresp.Kvs) > 0 { 623 t.Fatalf("gresp.Kvs got %+v, want none", gresp.Kvs) 624 } 625} 626 627func TestKVCompactError(t *testing.T) { 628 defer testutil.AfterTest(t) 629 630 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) 631 defer clus.Terminate(t) 632 633 kv := clus.RandClient() 634 ctx := context.TODO() 635 636 for i := 0; i < 5; i++ { 637 if _, err := kv.Put(ctx, "foo", "bar"); err != nil { 638 t.Fatalf("couldn't put 'foo' (%v)", err) 639 } 640 } 641 _, err := kv.Compact(ctx, 6) 642 if err != nil { 643 t.Fatalf("couldn't compact 6 (%v)", err) 644 } 645 646 _, err = kv.Compact(ctx, 6) 647 if err != rpctypes.ErrCompacted { 648 t.Fatalf("expected %v, got %v", rpctypes.ErrCompacted, err) 649 } 650 651 _, err = kv.Compact(ctx, 100) 652 if err != rpctypes.ErrFutureRev { 653 t.Fatalf("expected %v, got %v", rpctypes.ErrFutureRev, err) 654 } 655} 656 657func TestKVCompact(t *testing.T) { 658 defer testutil.AfterTest(t) 659 660 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 661 defer clus.Terminate(t) 662 663 kv := clus.RandClient() 664 ctx := context.TODO() 665 666 for i := 0; i < 10; i++ { 667 if _, err := kv.Put(ctx, "foo", "bar"); err != nil { 668 t.Fatalf("couldn't put 'foo' (%v)", err) 669 } 670 } 671 672 _, err := kv.Compact(ctx, 7) 673 if err != nil { 674 t.Fatalf("couldn't compact kv space (%v)", err) 675 } 676 _, err = kv.Compact(ctx, 7) 677 if err == nil || err != rpctypes.ErrCompacted { 678 t.Fatalf("error got %v, want %v", err, rpctypes.ErrCompacted) 679 } 680 681 wcli := clus.RandClient() 682 // new watcher could precede receiving the compaction without quorum first 683 wcli.Get(ctx, "quorum-get") 684 685 wchan := wcli.Watch(ctx, "foo", clientv3.WithRev(3)) 686 687 wr := <-wchan 688 if wr.CompactRevision != 7 { 689 t.Fatalf("wchan CompactRevision got %v, want 7", wr.CompactRevision) 690 } 691 if !wr.Canceled { 692 t.Fatalf("expected canceled watcher on compacted revision, got %v", wr.Canceled) 693 } 694 if wr.Err() != rpctypes.ErrCompacted { 695 t.Fatalf("watch response error expected %v, got %v", rpctypes.ErrCompacted, wr.Err()) 696 } 697 wr, ok := <-wchan 698 if ok { 699 t.Fatalf("wchan got %v, expected closed", wr) 700 } 701 if wr.Err() != nil { 702 t.Fatalf("watch response error expected nil, got %v", wr.Err()) 703 } 704 705 _, err = kv.Compact(ctx, 1000) 706 if err == nil || err != rpctypes.ErrFutureRev { 707 t.Fatalf("error got %v, want %v", err, rpctypes.ErrFutureRev) 708 } 709} 710 711// TestKVGetRetry ensures get will retry on disconnect. 712func TestKVGetRetry(t *testing.T) { 713 defer testutil.AfterTest(t) 714 715 clusterSize := 3 716 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize}) 717 defer clus.Terminate(t) 718 719 // because killing leader and following election 720 // could give no other endpoints for client reconnection 721 fIdx := (clus.WaitLeader(t) + 1) % clusterSize 722 723 kv := clus.Client(fIdx) 724 ctx := context.TODO() 725 726 if _, err := kv.Put(ctx, "foo", "bar"); err != nil { 727 t.Fatal(err) 728 } 729 730 clus.Members[fIdx].Stop(t) 731 732 donec := make(chan struct{}, 1) 733 go func() { 734 // Get will fail, but reconnect will trigger 735 gresp, gerr := kv.Get(ctx, "foo") 736 if gerr != nil { 737 t.Error(gerr) 738 } 739 wkvs := []*mvccpb.KeyValue{ 740 { 741 Key: []byte("foo"), 742 Value: []byte("bar"), 743 CreateRevision: 2, 744 ModRevision: 2, 745 Version: 1, 746 }, 747 } 748 if !reflect.DeepEqual(gresp.Kvs, wkvs) { 749 t.Errorf("bad get: got %v, want %v", gresp.Kvs, wkvs) 750 } 751 donec <- struct{}{} 752 }() 753 754 time.Sleep(100 * time.Millisecond) 755 clus.Members[fIdx].Restart(t) 756 clus.Members[fIdx].WaitOK(t) 757 758 select { 759 case <-time.After(20 * time.Second): 760 t.Fatalf("timed out waiting for get") 761 case <-donec: 762 } 763} 764 765// TestKVPutFailGetRetry ensures a get will retry following a failed put. 766func TestKVPutFailGetRetry(t *testing.T) { 767 defer testutil.AfterTest(t) 768 769 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 770 defer clus.Terminate(t) 771 772 kv := clus.Client(0) 773 clus.Members[0].Stop(t) 774 775 ctx, cancel := context.WithTimeout(context.TODO(), time.Second) 776 defer cancel() 777 _, err := kv.Put(ctx, "foo", "bar") 778 if err == nil { 779 t.Fatalf("got success on disconnected put, wanted error") 780 } 781 782 donec := make(chan struct{}) 783 go func() { 784 // Get will fail, but reconnect will trigger 785 gresp, gerr := kv.Get(context.TODO(), "foo") 786 if gerr != nil { 787 t.Error(gerr) 788 } 789 if len(gresp.Kvs) != 0 { 790 t.Errorf("bad get kvs: got %+v, want empty", gresp.Kvs) 791 } 792 donec <- struct{}{} 793 }() 794 795 time.Sleep(100 * time.Millisecond) 796 clus.Members[0].Restart(t) 797 798 select { 799 case <-time.After(20 * time.Second): 800 t.Fatalf("timed out waiting for get") 801 case <-donec: 802 } 803} 804 805// TestKVGetCancel tests that a context cancel on a Get terminates as expected. 806func TestKVGetCancel(t *testing.T) { 807 defer testutil.AfterTest(t) 808 809 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) 810 defer clus.Terminate(t) 811 812 oldconn := clus.Client(0).ActiveConnection() 813 kv := clus.Client(0) 814 815 ctx, cancel := context.WithCancel(context.TODO()) 816 cancel() 817 818 resp, err := kv.Get(ctx, "abc") 819 if err == nil { 820 t.Fatalf("cancel on get response %v, expected context error", resp) 821 } 822 newconn := clus.Client(0).ActiveConnection() 823 if oldconn != newconn { 824 t.Fatalf("cancel on get broke client connection") 825 } 826} 827 828// TestKVGetStoppedServerAndClose ensures closing after a failed Get works. 829func TestKVGetStoppedServerAndClose(t *testing.T) { 830 defer testutil.AfterTest(t) 831 832 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) 833 defer clus.Terminate(t) 834 835 cli := clus.Client(0) 836 clus.Members[0].Stop(t) 837 ctx, cancel := context.WithTimeout(context.TODO(), time.Second) 838 // this Get fails and triggers an asynchronous connection retry 839 _, err := cli.Get(ctx, "abc") 840 cancel() 841 if err != nil && !(isCanceled(err) || isClientTimeout(err)) { 842 t.Fatal(err) 843 } 844} 845 846// TestKVPutStoppedServerAndClose ensures closing after a failed Put works. 847func TestKVPutStoppedServerAndClose(t *testing.T) { 848 defer testutil.AfterTest(t) 849 850 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) 851 defer clus.Terminate(t) 852 853 cli := clus.Client(0) 854 clus.Members[0].Stop(t) 855 856 ctx, cancel := context.WithTimeout(context.TODO(), time.Second) 857 // get retries on all errors. 858 // so here we use it to eat the potential broken pipe error for the next put. 859 // grpc client might see a broken pipe error when we issue the get request before 860 // grpc finds out the original connection is down due to the member shutdown. 861 _, err := cli.Get(ctx, "abc") 862 cancel() 863 if err != nil && !(isCanceled(err) || isClientTimeout(err)) { 864 t.Fatal(err) 865 } 866 867 ctx, cancel = context.WithTimeout(context.TODO(), time.Second) 868 // this Put fails and triggers an asynchronous connection retry 869 _, err = cli.Put(ctx, "abc", "123") 870 cancel() 871 if err != nil && !(isCanceled(err) || isClientTimeout(err) || isUnavailable(err)) { 872 t.Fatal(err) 873 } 874} 875 876// TestKVPutAtMostOnce ensures that a Put will only occur at most once 877// in the presence of network errors. 878func TestKVPutAtMostOnce(t *testing.T) { 879 defer testutil.AfterTest(t) 880 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) 881 defer clus.Terminate(t) 882 883 if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil { 884 t.Fatal(err) 885 } 886 887 for i := 0; i < 10; i++ { 888 clus.Members[0].DropConnections() 889 donec := make(chan struct{}) 890 go func() { 891 defer close(donec) 892 for i := 0; i < 10; i++ { 893 clus.Members[0].DropConnections() 894 time.Sleep(5 * time.Millisecond) 895 } 896 }() 897 _, err := clus.Client(0).Put(context.TODO(), "k", "v") 898 <-donec 899 if err != nil { 900 break 901 } 902 } 903 904 resp, err := clus.Client(0).Get(context.TODO(), "k") 905 if err != nil { 906 t.Fatal(err) 907 } 908 if resp.Kvs[0].Version > 11 { 909 t.Fatalf("expected version <= 10, got %+v", resp.Kvs[0]) 910 } 911} 912 913// TestKVLargeRequests tests various client/server side request limits. 914func TestKVLargeRequests(t *testing.T) { 915 defer testutil.AfterTest(t) 916 tests := []struct { 917 // make sure that "MaxCallSendMsgSize" < server-side default send/recv limit 918 maxRequestBytesServer uint 919 maxCallSendBytesClient int 920 maxCallRecvBytesClient int 921 922 valueSize int 923 expectError error 924 }{ 925 { 926 maxRequestBytesServer: 256, 927 maxCallSendBytesClient: 0, 928 maxCallRecvBytesClient: 0, 929 valueSize: 1024, 930 expectError: rpctypes.ErrRequestTooLarge, 931 }, 932 933 // without proper client-side receive size limit 934 // "code = ResourceExhausted desc = grpc: received message larger than max (5242929 vs. 4194304)" 935 { 936 937 maxRequestBytesServer: 7*1024*1024 + 512*1024, 938 maxCallSendBytesClient: 7 * 1024 * 1024, 939 maxCallRecvBytesClient: 0, 940 valueSize: 5 * 1024 * 1024, 941 expectError: nil, 942 }, 943 944 { 945 maxRequestBytesServer: 10 * 1024 * 1024, 946 maxCallSendBytesClient: 100 * 1024 * 1024, 947 maxCallRecvBytesClient: 0, 948 valueSize: 10 * 1024 * 1024, 949 expectError: rpctypes.ErrRequestTooLarge, 950 }, 951 { 952 maxRequestBytesServer: 10 * 1024 * 1024, 953 maxCallSendBytesClient: 10 * 1024 * 1024, 954 maxCallRecvBytesClient: 0, 955 valueSize: 10 * 1024 * 1024, 956 expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "), 957 }, 958 { 959 maxRequestBytesServer: 10 * 1024 * 1024, 960 maxCallSendBytesClient: 100 * 1024 * 1024, 961 maxCallRecvBytesClient: 0, 962 valueSize: 10*1024*1024 + 5, 963 expectError: rpctypes.ErrRequestTooLarge, 964 }, 965 { 966 maxRequestBytesServer: 10 * 1024 * 1024, 967 maxCallSendBytesClient: 10 * 1024 * 1024, 968 maxCallRecvBytesClient: 0, 969 valueSize: 10*1024*1024 + 5, 970 expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "), 971 }, 972 } 973 for i, test := range tests { 974 clus := integration.NewClusterV3(t, 975 &integration.ClusterConfig{ 976 Size: 1, 977 MaxRequestBytes: test.maxRequestBytesServer, 978 ClientMaxCallSendMsgSize: test.maxCallSendBytesClient, 979 ClientMaxCallRecvMsgSize: test.maxCallRecvBytesClient, 980 }, 981 ) 982 cli := clus.Client(0) 983 _, err := cli.Put(context.TODO(), "foo", strings.Repeat("a", test.valueSize)) 984 985 if _, ok := err.(rpctypes.EtcdError); ok { 986 if err != test.expectError { 987 t.Errorf("#%d: expected %v, got %v", i, test.expectError, err) 988 } 989 } else if err != nil && !strings.HasPrefix(err.Error(), test.expectError.Error()) { 990 t.Errorf("#%d: expected error starting with '%s', got '%s'", i, test.expectError.Error(), err.Error()) 991 } 992 993 // put request went through, now expects large response back 994 if err == nil { 995 _, err = cli.Get(context.TODO(), "foo") 996 if err != nil { 997 t.Errorf("#%d: get expected no error, got %v", i, err) 998 } 999 } 1000 1001 clus.Terminate(t) 1002 } 1003} 1004 1005// TestKVForLearner ensures learner member only accepts serializable read request. 1006func TestKVForLearner(t *testing.T) { 1007 defer testutil.AfterTest(t) 1008 1009 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 1010 defer clus.Terminate(t) 1011 1012 // we have to add and launch learner member after initial cluster was created, because 1013 // bootstrapping a cluster with learner member is not supported. 1014 clus.AddAndLaunchLearnerMember(t) 1015 1016 learners, err := clus.GetLearnerMembers() 1017 if err != nil { 1018 t.Fatalf("failed to get the learner members in cluster: %v", err) 1019 } 1020 if len(learners) != 1 { 1021 t.Fatalf("added 1 learner to cluster, got %d", len(learners)) 1022 } 1023 1024 if len(clus.Members) != 4 { 1025 t.Fatalf("expecting 4 members in cluster after adding the learner member, got %d", len(clus.Members)) 1026 } 1027 // note: 1028 // 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members 1029 // 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config, 1030 // because the implementation of integration test has diverged from embed/etcd.go. 1031 learnerEp := clus.Members[3].GRPCAddr() 1032 cfg := clientv3.Config{ 1033 Endpoints: []string{learnerEp}, 1034 DialTimeout: 5 * time.Second, 1035 DialOptions: []grpc.DialOption{grpc.WithBlock()}, 1036 } 1037 // this client only has endpoint of the learner member 1038 cli, err := clientv3.New(cfg) 1039 if err != nil { 1040 t.Fatalf("failed to create clientv3: %v", err) 1041 } 1042 defer cli.Close() 1043 1044 // wait until learner member is ready 1045 <-clus.Members[3].ReadyNotify() 1046 1047 tests := []struct { 1048 op clientv3.Op 1049 wErr bool 1050 }{ 1051 { 1052 op: clientv3.OpGet("foo", clientv3.WithSerializable()), 1053 wErr: false, 1054 }, 1055 { 1056 op: clientv3.OpGet("foo"), 1057 wErr: true, 1058 }, 1059 { 1060 op: clientv3.OpPut("foo", "bar"), 1061 wErr: true, 1062 }, 1063 { 1064 op: clientv3.OpDelete("foo"), 1065 wErr: true, 1066 }, 1067 { 1068 op: clientv3.OpTxn([]clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision("foo"), "=", 0)}, nil, nil), 1069 wErr: true, 1070 }, 1071 } 1072 1073 for idx, test := range tests { 1074 _, err := cli.Do(context.TODO(), test.op) 1075 if err != nil && !test.wErr { 1076 t.Errorf("%d: expect no error, got %v", idx, err) 1077 } 1078 if err == nil && test.wErr { 1079 t.Errorf("%d: expect error, got nil", idx) 1080 } 1081 } 1082} 1083 1084// TestBalancerSupportLearner verifies that balancer's retry and failover mechanism supports cluster with learner member 1085func TestBalancerSupportLearner(t *testing.T) { 1086 defer testutil.AfterTest(t) 1087 1088 clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) 1089 defer clus.Terminate(t) 1090 1091 // we have to add and launch learner member after initial cluster was created, because 1092 // bootstrapping a cluster with learner member is not supported. 1093 clus.AddAndLaunchLearnerMember(t) 1094 1095 learners, err := clus.GetLearnerMembers() 1096 if err != nil { 1097 t.Fatalf("failed to get the learner members in cluster: %v", err) 1098 } 1099 if len(learners) != 1 { 1100 t.Fatalf("added 1 learner to cluster, got %d", len(learners)) 1101 } 1102 1103 // clus.Members[3] is the newly added learner member, which was appended to clus.Members 1104 learnerEp := clus.Members[3].GRPCAddr() 1105 cfg := clientv3.Config{ 1106 Endpoints: []string{learnerEp}, 1107 DialTimeout: 5 * time.Second, 1108 DialOptions: []grpc.DialOption{grpc.WithBlock()}, 1109 } 1110 cli, err := clientv3.New(cfg) 1111 if err != nil { 1112 t.Fatalf("failed to create clientv3: %v", err) 1113 } 1114 defer cli.Close() 1115 1116 // wait until learner member is ready 1117 <-clus.Members[3].ReadyNotify() 1118 1119 if _, err := cli.Get(context.Background(), "foo"); err == nil { 1120 t.Fatalf("expect Get request to learner to fail, got no error") 1121 } 1122 1123 eps := []string{learnerEp, clus.Members[0].GRPCAddr()} 1124 cli.SetEndpoints(eps...) 1125 if _, err := cli.Get(context.Background(), "foo"); err != nil { 1126 t.Errorf("expect no error (balancer should retry when request to learner fails), got error: %v", err) 1127 } 1128} 1129