1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package integration 16 17import ( 18 "context" 19 "crypto/tls" 20 "fmt" 21 "io/ioutil" 22 "math/rand" 23 "net" 24 "net/http" 25 "net/http/httptest" 26 "os" 27 "reflect" 28 "sort" 29 "strings" 30 "sync" 31 "sync/atomic" 32 "testing" 33 "time" 34 35 "github.com/coreos/etcd/client" 36 "github.com/coreos/etcd/clientv3" 37 "github.com/coreos/etcd/embed" 38 "github.com/coreos/etcd/etcdserver" 39 "github.com/coreos/etcd/etcdserver/api/etcdhttp" 40 "github.com/coreos/etcd/etcdserver/api/v2http" 41 "github.com/coreos/etcd/etcdserver/api/v3client" 42 "github.com/coreos/etcd/etcdserver/api/v3election" 43 epb "github.com/coreos/etcd/etcdserver/api/v3election/v3electionpb" 44 "github.com/coreos/etcd/etcdserver/api/v3lock" 45 lockpb "github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb" 46 "github.com/coreos/etcd/etcdserver/api/v3rpc" 47 pb "github.com/coreos/etcd/etcdserver/etcdserverpb" 48 "github.com/coreos/etcd/pkg/testutil" 49 "github.com/coreos/etcd/pkg/tlsutil" 50 "github.com/coreos/etcd/pkg/transport" 51 "github.com/coreos/etcd/pkg/types" 52 "github.com/coreos/etcd/rafthttp" 53 54 "github.com/coreos/pkg/capnslog" 55 "github.com/soheilhy/cmux" 56 "google.golang.org/grpc" 57 "google.golang.org/grpc/grpclog" 58 "google.golang.org/grpc/keepalive" 59) 60 61const ( 62 // RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss. 63 RequestWaitTimeout = 3 * time.Second 64 tickDuration = 10 * time.Millisecond 65 requestTimeout = 20 * time.Second 66 67 clusterName = "etcd" 68 basePort = 21000 69 UrlScheme = "unix" 70 UrlSchemeTLS = "unixs" 71) 72 73var ( 74 electionTicks = 10 75 76 // integration test uses unique ports, counting up, to listen for each 77 // member, ensuring restarted members can listen on the same port again. 78 localListenCount int64 = 0 79 80 testTLSInfo = transport.TLSInfo{ 81 KeyFile: "./fixtures/server.key.insecure", 82 CertFile: "./fixtures/server.crt", 83 TrustedCAFile: "./fixtures/ca.crt", 84 ClientCertAuth: true, 85 } 86 87 testTLSInfoExpired = transport.TLSInfo{ 88 KeyFile: "./fixtures-expired/server-key.pem", 89 CertFile: "./fixtures-expired/server.pem", 90 TrustedCAFile: "./fixtures-expired/etcd-root-ca.pem", 91 ClientCertAuth: true, 92 } 93 94 plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "integration") 95) 96 97type ClusterConfig struct { 98 Size int 99 PeerTLS *transport.TLSInfo 100 ClientTLS *transport.TLSInfo 101 DiscoveryURL string 102 UseGRPC bool 103 QuotaBackendBytes int64 104 MaxTxnOps uint 105 MaxRequestBytes uint 106 GRPCKeepAliveMinTime time.Duration 107 GRPCKeepAliveInterval time.Duration 108 GRPCKeepAliveTimeout time.Duration 109 // SkipCreatingClient to skip creating clients for each member. 110 SkipCreatingClient bool 111 112 ClientMaxCallSendMsgSize int 113 ClientMaxCallRecvMsgSize int 114} 115 116type cluster struct { 117 cfg *ClusterConfig 118 Members []*member 119} 120 121func schemeFromTLSInfo(tls *transport.TLSInfo) string { 122 if tls == nil { 123 return UrlScheme 124 } 125 return UrlSchemeTLS 126} 127 128func (c *cluster) fillClusterForMembers() error { 129 if c.cfg.DiscoveryURL != "" { 130 // cluster will be discovered 131 return nil 132 } 133 134 addrs := make([]string, 0) 135 for _, m := range c.Members { 136 scheme := schemeFromTLSInfo(m.PeerTLSInfo) 137 for _, l := range m.PeerListeners { 138 addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String())) 139 } 140 } 141 clusterStr := strings.Join(addrs, ",") 142 var err error 143 for _, m := range c.Members { 144 m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) 145 if err != nil { 146 return err 147 } 148 } 149 return nil 150} 151 152func newCluster(t *testing.T, cfg *ClusterConfig) *cluster { 153 c := &cluster{cfg: cfg} 154 ms := make([]*member, cfg.Size) 155 for i := 0; i < cfg.Size; i++ { 156 ms[i] = c.mustNewMember(t) 157 } 158 c.Members = ms 159 if err := c.fillClusterForMembers(); err != nil { 160 t.Fatal(err) 161 } 162 163 return c 164} 165 166// NewCluster returns an unlaunched cluster of the given size which has been 167// set to use static bootstrap. 168func NewCluster(t *testing.T, size int) *cluster { 169 return newCluster(t, &ClusterConfig{Size: size}) 170} 171 172// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration 173func NewClusterByConfig(t *testing.T, cfg *ClusterConfig) *cluster { 174 return newCluster(t, cfg) 175} 176 177func (c *cluster) Launch(t *testing.T) { 178 errc := make(chan error) 179 for _, m := range c.Members { 180 // Members are launched in separate goroutines because if they boot 181 // using discovery url, they have to wait for others to register to continue. 182 go func(m *member) { 183 errc <- m.Launch() 184 }(m) 185 } 186 for range c.Members { 187 if err := <-errc; err != nil { 188 t.Fatalf("error setting up member: %v", err) 189 } 190 } 191 // wait cluster to be stable to receive future client requests 192 c.waitMembersMatch(t, c.HTTPMembers()) 193 c.waitVersion() 194} 195 196func (c *cluster) URL(i int) string { 197 return c.Members[i].ClientURLs[0].String() 198} 199 200// URLs returns a list of all active client URLs in the cluster 201func (c *cluster) URLs() []string { 202 return getMembersURLs(c.Members) 203} 204 205func getMembersURLs(members []*member) []string { 206 urls := make([]string, 0) 207 for _, m := range members { 208 select { 209 case <-m.s.StopNotify(): 210 continue 211 default: 212 } 213 for _, u := range m.ClientURLs { 214 urls = append(urls, u.String()) 215 } 216 } 217 return urls 218} 219 220// HTTPMembers returns a list of all active members as client.Members 221func (c *cluster) HTTPMembers() []client.Member { 222 ms := []client.Member{} 223 for _, m := range c.Members { 224 pScheme := schemeFromTLSInfo(m.PeerTLSInfo) 225 cScheme := schemeFromTLSInfo(m.ClientTLSInfo) 226 cm := client.Member{Name: m.Name} 227 for _, ln := range m.PeerListeners { 228 cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String()) 229 } 230 for _, ln := range m.ClientListeners { 231 cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String()) 232 } 233 ms = append(ms, cm) 234 } 235 return ms 236} 237 238func (c *cluster) mustNewMember(t *testing.T) *member { 239 m := mustNewMember(t, 240 memberConfig{ 241 name: c.name(rand.Int()), 242 peerTLS: c.cfg.PeerTLS, 243 clientTLS: c.cfg.ClientTLS, 244 quotaBackendBytes: c.cfg.QuotaBackendBytes, 245 maxTxnOps: c.cfg.MaxTxnOps, 246 maxRequestBytes: c.cfg.MaxRequestBytes, 247 grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, 248 grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, 249 grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, 250 clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, 251 clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, 252 }) 253 m.DiscoveryURL = c.cfg.DiscoveryURL 254 if c.cfg.UseGRPC { 255 if err := m.listenGRPC(); err != nil { 256 t.Fatal(err) 257 } 258 } 259 return m 260} 261 262func (c *cluster) addMember(t *testing.T) { 263 m := c.mustNewMember(t) 264 265 scheme := schemeFromTLSInfo(c.cfg.PeerTLS) 266 267 // send add request to the cluster 268 var err error 269 for i := 0; i < len(c.Members); i++ { 270 clientURL := c.URL(i) 271 peerURL := scheme + "://" + m.PeerListeners[0].Addr().String() 272 if err = c.addMemberByURL(t, clientURL, peerURL); err == nil { 273 break 274 } 275 } 276 if err != nil { 277 t.Fatalf("add member failed on all members error: %v", err) 278 } 279 280 m.InitialPeerURLsMap = types.URLsMap{} 281 for _, mm := range c.Members { 282 m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs 283 } 284 m.InitialPeerURLsMap[m.Name] = m.PeerURLs 285 m.NewCluster = false 286 if err := m.Launch(); err != nil { 287 t.Fatal(err) 288 } 289 c.Members = append(c.Members, m) 290 // wait cluster to be stable to receive future client requests 291 c.waitMembersMatch(t, c.HTTPMembers()) 292} 293 294func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error { 295 cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS) 296 ma := client.NewMembersAPI(cc) 297 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) 298 _, err := ma.Add(ctx, peerURL) 299 cancel() 300 if err != nil { 301 return err 302 } 303 304 // wait for the add node entry applied in the cluster 305 members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}}) 306 c.waitMembersMatch(t, members) 307 return nil 308} 309 310func (c *cluster) AddMember(t *testing.T) { 311 c.addMember(t) 312} 313 314func (c *cluster) RemoveMember(t *testing.T, id uint64) { 315 if err := c.removeMember(t, id); err != nil { 316 t.Fatal(err) 317 } 318} 319 320func (c *cluster) removeMember(t *testing.T, id uint64) error { 321 // send remove request to the cluster 322 cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS) 323 ma := client.NewMembersAPI(cc) 324 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) 325 err := ma.Remove(ctx, types.ID(id).String()) 326 cancel() 327 if err != nil { 328 return err 329 } 330 newMembers := make([]*member, 0) 331 for _, m := range c.Members { 332 if uint64(m.s.ID()) != id { 333 newMembers = append(newMembers, m) 334 } else { 335 select { 336 case <-m.s.StopNotify(): 337 m.Terminate(t) 338 // 1s stop delay + election timeout + 1s disk and network delay + connection write timeout 339 // TODO: remove connection write timeout by selecting on http response closeNotifier 340 // blocking on https://github.com/golang/go/issues/9524 341 case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout): 342 t.Fatalf("failed to remove member %s in time", m.s.ID()) 343 } 344 } 345 } 346 c.Members = newMembers 347 c.waitMembersMatch(t, c.HTTPMembers()) 348 return nil 349} 350 351func (c *cluster) Terminate(t *testing.T) { 352 var wg sync.WaitGroup 353 wg.Add(len(c.Members)) 354 for _, m := range c.Members { 355 go func(mm *member) { 356 defer wg.Done() 357 mm.Terminate(t) 358 }(m) 359 } 360 wg.Wait() 361} 362 363func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { 364 for _, u := range c.URLs() { 365 cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS) 366 ma := client.NewMembersAPI(cc) 367 for { 368 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) 369 ms, err := ma.List(ctx) 370 cancel() 371 if err == nil && isMembersEqual(ms, membs) { 372 break 373 } 374 time.Sleep(tickDuration) 375 } 376 } 377} 378 379func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) } 380 381// waitLeader waits until given members agree on the same leader. 382func (c *cluster) waitLeader(t *testing.T, membs []*member) int { 383 possibleLead := make(map[uint64]bool) 384 var lead uint64 385 for _, m := range membs { 386 possibleLead[uint64(m.s.ID())] = true 387 } 388 cc := MustNewHTTPClient(t, getMembersURLs(membs), nil) 389 kapi := client.NewKeysAPI(cc) 390 391 // ensure leader is up via linearizable get 392 for { 393 ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second) 394 _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true}) 395 cancel() 396 if err == nil || strings.Contains(err.Error(), "Key not found") { 397 break 398 } 399 } 400 401 for lead == 0 || !possibleLead[lead] { 402 lead = 0 403 for _, m := range membs { 404 select { 405 case <-m.s.StopNotify(): 406 continue 407 default: 408 } 409 if lead != 0 && lead != m.s.Lead() { 410 lead = 0 411 time.Sleep(10 * tickDuration) 412 break 413 } 414 lead = m.s.Lead() 415 } 416 } 417 418 for i, m := range membs { 419 if uint64(m.s.ID()) == lead { 420 return i 421 } 422 } 423 424 return -1 425} 426 427func (c *cluster) WaitNoLeader(t *testing.T) { c.waitNoLeader(t, c.Members) } 428 429// waitNoLeader waits until given members lose leader. 430func (c *cluster) waitNoLeader(t *testing.T, membs []*member) { 431 noLeader := false 432 for !noLeader { 433 noLeader = true 434 for _, m := range membs { 435 select { 436 case <-m.s.StopNotify(): 437 continue 438 default: 439 } 440 if m.s.Lead() != 0 { 441 noLeader = false 442 time.Sleep(10 * tickDuration) 443 break 444 } 445 } 446 } 447} 448 449func (c *cluster) waitVersion() { 450 for _, m := range c.Members { 451 for { 452 if m.s.ClusterVersion() != nil { 453 break 454 } 455 time.Sleep(tickDuration) 456 } 457 } 458} 459 460func (c *cluster) name(i int) string { 461 return fmt.Sprint(i) 462} 463 464// isMembersEqual checks whether two members equal except ID field. 465// The given wmembs should always set ID field to empty string. 466func isMembersEqual(membs []client.Member, wmembs []client.Member) bool { 467 sort.Sort(SortableMemberSliceByPeerURLs(membs)) 468 sort.Sort(SortableMemberSliceByPeerURLs(wmembs)) 469 for i := range membs { 470 membs[i].ID = "" 471 } 472 return reflect.DeepEqual(membs, wmembs) 473} 474 475func newLocalListener(t *testing.T) net.Listener { 476 c := atomic.AddInt64(&localListenCount, 1) 477 // Go 1.8+ allows only numbers in port 478 addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid()) 479 return NewListenerWithAddr(t, addr) 480} 481 482func NewListenerWithAddr(t *testing.T, addr string) net.Listener { 483 l, err := transport.NewUnixListener(addr) 484 if err != nil { 485 t.Fatal(err) 486 } 487 return l 488} 489 490type member struct { 491 etcdserver.ServerConfig 492 PeerListeners, ClientListeners []net.Listener 493 grpcListener net.Listener 494 // PeerTLSInfo enables peer TLS when set 495 PeerTLSInfo *transport.TLSInfo 496 // ClientTLSInfo enables client TLS when set 497 ClientTLSInfo *transport.TLSInfo 498 499 raftHandler *testutil.PauseableHandler 500 s *etcdserver.EtcdServer 501 serverClosers []func() 502 503 grpcServerOpts []grpc.ServerOption 504 grpcServer *grpc.Server 505 grpcServerPeer *grpc.Server 506 grpcAddr string 507 grpcBridge *bridge 508 509 // serverClient is a clientv3 that directly calls the etcdserver. 510 serverClient *clientv3.Client 511 512 keepDataDirTerminate bool 513 clientMaxCallSendMsgSize int 514 clientMaxCallRecvMsgSize int 515} 516 517func (m *member) GRPCAddr() string { return m.grpcAddr } 518 519type memberConfig struct { 520 name string 521 peerTLS *transport.TLSInfo 522 clientTLS *transport.TLSInfo 523 quotaBackendBytes int64 524 maxTxnOps uint 525 maxRequestBytes uint 526 grpcKeepAliveMinTime time.Duration 527 grpcKeepAliveInterval time.Duration 528 grpcKeepAliveTimeout time.Duration 529 clientMaxCallSendMsgSize int 530 clientMaxCallRecvMsgSize int 531} 532 533// mustNewMember return an inited member with the given name. If peerTLS is 534// set, it will use https scheme to communicate between peers. 535func mustNewMember(t *testing.T, mcfg memberConfig) *member { 536 var err error 537 m := &member{} 538 539 peerScheme := schemeFromTLSInfo(mcfg.peerTLS) 540 clientScheme := schemeFromTLSInfo(mcfg.clientTLS) 541 542 pln := newLocalListener(t) 543 m.PeerListeners = []net.Listener{pln} 544 m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()}) 545 if err != nil { 546 t.Fatal(err) 547 } 548 m.PeerTLSInfo = mcfg.peerTLS 549 550 cln := newLocalListener(t) 551 m.ClientListeners = []net.Listener{cln} 552 m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()}) 553 if err != nil { 554 t.Fatal(err) 555 } 556 m.ClientTLSInfo = mcfg.clientTLS 557 558 m.Name = mcfg.name 559 560 m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd") 561 if err != nil { 562 t.Fatal(err) 563 } 564 clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String()) 565 m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) 566 if err != nil { 567 t.Fatal(err) 568 } 569 m.InitialClusterToken = clusterName 570 m.NewCluster = true 571 m.BootstrapTimeout = 10 * time.Millisecond 572 if m.PeerTLSInfo != nil { 573 m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo 574 } 575 m.ElectionTicks = electionTicks 576 m.InitialElectionTickAdvance = true 577 m.TickMs = uint(tickDuration / time.Millisecond) 578 m.QuotaBackendBytes = mcfg.quotaBackendBytes 579 m.MaxTxnOps = mcfg.maxTxnOps 580 if m.MaxTxnOps == 0 { 581 m.MaxTxnOps = embed.DefaultMaxTxnOps 582 } 583 m.MaxRequestBytes = mcfg.maxRequestBytes 584 if m.MaxRequestBytes == 0 { 585 m.MaxRequestBytes = embed.DefaultMaxRequestBytes 586 } 587 m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough 588 589 m.grpcServerOpts = []grpc.ServerOption{} 590 if mcfg.grpcKeepAliveMinTime > time.Duration(0) { 591 m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ 592 MinTime: mcfg.grpcKeepAliveMinTime, 593 PermitWithoutStream: false, 594 })) 595 } 596 if mcfg.grpcKeepAliveInterval > time.Duration(0) && 597 mcfg.grpcKeepAliveTimeout > time.Duration(0) { 598 m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{ 599 Time: mcfg.grpcKeepAliveInterval, 600 Timeout: mcfg.grpcKeepAliveTimeout, 601 })) 602 } 603 m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize 604 m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize 605 606 m.InitialCorruptCheck = true 607 608 return m 609} 610 611// listenGRPC starts a grpc server over a unix domain socket on the member 612func (m *member) listenGRPC() error { 613 // prefix with localhost so cert has right domain 614 m.grpcAddr = "localhost:" + m.Name 615 l, err := transport.NewUnixListener(m.grpcAddr) 616 if err != nil { 617 return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err) 618 } 619 m.grpcBridge, err = newBridge(m.grpcAddr) 620 if err != nil { 621 l.Close() 622 return err 623 } 624 m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr 625 m.grpcListener = l 626 return nil 627} 628 629func (m *member) ElectionTimeout() time.Duration { 630 return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond 631} 632 633func (m *member) ID() types.ID { return m.s.ID() } 634 635func (m *member) DropConnections() { m.grpcBridge.Reset() } 636func (m *member) PauseConnections() { m.grpcBridge.Pause() } 637func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() } 638func (m *member) Blackhole() { m.grpcBridge.Blackhole() } 639func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() } 640 641// NewClientV3 creates a new grpc client connection to the member 642func NewClientV3(m *member) (*clientv3.Client, error) { 643 if m.grpcAddr == "" { 644 return nil, fmt.Errorf("member not configured for grpc") 645 } 646 647 cfg := clientv3.Config{ 648 Endpoints: []string{m.grpcAddr}, 649 DialTimeout: 5 * time.Second, 650 DialOptions: []grpc.DialOption{grpc.WithBlock()}, 651 MaxCallSendMsgSize: m.clientMaxCallSendMsgSize, 652 MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize, 653 } 654 655 if m.ClientTLSInfo != nil { 656 tls, err := m.ClientTLSInfo.ClientConfig() 657 if err != nil { 658 return nil, err 659 } 660 cfg.TLS = tls 661 } 662 return newClientV3(cfg) 663} 664 665// Clone returns a member with the same server configuration. The returned 666// member will not set PeerListeners and ClientListeners. 667func (m *member) Clone(t *testing.T) *member { 668 mm := &member{} 669 mm.ServerConfig = m.ServerConfig 670 671 var err error 672 clientURLStrs := m.ClientURLs.StringSlice() 673 mm.ClientURLs, err = types.NewURLs(clientURLStrs) 674 if err != nil { 675 // this should never fail 676 panic(err) 677 } 678 peerURLStrs := m.PeerURLs.StringSlice() 679 mm.PeerURLs, err = types.NewURLs(peerURLStrs) 680 if err != nil { 681 // this should never fail 682 panic(err) 683 } 684 clusterStr := m.InitialPeerURLsMap.String() 685 mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr) 686 if err != nil { 687 // this should never fail 688 panic(err) 689 } 690 mm.InitialClusterToken = m.InitialClusterToken 691 mm.ElectionTicks = m.ElectionTicks 692 mm.PeerTLSInfo = m.PeerTLSInfo 693 mm.ClientTLSInfo = m.ClientTLSInfo 694 return mm 695} 696 697// Launch starts a member based on ServerConfig, PeerListeners 698// and ClientListeners. 699func (m *member) Launch() error { 700 plog.Printf("launching %s (%s)", m.Name, m.grpcAddr) 701 var err error 702 if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil { 703 return fmt.Errorf("failed to initialize the etcd server: %v", err) 704 } 705 m.s.SyncTicker = time.NewTicker(500 * time.Millisecond) 706 m.s.Start() 707 708 var peerTLScfg *tls.Config 709 if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() { 710 if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil { 711 return err 712 } 713 } 714 715 if m.grpcListener != nil { 716 var ( 717 tlscfg *tls.Config 718 ) 719 if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() { 720 tlscfg, err = m.ClientTLSInfo.ServerConfig() 721 if err != nil { 722 return err 723 } 724 } 725 m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...) 726 m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg) 727 m.serverClient = v3client.New(m.s) 728 lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient)) 729 epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient)) 730 go m.grpcServer.Serve(m.grpcListener) 731 } 732 733 m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)} 734 735 h := (http.Handler)(m.raftHandler) 736 if m.grpcListener != nil { 737 h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 738 if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { 739 m.grpcServerPeer.ServeHTTP(w, r) 740 } else { 741 m.raftHandler.ServeHTTP(w, r) 742 } 743 }) 744 } 745 746 for _, ln := range m.PeerListeners { 747 cm := cmux.New(ln) 748 // don't hang on matcher after closing listener 749 cm.SetReadTimeout(time.Second) 750 751 if m.grpcServer != nil { 752 grpcl := cm.Match(cmux.HTTP2()) 753 go m.grpcServerPeer.Serve(grpcl) 754 } 755 756 // serve http1/http2 rafthttp/grpc 757 ll := cm.Match(cmux.Any()) 758 if peerTLScfg != nil { 759 if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil { 760 return err 761 } 762 } 763 hs := &httptest.Server{ 764 Listener: ll, 765 Config: &http.Server{Handler: h, TLSConfig: peerTLScfg}, 766 TLS: peerTLScfg, 767 } 768 hs.Start() 769 770 donec := make(chan struct{}) 771 go func() { 772 defer close(donec) 773 cm.Serve() 774 }() 775 closer := func() { 776 ll.Close() 777 hs.CloseClientConnections() 778 hs.Close() 779 <-donec 780 } 781 m.serverClosers = append(m.serverClosers, closer) 782 } 783 for _, ln := range m.ClientListeners { 784 hs := &httptest.Server{ 785 Listener: ln, 786 Config: &http.Server{Handler: v2http.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())}, 787 } 788 if m.ClientTLSInfo == nil { 789 hs.Start() 790 } else { 791 info := m.ClientTLSInfo 792 hs.TLS, err = info.ServerConfig() 793 if err != nil { 794 return err 795 } 796 797 // baseConfig is called on initial TLS handshake start. 798 // 799 // Previously, 800 // 1. Server has non-empty (*tls.Config).Certificates on client hello 801 // 2. Server calls (*tls.Config).GetCertificate iff: 802 // - Server's (*tls.Config).Certificates is not empty, or 803 // - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName 804 // 805 // When (*tls.Config).Certificates is always populated on initial handshake, 806 // client is expected to provide a valid matching SNI to pass the TLS 807 // verification, thus trigger server (*tls.Config).GetCertificate to reload 808 // TLS assets. However, a cert whose SAN field does not include domain names 809 // but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus 810 // it was never able to trigger TLS reload on initial handshake; first 811 // ceritifcate object was being used, never being updated. 812 // 813 // Now, (*tls.Config).Certificates is created empty on initial TLS client 814 // handshake, in order to trigger (*tls.Config).GetCertificate and populate 815 // rest of the certificates on every new TLS connection, even when client 816 // SNI is empty (e.g. cert only includes IPs). 817 // 818 // This introduces another problem with "httptest.Server": 819 // when server initial certificates are empty, certificates 820 // are overwritten by Go's internal test certs, which have 821 // different SAN fields (e.g. example.com). To work around, 822 // re-overwrite (*tls.Config).Certificates before starting 823 // test server. 824 tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil) 825 if err != nil { 826 return err 827 } 828 hs.TLS.Certificates = []tls.Certificate{*tlsCert} 829 830 hs.StartTLS() 831 } 832 closer := func() { 833 ln.Close() 834 hs.CloseClientConnections() 835 hs.Close() 836 } 837 m.serverClosers = append(m.serverClosers, closer) 838 } 839 840 plog.Printf("launched %s (%s)", m.Name, m.grpcAddr) 841 return nil 842} 843 844func (m *member) WaitOK(t *testing.T) { 845 cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo) 846 kapi := client.NewKeysAPI(cc) 847 for { 848 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) 849 _, err := kapi.Get(ctx, "/", nil) 850 if err != nil { 851 time.Sleep(tickDuration) 852 continue 853 } 854 cancel() 855 break 856 } 857 for m.s.Leader() == 0 { 858 time.Sleep(tickDuration) 859 } 860} 861 862func (m *member) URL() string { return m.ClientURLs[0].String() } 863 864func (m *member) Pause() { 865 m.raftHandler.Pause() 866 m.s.PauseSending() 867} 868 869func (m *member) Resume() { 870 m.raftHandler.Resume() 871 m.s.ResumeSending() 872} 873 874// Close stops the member's etcdserver and closes its connections 875func (m *member) Close() { 876 if m.grpcBridge != nil { 877 m.grpcBridge.Close() 878 m.grpcBridge = nil 879 } 880 if m.serverClient != nil { 881 m.serverClient.Close() 882 m.serverClient = nil 883 } 884 if m.grpcServer != nil { 885 m.grpcServer.Stop() 886 m.grpcServer.GracefulStop() 887 m.grpcServer = nil 888 m.grpcServerPeer.Stop() 889 m.grpcServerPeer.GracefulStop() 890 m.grpcServerPeer = nil 891 } 892 m.s.HardStop() 893 for _, f := range m.serverClosers { 894 f() 895 } 896} 897 898// Stop stops the member, but the data dir of the member is preserved. 899func (m *member) Stop(t *testing.T) { 900 plog.Printf("stopping %s (%s)", m.Name, m.grpcAddr) 901 m.Close() 902 m.serverClosers = nil 903 plog.Printf("stopped %s (%s)", m.Name, m.grpcAddr) 904} 905 906// checkLeaderTransition waits for leader transition, returning the new leader ID. 907func checkLeaderTransition(t *testing.T, m *member, oldLead uint64) uint64 { 908 interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond 909 for m.s.Lead() == 0 || (m.s.Lead() == oldLead) { 910 time.Sleep(interval) 911 } 912 return m.s.Lead() 913} 914 915// StopNotify unblocks when a member stop completes 916func (m *member) StopNotify() <-chan struct{} { 917 return m.s.StopNotify() 918} 919 920// Restart starts the member using the preserved data dir. 921func (m *member) Restart(t *testing.T) error { 922 plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr) 923 newPeerListeners := make([]net.Listener, 0) 924 for _, ln := range m.PeerListeners { 925 newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String())) 926 } 927 m.PeerListeners = newPeerListeners 928 newClientListeners := make([]net.Listener, 0) 929 for _, ln := range m.ClientListeners { 930 newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String())) 931 } 932 m.ClientListeners = newClientListeners 933 934 if m.grpcListener != nil { 935 if err := m.listenGRPC(); err != nil { 936 t.Fatal(err) 937 } 938 } 939 940 err := m.Launch() 941 plog.Printf("restarted %s (%s)", m.Name, m.grpcAddr) 942 return err 943} 944 945// Terminate stops the member and removes the data dir. 946func (m *member) Terminate(t *testing.T) { 947 plog.Printf("terminating %s (%s)", m.Name, m.grpcAddr) 948 m.Close() 949 if !m.keepDataDirTerminate { 950 if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil { 951 t.Fatal(err) 952 } 953 } 954 plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr) 955} 956 957// Metric gets the metric value for a member 958func (m *member) Metric(metricName string) (string, error) { 959 cfgtls := transport.TLSInfo{} 960 tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second) 961 if err != nil { 962 return "", err 963 } 964 cli := &http.Client{Transport: tr} 965 resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics") 966 if err != nil { 967 return "", err 968 } 969 defer resp.Body.Close() 970 b, rerr := ioutil.ReadAll(resp.Body) 971 if rerr != nil { 972 return "", rerr 973 } 974 lines := strings.Split(string(b), "\n") 975 for _, l := range lines { 976 if strings.HasPrefix(l, metricName) { 977 return strings.Split(l, " ")[1], nil 978 } 979 } 980 return "", nil 981} 982 983// InjectPartition drops connections from m to others, vice versa. 984func (m *member) InjectPartition(t *testing.T, others ...*member) { 985 for _, other := range others { 986 m.s.CutPeer(other.s.ID()) 987 other.s.CutPeer(m.s.ID()) 988 } 989} 990 991// RecoverPartition recovers connections from m to others, vice versa. 992func (m *member) RecoverPartition(t *testing.T, others ...*member) { 993 for _, other := range others { 994 m.s.MendPeer(other.s.ID()) 995 other.s.MendPeer(m.s.ID()) 996 } 997} 998 999func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { 1000 cfgtls := transport.TLSInfo{} 1001 if tls != nil { 1002 cfgtls = *tls 1003 } 1004 cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps} 1005 c, err := client.New(cfg) 1006 if err != nil { 1007 t.Fatal(err) 1008 } 1009 return c 1010} 1011 1012func mustNewTransport(t *testing.T, tlsInfo transport.TLSInfo) *http.Transport { 1013 // tick in integration test is short, so 1s dial timeout could play well. 1014 tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) 1015 if err != nil { 1016 t.Fatal(err) 1017 } 1018 return tr 1019} 1020 1021type SortableMemberSliceByPeerURLs []client.Member 1022 1023func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) } 1024func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool { 1025 return p[i].PeerURLs[0] < p[j].PeerURLs[0] 1026} 1027func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } 1028 1029type ClusterV3 struct { 1030 *cluster 1031 1032 mu sync.Mutex 1033 clients []*clientv3.Client 1034} 1035 1036// NewClusterV3 returns a launched cluster with a grpc client connection 1037// for each cluster member. 1038func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 { 1039 cfg.UseGRPC = true 1040 if os.Getenv("CLIENT_DEBUG") != "" { 1041 clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4)) 1042 } 1043 clus := &ClusterV3{ 1044 cluster: NewClusterByConfig(t, cfg), 1045 } 1046 clus.Launch(t) 1047 1048 if !cfg.SkipCreatingClient { 1049 for _, m := range clus.Members { 1050 client, err := NewClientV3(m) 1051 if err != nil { 1052 t.Fatalf("cannot create client: %v", err) 1053 } 1054 clus.clients = append(clus.clients, client) 1055 } 1056 } 1057 1058 return clus 1059} 1060 1061func (c *ClusterV3) TakeClient(idx int) { 1062 c.mu.Lock() 1063 c.clients[idx] = nil 1064 c.mu.Unlock() 1065} 1066 1067func (c *ClusterV3) Terminate(t *testing.T) { 1068 c.mu.Lock() 1069 for _, client := range c.clients { 1070 if client == nil { 1071 continue 1072 } 1073 if err := client.Close(); err != nil { 1074 t.Error(err) 1075 } 1076 } 1077 c.mu.Unlock() 1078 c.cluster.Terminate(t) 1079} 1080 1081func (c *ClusterV3) RandClient() *clientv3.Client { 1082 return c.clients[rand.Intn(len(c.clients))] 1083} 1084 1085func (c *ClusterV3) Client(i int) *clientv3.Client { 1086 return c.clients[i] 1087} 1088 1089type grpcAPI struct { 1090 // Cluster is the cluster API for the client's connection. 1091 Cluster pb.ClusterClient 1092 // KV is the keyvalue API for the client's connection. 1093 KV pb.KVClient 1094 // Lease is the lease API for the client's connection. 1095 Lease pb.LeaseClient 1096 // Watch is the watch API for the client's connection. 1097 Watch pb.WatchClient 1098 // Maintenance is the maintenance API for the client's connection. 1099 Maintenance pb.MaintenanceClient 1100 // Auth is the authentication API for the client's connection. 1101 Auth pb.AuthClient 1102 // Lock is the lock API for the client's connection. 1103 Lock lockpb.LockClient 1104 // Election is the election API for the client's connection. 1105 Election epb.ElectionClient 1106} 1107