// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
	"context"
	"crypto/tls"
	"fmt"
	"io/ioutil"
	"log"
	"math/rand"
	"net"
	"net/http"
	"net/http/httptest"
	"os"
	"reflect"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"go.etcd.io/etcd/client"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/embed"
	"go.etcd.io/etcd/etcdserver"
	"go.etcd.io/etcd/etcdserver/api/etcdhttp"
	"go.etcd.io/etcd/etcdserver/api/rafthttp"
	"go.etcd.io/etcd/etcdserver/api/v2http"
	"go.etcd.io/etcd/etcdserver/api/v3client"
	"go.etcd.io/etcd/etcdserver/api/v3election"
	epb "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
	"go.etcd.io/etcd/etcdserver/api/v3lock"
	lockpb "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
	"go.etcd.io/etcd/etcdserver/api/v3rpc"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
	"go.etcd.io/etcd/pkg/logutil"
	"go.etcd.io/etcd/pkg/testutil"
	"go.etcd.io/etcd/pkg/tlsutil"
	"go.etcd.io/etcd/pkg/transport"
	"go.etcd.io/etcd/pkg/types"

	"github.com/soheilhy/cmux"
	"go.uber.org/zap"
	"golang.org/x/crypto/bcrypt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/keepalive"
)

const (
	// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
	RequestWaitTimeout = 3 * time.Second
	// tickDuration is the raft tick interval configured on every test member
	// (see mustNewMember: m.TickMs is derived from it).
	tickDuration = 10 * time.Millisecond
	// requestTimeout bounds each client request issued by the wait/poll helpers.
	requestTimeout = 20 * time.Second

	clusterName = "etcd"
	basePort    = 21000
	// Members listen on unix-domain sockets; "unixs" is the TLS variant.
	URLScheme    = "unix"
	URLSchemeTLS = "unixs"
)

var (
	// electionTicks is the number of ticks before a member starts an election.
	electionTicks = 10

	// integration test uses unique ports, counting up, to listen for each
	// member, ensuring restarted members can listen on the same port again.
	localListenCount = int64(0)

	// testTLSInfo points at the checked-in test fixtures with hostname SANs.
	testTLSInfo = transport.TLSInfo{
		KeyFile:        "./fixtures/server.key.insecure",
		CertFile:       "./fixtures/server.crt",
		TrustedCAFile:  "./fixtures/ca.crt",
		ClientCertAuth: true,
	}

	// testTLSInfoIP uses certs whose SAN fields contain only IP addresses.
	testTLSInfoIP = transport.TLSInfo{
		KeyFile:        "./fixtures/server-ip.key.insecure",
		CertFile:       "./fixtures/server-ip.crt",
		TrustedCAFile:  "./fixtures/ca.crt",
		ClientCertAuth: true,
	}

	// testTLSInfoExpired uses deliberately expired certs for negative tests.
	testTLSInfoExpired = transport.TLSInfo{
		KeyFile:        "./fixtures-expired/server.key.insecure",
		CertFile:       "./fixtures-expired/server.crt",
		TrustedCAFile:  "./fixtures-expired/ca.crt",
		ClientCertAuth: true,
	}

	testTLSInfoExpiredIP = transport.TLSInfo{
		KeyFile:        "./fixtures-expired/server-ip.key.insecure",
		CertFile:       "./fixtures-expired/server-ip.crt",
		TrustedCAFile:  "./fixtures-expired/ca.crt",
		ClientCertAuth: true,
	}

	// defaultTokenJWT is an auth-token option string reusing the server cert key pair.
	defaultTokenJWT = "jwt,pub-key=./fixtures/server.crt,priv-key=./fixtures/server.key.insecure,sign-method=RS256,ttl=1s"

	// lg is the harness logger; a no-op unless CLUSTER_DEBUG is set (see init).
	lg = zap.NewNop()
)

func init() {
	// Enable harness logging when CLUSTER_DEBUG is set in the environment.
	if os.Getenv("CLUSTER_DEBUG") != "" {
		lg, _ = zap.NewProduction()
	}
}

// ClusterConfig configures an integration test cluster before launch.
// Zero values generally mean "use the server default" (see mustNewMember).
type ClusterConfig struct {
	Size      int
	PeerTLS   *transport.TLSInfo
	ClientTLS *transport.TLSInfo

	DiscoveryURL string

	AuthToken string

	// UseGRPC starts a gRPC listener (and bridge) per member.
	UseGRPC bool

	QuotaBackendBytes int64

	MaxTxnOps              uint
	MaxRequestBytes        uint
	SnapshotCount          uint64
	SnapshotCatchUpEntries uint64

	GRPCKeepAliveMinTime  time.Duration
	GRPCKeepAliveInterval time.Duration
	GRPCKeepAliveTimeout  time.Duration

	// SkipCreatingClient to skip creating clients for each member.
	SkipCreatingClient bool

	ClientMaxCallSendMsgSize int
	ClientMaxCallRecvMsgSize int

	// UseIP is true to use only IP for gRPC requests.
	UseIP bool

	EnableLeaseCheckpoint   bool
	LeaseCheckpointInterval time.Duration
}

// cluster is a set of launched (or launchable) members sharing one config.
type cluster struct {
	cfg     *ClusterConfig
	Members []*member
}

// schemeFromTLSInfo maps a TLS config to the member URL scheme:
// nil means plain "unix", non-nil means "unixs".
func schemeFromTLSInfo(tls *transport.TLSInfo) string {
	if tls == nil {
		return URLScheme
	}
	return URLSchemeTLS
}

