// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"go.etcd.io/etcd/client"
	"go.etcd.io/etcd/etcdserver"
	"go.etcd.io/etcd/pkg/testutil"
)

func init() {
	// open microsecond-level time log for integration test debugging
	log.SetFlags(log.Ltime | log.Lmicroseconds | log.Lshortfile)
	if t := os.Getenv("ETCD_ELECTION_TIMEOUT_TICKS"); t != "" {
		if i, err := strconv.ParseInt(t, 10, 64); err == nil {
			electionTicks = int(i)
		}
	}
}

func TestClusterOf1(t *testing.T) { testCluster(t, 1) }
func TestClusterOf3(t *testing.T) { testCluster(t, 3) }

func testCluster(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, size)
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

func TestTLSClusterOf3(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3) }

func testClusterUsingDiscovery(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	dc := NewCluster(t, 1)
	dc.Launch(t)
	defer dc.Terminate(t)
	// init discovery token space
	dcc := MustNewHTTPClient(t, dc.URLs(), nil)
	dkapi := client.NewKeysAPI(dcc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
		t.Fatal(err)
	}
	cancel()

	c := NewClusterByConfig(
		t,
		&ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"},
	)
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
	defer testutil.AfterTest(t)
	dc := NewCluster(t, 1)
	dc.Launch(t)
	defer dc.Terminate(t)
	// init discovery token space
	dcc := MustNewHTTPClient(t, dc.URLs(), nil)
	dkapi := client.NewKeysAPI(dcc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
		t.Fatal(err)
	}
	cancel()

	c := NewClusterByConfig(t,
		&ClusterConfig{
			Size:         3,
			PeerTLS:      &testTLSInfo,
			DiscoveryURL: dc.URL(0) + "/v2/keys"},
	)
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) }
func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) }

func testDoubleClusterSize(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, size)
	c.Launch(t)
	defer c.Terminate(t)

	for i := 0; i < size; i++ {
		c.AddMember(t)
	}
	clusterMustProgress(t, c.Members)
}

func TestDoubleTLSClusterSizeOf3(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
	c.Launch(t)
	defer c.Terminate(t)

	for i := 0; i < 3; i++ {
		c.AddMember(t)
	}
	clusterMustProgress(t, c.Members)
}

func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) }

func testDecreaseClusterSize(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, size)
	c.Launch(t)
	defer c.Terminate(t)

	// TODO: remove the last but one member
	for i := 0; i < size-1; i++ {
		id := c.Members[len(c.Members)-1].s.ID()
		// may hit second leader election on slow machines
		if err := c.removeMember(t, uint64(id)); err != nil {
			if strings.Contains(err.Error(), "no leader") {
				t.Logf("got leader error (%v)", err)
				i--
				continue
			}
			t.Fatal(err)
		}
		c.waitLeader(t, c.Members)
	}
	clusterMustProgress(t, c.Members)
}

// TestForceNewCluster ensures a member restarted with ForceNewCluster recovers
// a one-member cluster that keeps the old data and can make progress.
func TestForceNewCluster(t *testing.T) {
	c := NewCluster(t, 3)
	c.Launch(t)
	cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
	kapi := client.NewKeysAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	resp, err := kapi.Create(ctx, "/foo", "bar")
	if err != nil {
		t.Fatalf("unexpected create error: %v", err)
	}
	cancel()
	// ensure create has been applied in this machine
	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
	if _, err = kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
		t.Fatalf("unexpected watch error: %v", err)
	}
	cancel()

	c.Members[0].Stop(t)
	c.Members[1].Terminate(t)
	c.Members[2].Terminate(t)
	c.Members[0].ForceNewCluster = true
	err = c.Members[0].Restart(t)
	if err != nil {
		t.Fatalf("unexpected ForceRestart error: %v", err)
	}
	defer c.Members[0].Terminate(t)
	c.waitLeader(t, c.Members[:1])

	// use new http client to init new connection
	cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
	kapi = client.NewKeysAPI(cc)
	// ensure force restart keeps the old data, and the new cluster can make progress
	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
	if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
		t.Fatalf("unexpected watch error: %v", err)
	}
	cancel()
	clusterMustProgress(t, c.Members[:1])
}

func TestAddMemberAfterClusterFullRotation(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 3)
	c.Launch(t)
	defer c.Terminate(t)

	// remove all the previous three members and add in three new members.
	for i := 0; i < 3; i++ {
		c.RemoveMember(t, uint64(c.Members[0].s.ID()))
		c.waitLeader(t, c.Members)

		c.AddMember(t)
		c.waitLeader(t, c.Members)
	}

	c.AddMember(t)
	c.waitLeader(t, c.Members)

	clusterMustProgress(t, c.Members)
}

// Ensure we can remove a member then add a new one back immediately.
func TestIssue2681(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 5)
	c.Launch(t)
	defer c.Terminate(t)

	c.RemoveMember(t, uint64(c.Members[4].s.ID()))
	c.waitLeader(t, c.Members)

	c.AddMember(t)
	c.waitLeader(t, c.Members)
	clusterMustProgress(t, c.Members)
}

// Ensure we can remove a member after a snapshot then add a new one back.
func TestIssue2746(t *testing.T) { testIssue2746(t, 5) }

// With 3 nodes TestIssue2746 sometimes had a shutdown with an inflight snapshot.
func TestIssue2746WithThree(t *testing.T) { testIssue2746(t, 3) }

func testIssue2746(t *testing.T, members int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, members)

	for _, m := range c.Members {
		m.SnapshotCount = 10
	}

	c.Launch(t)
	defer c.Terminate(t)

	// force a snapshot
	for i := 0; i < 20; i++ {
		clusterMustProgress(t, c.Members)
	}

	c.RemoveMember(t, uint64(c.Members[members-1].s.ID()))
	c.waitLeader(t, c.Members)

	c.AddMember(t)
	c.waitLeader(t, c.Members)
	clusterMustProgress(t, c.Members)
}

// Ensure etcd will not panic when removing a just started member.
func TestIssue2904(t *testing.T) {
	defer testutil.AfterTest(t)
	// start 1-member cluster to ensure member 0 is the leader of the cluster.
	c := NewCluster(t, 1)
	c.Launch(t)
	defer c.Terminate(t)

	c.AddMember(t)
	c.Members[1].Stop(t)

	// send remove member-1 request to the cluster.
	cc := MustNewHTTPClient(t, c.URLs(), nil)
	ma := client.NewMembersAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	// the proposal is not committed because member 1 is stopped, but the
	// proposal is appended to the leader's raft log.
	ma.Remove(ctx, c.Members[1].s.ID().String())
	cancel()

	// restart the member, and expect it to send an UpdateAttributes request.
	// the log in the leader is like this:
	// [..., remove 1, ..., update attr 1, ...]
	c.Members[1].Restart(t)
	// when the member comes back, it acks the proposal to remove itself,
	// and applies it.
	<-c.Members[1].s.StopNotify()

	// terminate removed member
	c.Members[1].Terminate(t)
	c.Members = c.Members[:1]
	// wait for the member to be removed.
	c.waitMembersMatch(t, c.HTTPMembers())
}

// TestIssue3699 tests minority failure during cluster configuration; it was
// deadlocking.
func TestIssue3699(t *testing.T) {
	// start a cluster of 3 nodes a, b, c
	defer testutil.AfterTest(t)
	c := NewCluster(t, 3)
	c.Launch(t)
	defer c.Terminate(t)

	// make node a unavailable
	c.Members[0].Stop(t)

	// add node d
	c.AddMember(t)

	// electing node d as leader makes node a unable to participate
	leaderID := c.waitLeader(t, c.Members)
	for leaderID != 3 {
		c.Members[leaderID].Stop(t)
		<-c.Members[leaderID].s.StopNotify()
		// do not restart the killed member immediately.
		// the member will advance its election timeout after restart,
		// so it will have a better chance to become the leader again.
		time.Sleep(time.Duration(electionTicks * int(tickDuration)))
		c.Members[leaderID].Restart(t)
		leaderID = c.waitLeader(t, c.Members)
	}

	// bring back node a
	// node a will remain useless as long as d is the leader.
	if err := c.Members[0].Restart(t); err != nil {
		t.Fatal(err)
	}
	select {
	// waiting for ReadyNotify can take several seconds
	case <-time.After(10 * time.Second):
		t.Fatalf("waited too long for ready notification")
	case <-c.Members[0].s.StopNotify():
		t.Fatalf("should not be stopped")
	case <-c.Members[0].s.ReadyNotify():
	}
	// must waitLeader so goroutines don't leak on terminate
	c.waitLeader(t, c.Members)

	// try to participate in cluster
	cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
	kapi := client.NewKeysAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil {
		t.Fatalf("unexpected error on Set (%v)", err)
	}
	cancel()
}

// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members.
func TestRejectUnhealthyAdd(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 3)
	for _, m := range c.Members {
		m.ServerConfig.StrictReconfigCheck = true
	}
	c.Launch(t)
	defer c.Terminate(t)

	// make cluster unhealthy and wait for downed peer
	c.Members[0].Stop(t)
	c.WaitLeader(t)

	// all attempts to add member should fail
	for i := 1; i < len(c.Members); i++ {
		err := c.addMemberByURL(t, c.URL(i), "unix://foo:12345")
		if err == nil {
			t.Fatalf("should have failed adding peer")
		}
		// TODO: client should return descriptive error codes for internal errors
		if !strings.Contains(err.Error(), "has no leader") {
			t.Errorf("unexpected error (%v)", err)
		}
	}

	// make cluster healthy
	c.Members[0].Restart(t)
	c.WaitLeader(t)
	time.Sleep(2 * etcdserver.HealthInterval)

	// add member should succeed now that it's healthy
	var err error
	for i := 1; i < len(c.Members); i++ {
		if err = c.addMemberByURL(t, c.URL(i), "unix://foo:12345"); err == nil {
			break
		}
	}
	if err != nil {
		t.Fatalf("should have added peer to healthy cluster (%v)", err)
	}
}