// fillClusterForMembers computes the static-bootstrap initial-cluster string
// from every member's peer listeners and assigns it to each member.
// It is a no-op when the cluster bootstraps via discovery.
func (c *cluster) fillClusterForMembers() error {
	if c.cfg.DiscoveryURL != "" {
		// cluster will be discovered
		return nil
	}

	addrs := make([]string, 0)
	for _, m := range c.Members {
		scheme := schemeFromTLSInfo(m.PeerTLSInfo)
		for _, l := range m.PeerListeners {
			addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
		}
	}
	clusterStr := strings.Join(addrs, ",")
	var err error
	for _, m := range c.Members {
		m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
		if err != nil {
			return err
		}
	}
	return nil
}

// newCluster creates cfg.Size unlaunched members and wires up their
// initial-cluster maps; it does not start any server.
func newCluster(t testing.TB, cfg *ClusterConfig) *cluster {
	c := &cluster{cfg: cfg}
	ms := make([]*member, cfg.Size)
	for i := 0; i < cfg.Size; i++ {
		ms[i] = c.mustNewMember(t)
	}
	c.Members = ms
	if err := c.fillClusterForMembers(); err != nil {
		t.Fatal(err)
	}

	return c
}

// NewCluster returns an unlaunched cluster of the given size which has been
// set to use static bootstrap.
func NewCluster(t testing.TB, size int) *cluster {
	return newCluster(t, &ClusterConfig{Size: size})
}

// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
func NewClusterByConfig(t testing.TB, cfg *ClusterConfig) *cluster {
	return newCluster(t, cfg)
}

// Launch starts every member and blocks until the cluster is stable:
// all members report the same membership and a cluster version is set.
func (c *cluster) Launch(t testing.TB) {
	errc := make(chan error)
	for _, m := range c.Members {
		// Members are launched in separate goroutines because if they boot
		// using discovery url, they have to wait for others to register to continue.
		go func(m *member) {
			errc <- m.Launch()
		}(m)
	}
	for range c.Members {
		if err := <-errc; err != nil {
			t.Fatalf("error setting up member: %v", err)
		}
	}
	// wait cluster to be stable to receive future client requests
	c.waitMembersMatch(t, c.HTTPMembers())
	c.waitVersion()
}

// URL returns the first client URL of member i.
func (c *cluster) URL(i int) string {
	return c.Members[i].ClientURLs[0].String()
}

// URLs returns a list of all active client URLs in the cluster
func (c *cluster) URLs() []string {
	return getMembersURLs(c.Members)
}

// getMembersURLs collects client URLs of members whose server has not stopped.
func getMembersURLs(members []*member) []string {
	urls := make([]string, 0)
	for _, m := range members {
		select {
		case <-m.s.StopNotify():
			continue
		default:
		}
		for _, u := range m.ClientURLs {
			urls = append(urls, u.String())
		}
	}
	return urls
}

// HTTPMembers returns a list of all active members as client.Members
func (c *cluster) HTTPMembers() []client.Member {
	ms := []client.Member{}
	for _, m := range c.Members {
		pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
		cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
		cm := client.Member{Name: m.Name}
		for _, ln := range m.PeerListeners {
			cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
		}
		for _, ln := range m.ClientListeners {
			cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
		}
		ms = append(ms, cm)
	}
	return ms
}

// mustNewMember builds one member from the cluster config, optionally
// attaching a gRPC listener when cfg.UseGRPC is set.
func (c *cluster) mustNewMember(t testing.TB) *member {
	m := mustNewMember(t,
		memberConfig{
			name:                     c.name(rand.Int()),
			authToken:                c.cfg.AuthToken,
			peerTLS:                  c.cfg.PeerTLS,
			clientTLS:                c.cfg.ClientTLS,
			quotaBackendBytes:        c.cfg.QuotaBackendBytes,
			maxTxnOps:                c.cfg.MaxTxnOps,
			maxRequestBytes:          c.cfg.MaxRequestBytes,
			snapshotCount:            c.cfg.SnapshotCount,
			snapshotCatchUpEntries:   c.cfg.SnapshotCatchUpEntries,
			grpcKeepAliveMinTime:     c.cfg.GRPCKeepAliveMinTime,
			grpcKeepAliveInterval:    c.cfg.GRPCKeepAliveInterval,
			grpcKeepAliveTimeout:     c.cfg.GRPCKeepAliveTimeout,
			clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
			clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
			useIP:                    c.cfg.UseIP,
			enableLeaseCheckpoint:    c.cfg.EnableLeaseCheckpoint,
			leaseCheckpointInterval:  c.cfg.LeaseCheckpointInterval,
		})
	m.DiscoveryURL = c.cfg.DiscoveryURL
	if c.cfg.UseGRPC {
		if err := m.listenGRPC(); err != nil {
			t.Fatal(err)
		}
	}
	return m
}

// addMember creates a new member, registers it through any reachable
// existing member, then launches it and waits for a stable membership.
func (c *cluster) addMember(t testing.TB) {
	m := c.mustNewMember(t)

	scheme := schemeFromTLSInfo(c.cfg.PeerTLS)

	// send add request to the cluster
	var err error
	for i := 0; i < len(c.Members); i++ {
		clientURL := c.URL(i)
		peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
		if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
			break
		}
	}
	if err != nil {
		t.Fatalf("add member failed on all members error: %v", err)
	}

	m.InitialPeerURLsMap = types.URLsMap{}
	for _, mm := range c.Members {
		m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
	}
	m.InitialPeerURLsMap[m.Name] = m.PeerURLs
	// joining an existing cluster, not bootstrapping a new one
	m.NewCluster = false
	if err := m.Launch(); err != nil {
		t.Fatal(err)
	}
	c.Members = append(c.Members, m)
	// wait cluster to be stable to receive future client requests
	c.waitMembersMatch(t, c.HTTPMembers())
}

// addMemberByURL issues a v2 members API Add via clientURL and waits until
// every member observes the new peer URL in its member list.
func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error {
	cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
	ma := client.NewMembersAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	_, err := ma.Add(ctx, peerURL)
	cancel()
	if err != nil {
		return err
	}

	// wait for the add node entry applied in the cluster
	members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
	c.waitMembersMatch(t, members)
	return nil
}

// AddMember adds one member to the running cluster, failing the test on error.
func (c *cluster) AddMember(t testing.TB) {
	c.addMember(t)
}

// RemoveMember removes the member with the given raft ID, failing the test on error.
func (c *cluster) RemoveMember(t testing.TB, id uint64) {
	if err := c.removeMember(t, id); err != nil {
		t.Fatal(err)
	}
}

// removeMember sends a members API Remove for id, waits for the removed
// member's server to stop (bounded by election and connection timeouts),
// terminates it, and waits for the remaining members to agree.
func (c *cluster) removeMember(t testing.TB, id uint64) error {
	// send remove request to the cluster
	cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
	ma := client.NewMembersAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	err := ma.Remove(ctx, types.ID(id).String())
	cancel()
	if err != nil {
		return err
	}
	newMembers := make([]*member, 0)
	for _, m := range c.Members {
		if uint64(m.s.ID()) != id {
			newMembers = append(newMembers, m)
		} else {
			select {
			case <-m.s.StopNotify():
				m.Terminate(t)
			// 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
			// TODO: remove connection write timeout by selecting on http response closeNotifier
			// blocking on https://github.com/golang/go/issues/9524
			case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
				t.Fatalf("failed to remove member %s in time", m.s.ID())
			}
		}
	}
	c.Members = newMembers
	c.waitMembersMatch(t, c.HTTPMembers())
	return nil
}

// Terminate stops and removes all members concurrently.
func (c *cluster) Terminate(t testing.TB) {
	var wg sync.WaitGroup
	wg.Add(len(c.Members))
	for _, m := range c.Members {
		go func(mm *member) {
			defer wg.Done()
			mm.Terminate(t)
		}(m)
	}
	wg.Wait()
}

// waitMembersMatch polls every member's members API until each reports a
// member list equal (modulo ID) to membs. Bounded only by the test timeout.
func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
	for _, u := range c.URLs() {
		cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
		ma := client.NewMembersAPI(cc)
		for {
			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
			ms, err := ma.List(ctx)
			cancel()
			if err == nil && isMembersEqual(ms, membs) {
				break
			}
			time.Sleep(tickDuration)
		}
	}
}

func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }

// waitLeader waits until given members agree on the same leader.
func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
	possibleLead := make(map[uint64]bool)
	var lead uint64
	for _, m := range membs {
		possibleLead[uint64(m.s.ID())] = true
	}
	cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
	kapi := client.NewKeysAPI(cc)

	// ensure leader is up via linearizable get
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
		_, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
		cancel()
		// "Key not found" still proves a quorum read went through a leader.
		if err == nil || strings.Contains(err.Error(), "Key not found") {
			break
		}
	}

	// loop until every live member reports the same leader from our candidate set
	for lead == 0 || !possibleLead[lead] {
		lead = 0
		for _, m := range membs {
			select {
			case <-m.s.StopNotify():
				continue
			default:
			}
			if lead != 0 && lead != m.s.Lead() {
				// disagreement: reset and back off before re-sampling
				lead = 0
				time.Sleep(10 * tickDuration)
				break
			}
			lead = m.s.Lead()
		}
	}

	for i, m := range membs {
		if uint64(m.s.ID()) == lead {
			return i
		}
	}

	// leader is not one of the given members (e.g. it was partitioned away)
	return -1
}

func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }

// waitNoLeader waits until given members lose leader.
func (c *cluster) waitNoLeader(membs []*member) {
	noLeader := false
	for !noLeader {
		noLeader = true
		for _, m := range membs {
			select {
			case <-m.s.StopNotify():
				continue
			default:
			}
			if m.s.Lead() != 0 {
				noLeader = false
				time.Sleep(10 * tickDuration)
				break
			}
		}
	}
}

// waitVersion blocks until every member has a non-nil cluster version.
func (c *cluster) waitVersion() {
	for _, m := range c.Members {
		for {
			if m.s.ClusterVersion() != nil {
				break
			}
			time.Sleep(tickDuration)
		}
	}
}

// name renders a member name from an integer seed.
func (c *cluster) name(i int) string {
	return fmt.Sprint(i)
}

// isMembersEqual checks whether two members equal except ID field.
// The given wmembs should always set ID field to empty string.
func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
	sort.Sort(SortableMemberSliceByPeerURLs(membs))
	sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
	for i := range membs {
		membs[i].ID = ""
	}
	return reflect.DeepEqual(membs, wmembs)
}

// newLocalListener makes a unix-socket listener with a unique, repeatable
// address derived from a process-wide counter and the pid.
func newLocalListener(t testing.TB) net.Listener {
	c := atomic.AddInt64(&localListenCount, 1)
	// Go 1.8+ allows only numbers in port
	addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
	return NewListenerWithAddr(t, addr)
}

// NewListenerWithAddr opens a unix-socket listener on addr, failing the test on error.
func NewListenerWithAddr(t testing.TB, addr string) net.Listener {
	l, err := transport.NewUnixListener(addr)
	if err != nil {
		t.Fatal(err)
	}
	return l
}

// member wraps one embedded etcd server plus all the listeners, gRPC
// servers and clients the test harness attaches to it.
type member struct {
	etcdserver.ServerConfig
	PeerListeners, ClientListeners []net.Listener
	grpcListener                   net.Listener
	// PeerTLSInfo enables peer TLS when set
	PeerTLSInfo *transport.TLSInfo
	// ClientTLSInfo enables client TLS when set
	ClientTLSInfo *transport.TLSInfo
	DialOptions   []grpc.DialOption

	raftHandler   *testutil.PauseableHandler
	s             *etcdserver.EtcdServer
	serverClosers []func()

	grpcServerOpts []grpc.ServerOption
	grpcServer     *grpc.Server
	grpcServerPeer *grpc.Server
	grpcAddr       string
	grpcBridge     *bridge

	// serverClient is a clientv3 that directly calls the etcdserver.
	serverClient *clientv3.Client

	// keepDataDirTerminate skips data-dir removal in Terminate when true.
	keepDataDirTerminate     bool
	clientMaxCallSendMsgSize int
	clientMaxCallRecvMsgSize int
	useIP                    bool

	isLearner bool
}