// TestRejectUnhealthyRemove ensures an unhealthy cluster rejects removing members
// if quorum will be lost.
func TestRejectUnhealthyRemove(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 5)
	for _, m := range c.Members {
		m.ServerConfig.StrictReconfigCheck = true
	}
	c.Launch(t)
	defer c.Terminate(t)

	// make cluster unhealthy and wait for downed peer; (3 up, 2 down)
	c.Members[0].Stop(t)
	c.Members[1].Stop(t)
	c.WaitLeader(t)

	// reject remove active member since (3,2)-(1,0) => (2,2) lacks quorum
	err := c.removeMember(t, uint64(c.Members[2].s.ID()))
	if err == nil {
		t.Fatalf("should reject quorum breaking remove")
	}
	// TODO: client should return more descriptive error codes for internal errors
	if !strings.Contains(err.Error(), "has no leader") {
		t.Errorf("unexpected error (%v)", err)
	}

	// member stopped after launch; wait for missing heartbeats
	time.Sleep(time.Duration(electionTicks * int(tickDuration)))

	// permit remove dead member since (3,2) - (0,1) => (3,1) has quorum
	if err = c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
		t.Fatalf("should accept removing down member")
	}

	// bring cluster to (4,1)
	c.Members[0].Restart(t)

	// restarted member must be connected for a HealthInterval before remove is accepted
	time.Sleep((3 * etcdserver.HealthInterval) / 2)

	// accept remove member since (4,1)-(1,0) => (3,1) has quorum
	if err = c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
		t.Fatalf("expected to remove member, got error %v", err)
	}
}

// TestRestartRemoved ensures that restarting a removed member must exit
// if 'initial-cluster-state' is set to 'new' and the old data directory still exists
// (see https://github.com/etcd-io/etcd/issues/7512 for more).
func TestRestartRemoved(t *testing.T) {
	defer testutil.AfterTest(t)

	// 1. start single-member cluster
	c := NewCluster(t, 1)
	for _, m := range c.Members {
		m.ServerConfig.StrictReconfigCheck = true
	}
	c.Launch(t)
	defer c.Terminate(t)

	// 2. add a new member
	c.AddMember(t)
	c.WaitLeader(t)

	oldm := c.Members[0]
	oldm.keepDataDirTerminate = true

	// 3. remove first member, shut down without deleting data
	if err := c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
		t.Fatalf("expected to remove member, got error %v", err)
	}
	c.WaitLeader(t)

	// 4. restart first member with 'initial-cluster-state=new'
	// wrong config, expects exit within ReqTimeout
	oldm.ServerConfig.NewCluster = false
	if err := oldm.Restart(t); err != nil {
		t.Fatalf("unexpected ForceRestart error: %v", err)
	}
	defer func() {
		oldm.Close()
		os.RemoveAll(oldm.ServerConfig.DataDir)
	}()
	select {
	case <-oldm.s.StopNotify():
	case <-time.After(time.Minute):
		t.Fatalf("removed member didn't exit within %v", time.Minute)
	}
}

// clusterMustProgress ensures that the cluster can make progress. It first
// creates a random key, and then checks that the key can be read from every
// client URL of the cluster.
func clusterMustProgress(t *testing.T, membs []*member) {
	cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil)
	kapi := client.NewKeysAPI(cc)
	key := fmt.Sprintf("foo%d", rand.Int())
	var (
		err  error
		resp *client.Response
	)
	// retry in case of leader loss induced by slow CI
	for i := 0; i < 3; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
		resp, err = kapi.Create(ctx, "/"+key, "bar")
		cancel()
		if err == nil {
			break
		}
		t.Logf("failed to create key on %q (%v)", membs[0].URL(), err)
	}
	if err != nil {
		t.Fatalf("create on %s error: %v", membs[0].URL(), err)
	}

	for i, m := range membs {
		u := m.URL()
		mcc := MustNewHTTPClient(t, []string{u}, nil)
		mkapi := client.NewKeysAPI(mcc)
		mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
		if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
			t.Fatalf("#%d: watch on %s error: %v", i, u, err)
		}
		mcancel()
	}
}

func TestSpeedyTerminate(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
	// Stop/Restart so requests will time out on lost leaders
	for i := 0; i < 3; i++ {
		clus.Members[i].Stop(t)
		clus.Members[i].Restart(t)
	}
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		clus.Terminate(t)
	}()
	select {
	case <-time.After(10 * time.Second):
		t.Fatalf("cluster took too long to terminate")
	case <-donec:
	}
}