func (m *member) GRPCAddr() string { return m.grpcAddr }

// memberConfig is the per-member subset of ClusterConfig consumed by mustNewMember.
type memberConfig struct {
	name                     string
	peerTLS                  *transport.TLSInfo
	clientTLS                *transport.TLSInfo
	authToken                string
	quotaBackendBytes        int64
	maxTxnOps                uint
	maxRequestBytes          uint
	snapshotCount            uint64
	snapshotCatchUpEntries   uint64
	grpcKeepAliveMinTime     time.Duration
	grpcKeepAliveInterval    time.Duration
	grpcKeepAliveTimeout     time.Duration
	clientMaxCallSendMsgSize int
	clientMaxCallRecvMsgSize int
	useIP                    bool
	enableLeaseCheckpoint    bool
	leaseCheckpointInterval  time.Duration
}

// mustNewMember return an inited member with the given name. If peerTLS is
// set, it will use https scheme to communicate between peers.
func mustNewMember(t testing.TB, mcfg memberConfig) *member {
	var err error
	m := &member{}

	peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
	clientScheme := schemeFromTLSInfo(mcfg.clientTLS)

	// one peer listener and matching advertise URL
	pln := newLocalListener(t)
	m.PeerListeners = []net.Listener{pln}
	m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
	if err != nil {
		t.Fatal(err)
	}
	m.PeerTLSInfo = mcfg.peerTLS

	// one client listener and matching advertise URL
	cln := newLocalListener(t)
	m.ClientListeners = []net.Listener{cln}
	m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
	if err != nil {
		t.Fatal(err)
	}
	m.ClientTLSInfo = mcfg.clientTLS

	m.Name = mcfg.name

	m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
	if err != nil {
		t.Fatal(err)
	}
	// single-member initial cluster; fillClusterForMembers overwrites this
	// for multi-member static bootstrap.
	clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
	m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
	if err != nil {
		t.Fatal(err)
	}
	m.InitialClusterToken = clusterName
	m.NewCluster = true
	m.BootstrapTimeout = 10 * time.Millisecond
	if m.PeerTLSInfo != nil {
		m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
	}
	m.ElectionTicks = electionTicks
	m.InitialElectionTickAdvance = true
	m.TickMs = uint(tickDuration / time.Millisecond)
	m.QuotaBackendBytes = mcfg.quotaBackendBytes
	m.MaxTxnOps = mcfg.maxTxnOps
	if m.MaxTxnOps == 0 {
		m.MaxTxnOps = embed.DefaultMaxTxnOps
	}
	m.MaxRequestBytes = mcfg.maxRequestBytes
	if m.MaxRequestBytes == 0 {
		m.MaxRequestBytes = embed.DefaultMaxRequestBytes
	}
	m.SnapshotCount = etcdserver.DefaultSnapshotCount
	if mcfg.snapshotCount != 0 {
		m.SnapshotCount = mcfg.snapshotCount
	}
	m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
	if mcfg.snapshotCatchUpEntries != 0 {
		m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
	}

	// for the purpose of integration testing, simple token is enough
	m.AuthToken = "simple"
	if mcfg.authToken != "" {
		m.AuthToken = mcfg.authToken
	}

	m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speedy up integration testing

	m.grpcServerOpts = []grpc.ServerOption{}
	if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             mcfg.grpcKeepAliveMinTime,
			PermitWithoutStream: false,
		}))
	}
	if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
		mcfg.grpcKeepAliveTimeout > time.Duration(0) {
		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
			Time:    mcfg.grpcKeepAliveInterval,
			Timeout: mcfg.grpcKeepAliveTimeout,
		}))
	}
	m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
	m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
	m.useIP = mcfg.useIP
	m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
	m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval

	m.InitialCorruptCheck = true

	// silence server logs unless CLUSTER_DEBUG is set
	lcfg := logutil.DefaultZapLoggerConfig
	m.LoggerConfig = &lcfg
	m.LoggerConfig.OutputPaths = []string{"/dev/null"}
	m.LoggerConfig.ErrorOutputPaths = []string{"/dev/null"}
	if os.Getenv("CLUSTER_DEBUG") != "" {
		m.LoggerConfig.OutputPaths = []string{"stderr"}
		m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
	}
	m.Logger, err = m.LoggerConfig.Build()
	if err != nil {
		t.Fatal(err)
	}
	return m
}

// listenGRPC starts a grpc server over a unix domain socket on the member
func (m *member) listenGRPC() error {
	// prefix with localhost so cert has right domain
	m.grpcAddr = "localhost:" + m.Name
	if m.useIP { // for IP-only TLS certs
		m.grpcAddr = "127.0.0.1:" + m.Name
	}
	l, err := transport.NewUnixListener(m.grpcAddr)
	if err != nil {
		return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
	}
	// the bridge fronts the real socket so tests can drop/pause/blackhole connections
	m.grpcBridge, err = newBridge(m.grpcAddr)
	if err != nil {
		l.Close()
		return err
	}
	// clients dial the bridge address, not the raw listener
	m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
	m.grpcListener = l
	return nil
}

// ElectionTimeout returns the member's configured election timeout as a duration.
func (m *member) ElectionTimeout() time.Duration {
	return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
}

func (m *member) ID() types.ID { return m.s.ID() }

// Connection fault injection, delegated to the gRPC bridge.
func (m *member) DropConnections()    { m.grpcBridge.Reset() }
func (m *member) PauseConnections()   { m.grpcBridge.Pause() }
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
func (m *member) Blackhole()          { m.grpcBridge.Blackhole() }
func (m *member) Unblackhole()        { m.grpcBridge.Unblackhole() }

// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
	if m.grpcAddr == "" {
		return nil, fmt.Errorf("member not configured for grpc")
	}

	cfg := clientv3.Config{
		Endpoints:          []string{m.grpcAddr},
		DialTimeout:        5 * time.Second,
		DialOptions:        []grpc.DialOption{grpc.WithBlock()},
		MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
		MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
	}

	if m.ClientTLSInfo != nil {
		tls, err := m.ClientTLSInfo.ClientConfig()
		if err != nil {
			return nil, err
		}
		cfg.TLS = tls
	}
	if m.DialOptions != nil {
		cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...)
	}
	return newClientV3(cfg)
}

// Clone returns a member with the same server configuration. The returned
// member will not set PeerListeners and ClientListeners.
760func (m *member) Clone(t testing.TB) *member { 761 mm := &member{} 762 mm.ServerConfig = m.ServerConfig 763 764 var err error 765 clientURLStrs := m.ClientURLs.StringSlice() 766 mm.ClientURLs, err = types.NewURLs(clientURLStrs) 767 if err != nil { 768 // this should never fail 769 panic(err) 770 } 771 peerURLStrs := m.PeerURLs.StringSlice() 772 mm.PeerURLs, err = types.NewURLs(peerURLStrs) 773 if err != nil { 774 // this should never fail 775 panic(err) 776 } 777 clusterStr := m.InitialPeerURLsMap.String() 778 mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) 779 if err != nil { 780 // this should never fail 781 panic(err) 782 } 783 mm.InitialClusterToken = m.InitialClusterToken 784 mm.ElectionTicks = m.ElectionTicks 785 mm.PeerTLSInfo = m.PeerTLSInfo 786 mm.ClientTLSInfo = m.ClientTLSInfo 787 return mm 788} 789 790// Launch starts a member based on ServerConfig, PeerListeners 791// and ClientListeners. 792func (m *member) Launch() error { 793 lg.Info( 794 "launching a member", 795 zap.String("name", m.Name), 796 zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), 797 zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), 798 zap.String("grpc-address", m.grpcAddr), 799 ) 800 var err error 801 if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil { 802 return fmt.Errorf("failed to initialize the etcd server: %v", err) 803 } 804 m.s.SyncTicker = time.NewTicker(500 * time.Millisecond) 805 m.s.Start() 806 807 var peerTLScfg *tls.Config 808 if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() { 809 if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil { 810 return err 811 } 812 } 813 814 if m.grpcListener != nil { 815 var ( 816 tlscfg *tls.Config 817 ) 818 if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() { 819 tlscfg, err = m.ClientTLSInfo.ServerConfig() 820 if err != nil { 821 return err 822 } 823 } 824 m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...) 
825 m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg) 826 m.serverClient = v3client.New(m.s) 827 lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient)) 828 epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient)) 829 go m.grpcServer.Serve(m.grpcListener) 830 } 831 832 m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)} 833 834 h := (http.Handler)(m.raftHandler) 835 if m.grpcListener != nil { 836 h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 837 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { 838 m.grpcServerPeer.ServeHTTP(w, r) 839 } else { 840 m.raftHandler.ServeHTTP(w, r) 841 } 842 }) 843 } 844 845 for _, ln := range m.PeerListeners { 846 cm := cmux.New(ln) 847 // don't hang on matcher after closing listener 848 cm.SetReadTimeout(time.Second) 849 850 if m.grpcServer != nil { 851 grpcl := cm.Match(cmux.HTTP2()) 852 go m.grpcServerPeer.Serve(grpcl) 853 } 854 855 // serve http1/http2 rafthttp/grpc 856 ll := cm.Match(cmux.Any()) 857 if peerTLScfg != nil { 858 if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil { 859 return err 860 } 861 } 862 hs := &httptest.Server{ 863 Listener: ll, 864 Config: &http.Server{ 865 Handler: h, 866 TLSConfig: peerTLScfg, 867 ErrorLog: log.New(ioutil.Discard, "net/http", 0), 868 }, 869 TLS: peerTLScfg, 870 } 871 hs.Start() 872 873 donec := make(chan struct{}) 874 go func() { 875 defer close(donec) 876 cm.Serve() 877 }() 878 closer := func() { 879 ll.Close() 880 hs.CloseClientConnections() 881 hs.Close() 882 <-donec 883 } 884 m.serverClosers = append(m.serverClosers, closer) 885 } 886 for _, ln := range m.ClientListeners { 887 hs := &httptest.Server{ 888 Listener: ln, 889 Config: &http.Server{ 890 Handler: v2http.NewClientHandler( 891 m.Logger, 892 m.s, 893 m.ServerConfig.ReqTimeout(), 894 ), 895 ErrorLog: log.New(ioutil.Discard, "net/http", 0), 896 }, 897 } 898 if 
m.ClientTLSInfo == nil { 899 hs.Start() 900 } else { 901 info := m.ClientTLSInfo 902 hs.TLS, err = info.ServerConfig() 903 if err != nil { 904 return err 905 } 906 907 // baseConfig is called on initial TLS handshake start. 908 // 909 // Previously, 910 // 1. Server has non-empty (*tls.Config).Certificates on client hello 911 // 2. Server calls (*tls.Config).GetCertificate iff: 912 // - Server's (*tls.Config).Certificates is not empty, or 913 // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName 914 // 915 // When (*tls.Config).Certificates is always populated on initial handshake, 916 // client is expected to provide a valid matching SNI to pass the TLS 917 // verification, thus trigger server (*tls.Config).GetCertificate to reload 918 // TLS assets. However, a cert whose SAN field does not include domain names 919 // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus 920 // it was never able to trigger TLS reload on initial handshake; first 921 // ceritifcate object was being used, never being updated. 922 // 923 // Now, (*tls.Config).Certificates is created empty on initial TLS client 924 // handshake, in order to trigger (*tls.Config).GetCertificate and populate 925 // rest of the certificates on every new TLS connection, even when client 926 // SNI is empty (e.g. cert only includes IPs). 927 // 928 // This introduces another problem with "httptest.Server": 929 // when server initial certificates are empty, certificates 930 // are overwritten by Go's internal test certs, which have 931 // different SAN fields (e.g. example.com). To work around, 932 // re-overwrite (*tls.Config).Certificates before starting 933 // test server. 
934 tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil) 935 if err != nil { 936 return err 937 } 938 hs.TLS.Certificates = []tls.Certificate{*tlsCert} 939 940 hs.StartTLS() 941 } 942 closer := func() { 943 ln.Close() 944 hs.CloseClientConnections() 945 hs.Close() 946 } 947 m.serverClosers = append(m.serverClosers, closer) 948 } 949 950 lg.Info( 951 "launched a member", 952 zap.String("name", m.Name), 953 zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), 954 zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), 955 zap.String("grpc-address", m.grpcAddr), 956 ) 957 return nil 958} 959 960func (m *member) WaitOK(t testing.TB) { 961 m.WaitStarted(t) 962 for m.s.Leader() == 0 { 963 time.Sleep(tickDuration) 964 } 965} 966 967func (m *member) WaitStarted(t testing.TB) { 968 cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) 969 kapi := client.NewKeysAPI(cc) 970 for { 971 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) 972 _, err := kapi.Get(ctx, "/", nil) 973 if err != nil { 974 time.Sleep(tickDuration) 975 continue 976 } 977 cancel() 978 break 979 } 980} 981 982func WaitClientV3(t testing.TB, kv clientv3.KV) { 983 timeout := time.Now().Add(requestTimeout) 984 var err error 985 for time.Now().Before(timeout) { 986 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) 987 _, err = kv.Get(ctx, "/") 988 cancel() 989 if err == nil { 990 return 991 } 992 time.Sleep(tickDuration) 993 } 994 if err != nil { 995 t.Fatalf("timed out waiting for client: %v", err) 996 } 997} 998 999func (m *member) URL() string { return m.ClientURLs[0].String() } 1000 1001func (m *member) Pause() { 1002 m.raftHandler.Pause() 1003 m.s.PauseSending() 1004} 1005 1006func (m *member) Resume() { 1007 m.raftHandler.Resume() 1008 m.s.ResumeSending() 1009} 1010 1011// Close stops the member's etcdserver and closes its connections 1012func (m *member) Close() { 1013 if m.grpcBridge != nil { 1014 m.grpcBridge.Close() 
1015 m.grpcBridge = nil 1016 } 1017 if m.serverClient != nil { 1018 m.serverClient.Close() 1019 m.serverClient = nil 1020 } 1021 if m.grpcServer != nil { 1022 ch := make(chan struct{}) 1023 go func() { 1024 defer close(ch) 1025 // close listeners to stop accepting new connections, 1026 // will block on any existing transports 1027 m.grpcServer.GracefulStop() 1028 }() 1029 // wait until all pending RPCs are finished 1030 select { 1031 case <-ch: 1032 case <-time.After(2 * time.Second): 1033 // took too long, manually close open transports 1034 // e.g. watch streams 1035 m.grpcServer.Stop() 1036 <-ch 1037 } 1038 m.grpcServer = nil 1039 m.grpcServerPeer.GracefulStop() 1040 m.grpcServerPeer.Stop() 1041 m.grpcServerPeer = nil 1042 } 1043 m.s.HardStop() 1044 for _, f := range m.serverClosers { 1045 f() 1046 } 1047} 1048 1049// Stop stops the member, but the data dir of the member is preserved. 1050func (m *member) Stop(t testing.TB) { 1051 lg.Info( 1052 "stopping a member", 1053 zap.String("name", m.Name), 1054 zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), 1055 zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), 1056 zap.String("grpc-address", m.grpcAddr), 1057 ) 1058 m.Close() 1059 m.serverClosers = nil 1060 lg.Info( 1061 "stopped a member", 1062 zap.String("name", m.Name), 1063 zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), 1064 zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), 1065 zap.String("grpc-address", m.grpcAddr), 1066 ) 1067} 1068 1069// checkLeaderTransition waits for leader transition, returning the new leader ID. 
1070func checkLeaderTransition(m *member, oldLead uint64) uint64 { 1071 interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond 1072 for m.s.Lead() == 0 || (m.s.Lead() == oldLead) { 1073 time.Sleep(interval) 1074 } 1075 return m.s.Lead() 1076} 1077 1078// StopNotify unblocks when a member stop completes 1079func (m *member) StopNotify() <-chan struct{} { 1080 return m.s.StopNotify() 1081} 1082 1083// Restart starts the member using the preserved data dir. 1084func (m *member) Restart(t testing.TB) error { 1085 lg.Info( 1086 "restarting a member", 1087 zap.String("name", m.Name), 1088 zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), 1089 zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), 1090 zap.String("grpc-address", m.grpcAddr), 1091 ) 1092 newPeerListeners := make([]net.Listener, 0) 1093 for _, ln := range m.PeerListeners { 1094 newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String())) 1095 } 1096 m.PeerListeners = newPeerListeners 1097 newClientListeners := make([]net.Listener, 0) 1098 for _, ln := range m.ClientListeners { 1099 newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String())) 1100 } 1101 m.ClientListeners = newClientListeners 1102 1103 if m.grpcListener != nil { 1104 if err := m.listenGRPC(); err != nil { 1105 t.Fatal(err) 1106 } 1107 } 1108 1109 err := m.Launch() 1110 lg.Info( 1111 "restarted a member", 1112 zap.String("name", m.Name), 1113 zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), 1114 zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), 1115 zap.String("grpc-address", m.grpcAddr), 1116 zap.Error(err), 1117 ) 1118 return err 1119} 1120 1121// Terminate stops the member and removes the data dir. 
1122func (m *member) Terminate(t testing.TB) { 1123 lg.Info( 1124 "terminating a member", 1125 zap.String("name", m.Name), 1126 zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), 1127 zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), 1128 zap.String("grpc-address", m.grpcAddr), 1129 ) 1130 m.Close() 1131 if !m.keepDataDirTerminate { 1132 if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { 1133 t.Fatal(err) 1134 } 1135 } 1136 lg.Info( 1137 "terminated a member", 1138 zap.String("name", m.Name), 1139 zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()), 1140 zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()), 1141 zap.String("grpc-address", m.grpcAddr), 1142 ) 1143} 1144 1145// Metric gets the metric value for a member 1146func (m *member) Metric(metricName string, expectLabels ...string) (string, error) { 1147 cfgtls := transport.TLSInfo{} 1148 tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second) 1149 if err != nil { 1150 return "", err 1151 } 1152 cli := &http.Client{Transport: tr} 1153 resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics") 1154 if err != nil { 1155 return "", err 1156 } 1157 defer resp.Body.Close() 1158 b, rerr := ioutil.ReadAll(resp.Body) 1159 if rerr != nil { 1160 return "", rerr 1161 } 1162 lines := strings.Split(string(b), "\n") 1163 for _, l := range lines { 1164 if !strings.HasPrefix(l, metricName) { 1165 continue 1166 } 1167 ok := true 1168 for _, lv := range expectLabels { 1169 if !strings.Contains(l, lv) { 1170 ok = false 1171 break 1172 } 1173 } 1174 if !ok { 1175 continue 1176 } 1177 return strings.Split(l, " ")[1], nil 1178 } 1179 return "", nil 1180} 1181 1182// InjectPartition drops connections from m to others, vice versa. 
1183func (m *member) InjectPartition(t testing.TB, others ...*member) { 1184 for _, other := range others { 1185 m.s.CutPeer(other.s.ID()) 1186 other.s.CutPeer(m.s.ID()) 1187 } 1188} 1189 1190// RecoverPartition recovers connections from m to others, vice versa. 1191func (m *member) RecoverPartition(t testing.TB, others ...*member) { 1192 for _, other := range others { 1193 m.s.MendPeer(other.s.ID()) 1194 other.s.MendPeer(m.s.ID()) 1195 } 1196} 1197 1198func (m *member) ReadyNotify() <-chan struct{} { 1199 return m.s.ReadyNotify() 1200} 1201 1202func MustNewHTTPClient(t testing.TB, eps []string, tls *transport.TLSInfo) client.Client { 1203 cfgtls := transport.TLSInfo{} 1204 if tls != nil { 1205 cfgtls = *tls 1206 } 1207 cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps} 1208 c, err := client.New(cfg) 1209 if err != nil { 1210 t.Fatal(err) 1211 } 1212 return c 1213} 1214 1215func mustNewTransport(t testing.TB, tlsInfo transport.TLSInfo) *http.Transport { 1216 // tick in integration test is short, so 1s dial timeout could play well. 1217 tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) 1218 if err != nil { 1219 t.Fatal(err) 1220 } 1221 return tr 1222} 1223 1224type SortableMemberSliceByPeerURLs []client.Member 1225 1226func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) } 1227func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool { 1228 return p[i].PeerURLs[0] < p[j].PeerURLs[0] 1229} 1230func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } 1231 1232type ClusterV3 struct { 1233 *cluster 1234 1235 mu sync.Mutex 1236 clients []*clientv3.Client 1237} 1238 1239// NewClusterV3 returns a launched cluster with a grpc client connection 1240// for each cluster member. 
1241func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 { 1242 cfg.UseGRPC = true 1243 if os.Getenv("CLIENT_DEBUG") != "" { 1244 clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4)) 1245 } 1246 clus := &ClusterV3{ 1247 cluster: NewClusterByConfig(t, cfg), 1248 } 1249 clus.Launch(t) 1250 1251 if !cfg.SkipCreatingClient { 1252 for _, m := range clus.Members { 1253 client, err := NewClientV3(m) 1254 if err != nil { 1255 t.Fatalf("cannot create client: %v", err) 1256 } 1257 clus.clients = append(clus.clients, client) 1258 } 1259 } 1260 1261 return clus 1262} 1263 1264func (c *ClusterV3) TakeClient(idx int) { 1265 c.mu.Lock() 1266 c.clients[idx] = nil 1267 c.mu.Unlock() 1268} 1269 1270func (c *ClusterV3) Terminate(t testing.TB) { 1271 c.mu.Lock() 1272 for _, client := range c.clients { 1273 if client == nil { 1274 continue 1275 } 1276 if err := client.Close(); err != nil { 1277 t.Error(err) 1278 } 1279 } 1280 c.mu.Unlock() 1281 c.cluster.Terminate(t) 1282} 1283 1284func (c *ClusterV3) RandClient() *clientv3.Client { 1285 return c.clients[rand.Intn(len(c.clients))] 1286} 1287 1288func (c *ClusterV3) Client(i int) *clientv3.Client { 1289 return c.clients[i] 1290} 1291 1292type grpcAPI struct { 1293 // Cluster is the cluster API for the client's connection. 1294 Cluster pb.ClusterClient 1295 // KV is the keyvalue API for the client's connection. 1296 KV pb.KVClient 1297 // Lease is the lease API for the client's connection. 1298 Lease pb.LeaseClient 1299 // Watch is the watch API for the client's connection. 1300 Watch pb.WatchClient 1301 // Maintenance is the maintenance API for the client's connection. 1302 Maintenance pb.MaintenanceClient 1303 // Auth is the authentication API for the client's connection. 1304 Auth pb.AuthClient 1305 // Lock is the lock API for the client's connection. 1306 Lock lockpb.LockClient 1307 // Election is the election API for the client's connection. 
1308 Election epb.ElectionClient 1309} 1310 1311// GetLearnerMembers returns the list of learner members in cluster using MemberList API. 1312func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) { 1313 cli := c.Client(0) 1314 resp, err := cli.MemberList(context.Background()) 1315 if err != nil { 1316 return nil, fmt.Errorf("failed to list member %v", err) 1317 } 1318 var learners []*pb.Member 1319 for _, m := range resp.Members { 1320 if m.IsLearner { 1321 learners = append(learners, m) 1322 } 1323 } 1324 return learners, nil 1325} 1326 1327// AddAndLaunchLearnerMember creates a leaner member, adds it to cluster 1328// via v3 MemberAdd API, and then launches the new member. 1329func (c *ClusterV3) AddAndLaunchLearnerMember(t testing.TB) { 1330 m := c.mustNewMember(t) 1331 m.isLearner = true 1332 1333 scheme := schemeFromTLSInfo(c.cfg.PeerTLS) 1334 peerURLs := []string{scheme + "://" + m.PeerListeners[0].Addr().String()} 1335 1336 cli := c.Client(0) 1337 _, err := cli.MemberAddAsLearner(context.Background(), peerURLs) 1338 if err != nil { 1339 t.Fatalf("failed to add learner member %v", err) 1340 } 1341 1342 m.InitialPeerURLsMap = types.URLsMap{} 1343 for _, mm := range c.Members { 1344 m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs 1345 } 1346 m.InitialPeerURLsMap[m.Name] = m.PeerURLs 1347 m.NewCluster = false 1348 1349 if err := m.Launch(); err != nil { 1350 t.Fatal(err) 1351 } 1352 1353 c.Members = append(c.Members, m) 1354 1355 c.waitMembersMatch(t) 1356} 1357 1358// getMembers returns a list of members in cluster, in format of etcdserverpb.Member 1359func (c *ClusterV3) getMembers() []*pb.Member { 1360 var mems []*pb.Member 1361 for _, m := range c.Members { 1362 mem := &pb.Member{ 1363 Name: m.Name, 1364 PeerURLs: m.PeerURLs.StringSlice(), 1365 ClientURLs: m.ClientURLs.StringSlice(), 1366 IsLearner: m.isLearner, 1367 } 1368 mems = append(mems, mem) 1369 } 1370 return mems 1371} 1372 1373// waitMembersMatch waits until v3rpc MemberList returns the 'same' 
members info as the
// local 'c.Members', which is the local recording of members in the testing cluster. With
// the exception that the local recording c.Members does not have info on Member.ID, which
// is generated when the member is added to the cluster.
//
// Note:
// A successful match means the Member.clientURLs are matched. This means the member has already
// finished publishing its server attributes to the cluster. Publishing attributes is a cluster-wide
// write request (in v2 server). Therefore, at this point, any raft log entries prior to this
// would have already been applied.
//
// If a new member was added to an existing cluster, at this point, it has finished publishing
// its own server attributes to the cluster. Therefore, by the same argument, it has already
// applied the raft log entries (especially those of type raftpb.ConfChangeType). At this point,
// the new member has the correct view of the cluster configuration.
//
// Special note on learner members:
// A learner member is only added to a cluster via the v3rpc MemberAdd API (as of v3.4). When starting
// the learner member, its initial view of the cluster created by the peerURLs map does not have info
// on whether or not the new member itself is a learner. But at this point, a successful match does
// indicate that the new learner member has applied the raftpb.ConfChangeAddLearnerNode entry
// which was used to add the learner itself to the cluster, and therefore it has the correct info
// on learner.
1396func (c *ClusterV3) waitMembersMatch(t testing.TB) { 1397 wMembers := c.getMembers() 1398 sort.Sort(SortableProtoMemberSliceByPeerURLs(wMembers)) 1399 cli := c.Client(0) 1400 for { 1401 resp, err := cli.MemberList(context.Background()) 1402 if err != nil { 1403 t.Fatalf("failed to list member %v", err) 1404 } 1405 1406 if len(resp.Members) != len(wMembers) { 1407 continue 1408 } 1409 sort.Sort(SortableProtoMemberSliceByPeerURLs(resp.Members)) 1410 for _, m := range resp.Members { 1411 m.ID = 0 1412 } 1413 if reflect.DeepEqual(resp.Members, wMembers) { 1414 return 1415 } 1416 1417 time.Sleep(tickDuration) 1418 } 1419} 1420 1421type SortableProtoMemberSliceByPeerURLs []*pb.Member 1422 1423func (p SortableProtoMemberSliceByPeerURLs) Len() int { return len(p) } 1424func (p SortableProtoMemberSliceByPeerURLs) Less(i, j int) bool { 1425 return p[i].PeerURLs[0] < p[j].PeerURLs[0] 1426} 1427func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } 1428 1429// MustNewMember creates a new member instance based on the response of V3 Member Add API. 1430func (c *ClusterV3) MustNewMember(t testing.TB, resp *clientv3.MemberAddResponse) *member { 1431 m := c.mustNewMember(t) 1432 m.isLearner = resp.Member.IsLearner 1433 m.NewCluster = false 1434 1435 m.InitialPeerURLsMap = types.URLsMap{} 1436 for _, mm := range c.Members { 1437 m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs 1438 } 1439 m.InitialPeerURLsMap[m.Name] = types.MustNewURLs(resp.Member.PeerURLs) 1440 1441 return m 1442} 1443