1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package integration
16
17import (
18	"context"
19	"crypto/tls"
20	"fmt"
21	"io/ioutil"
22	"log"
23	"math/rand"
24	"net"
25	"net/http"
26	"net/http/httptest"
27	"os"
28	"reflect"
29	"sort"
30	"strings"
31	"sync"
32	"sync/atomic"
33	"testing"
34	"time"
35
36	"go.etcd.io/etcd/client"
37	"go.etcd.io/etcd/clientv3"
38	"go.etcd.io/etcd/embed"
39	"go.etcd.io/etcd/etcdserver"
40	"go.etcd.io/etcd/etcdserver/api/etcdhttp"
41	"go.etcd.io/etcd/etcdserver/api/rafthttp"
42	"go.etcd.io/etcd/etcdserver/api/v2http"
43	"go.etcd.io/etcd/etcdserver/api/v3client"
44	"go.etcd.io/etcd/etcdserver/api/v3election"
45	epb "go.etcd.io/etcd/etcdserver/api/v3election/v3electionpb"
46	"go.etcd.io/etcd/etcdserver/api/v3lock"
47	lockpb "go.etcd.io/etcd/etcdserver/api/v3lock/v3lockpb"
48	"go.etcd.io/etcd/etcdserver/api/v3rpc"
49	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
50	"go.etcd.io/etcd/pkg/logutil"
51	"go.etcd.io/etcd/pkg/testutil"
52	"go.etcd.io/etcd/pkg/tlsutil"
53	"go.etcd.io/etcd/pkg/transport"
54	"go.etcd.io/etcd/pkg/types"
55
56	"github.com/soheilhy/cmux"
57	"go.uber.org/zap"
58	"golang.org/x/crypto/bcrypt"
59	"google.golang.org/grpc"
60	"google.golang.org/grpc/grpclog"
61	"google.golang.org/grpc/keepalive"
62)
63
// Timing, naming, and addressing constants shared by the cluster helpers in
// this file.
const (
	// RequestWaitTimeout is the time duration to wait for a request to go through or detect leader loss.
	RequestWaitTimeout = 3 * time.Second
	tickDuration       = 10 * time.Millisecond // member raft tick length; also the polling sleep interval
	requestTimeout     = 20 * time.Second      // per-request context timeout used by the helpers in this file

	clusterName  = "etcd"  // assigned to every member's InitialClusterToken
	basePort     = 21000   // offset added to the listener counter when building local addresses
	URLScheme    = "unix"  // URL scheme used when no TLS info is configured
	URLSchemeTLS = "unixs" // URL scheme used when TLS info is configured
)
75
var (
	// electionTicks is assigned to each new member's ElectionTicks.
	electionTicks = 10

	// integration test uses unique ports, counting up, to listen for each
	// member, ensuring restarted members can listen on the same port again.
	localListenCount = int64(0)

	// testTLSInfo points at the server key/cert and CA under ./fixtures and
	// requires client certificate authentication.
	testTLSInfo = transport.TLSInfo{
		KeyFile:        "./fixtures/server.key.insecure",
		CertFile:       "./fixtures/server.crt",
		TrustedCAFile:  "./fixtures/ca.crt",
		ClientCertAuth: true,
	}

	// testTLSInfoIP is like testTLSInfo but uses the "server-ip" key/cert pair.
	testTLSInfoIP = transport.TLSInfo{
		KeyFile:        "./fixtures/server-ip.key.insecure",
		CertFile:       "./fixtures/server-ip.crt",
		TrustedCAFile:  "./fixtures/ca.crt",
		ClientCertAuth: true,
	}

	// testTLSInfoExpired is like testTLSInfo but reads its files from
	// ./fixtures-expired.
	testTLSInfoExpired = transport.TLSInfo{
		KeyFile:        "./fixtures-expired/server.key.insecure",
		CertFile:       "./fixtures-expired/server.crt",
		TrustedCAFile:  "./fixtures-expired/ca.crt",
		ClientCertAuth: true,
	}

	// testTLSInfoExpiredIP is like testTLSInfoIP but reads its files from
	// ./fixtures-expired.
	testTLSInfoExpiredIP = transport.TLSInfo{
		KeyFile:        "./fixtures-expired/server-ip.key.insecure",
		CertFile:       "./fixtures-expired/server-ip.crt",
		TrustedCAFile:  "./fixtures-expired/ca.crt",
		ClientCertAuth: true,
	}

	// defaultTokenJWT is a JWT auth-token option string: RS256 signing with
	// the ./fixtures server cert/key and a one second TTL.
	defaultTokenJWT = "jwt,pub-key=./fixtures/server.crt,priv-key=./fixtures/server.key.insecure,sign-method=RS256,ttl=1s"

	// lg is the logger used by the helpers in this file; it starts as a no-op.
	lg = zap.NewNop()
)
115
116func init() {
117	if os.Getenv("CLUSTER_DEBUG") != "" {
118		lg, _ = zap.NewProduction()
119	}
120}
121
// ClusterConfig describes how a test cluster and each of its members are
// set up.
type ClusterConfig struct {
	// Size is the number of members created for the cluster. PeerTLS and
	// ClientTLS, when non-nil, are handed to every member as its peer and
	// client TLS settings.
	Size      int
	PeerTLS   *transport.TLSInfo
	ClientTLS *transport.TLSInfo

	// DiscoveryURL, when non-empty, is copied to each member and makes the
	// cluster skip the static initial-cluster wiring.
	DiscoveryURL string

	// AuthToken, when non-empty, replaces the members' default "simple"
	// auth token.
	AuthToken string

	// UseGRPC makes each new member open a gRPC listener.
	UseGRPC bool

	// QuotaBackendBytes is copied to each member's server configuration.
	QuotaBackendBytes int64

	// Server limits copied to each member; a zero value is replaced by the
	// corresponding default when the member is created.
	MaxTxnOps              uint
	MaxRequestBytes        uint
	SnapshotCount          uint64
	SnapshotCatchUpEntries uint64

	// gRPC server keepalive settings. A positive MinTime installs an
	// enforcement policy; Interval and Timeout are applied only when both
	// are positive.
	GRPCKeepAliveMinTime  time.Duration
	GRPCKeepAliveInterval time.Duration
	GRPCKeepAliveTimeout  time.Duration

	// SkipCreatingClient to skip creating clients for each member.
	SkipCreatingClient bool

	// Message size limits passed to the clientv3 configuration built for a
	// member.
	ClientMaxCallSendMsgSize int
	ClientMaxCallRecvMsgSize int

	// UseIP is true to use only IP for gRPC requests.
	UseIP bool

	// Lease checkpoint settings copied to each member's server configuration.
	EnableLeaseCheckpoint   bool
	LeaseCheckpointInterval time.Duration
}
156
// cluster is a group of test members together with the configuration they
// were created from.
type cluster struct {
	cfg     *ClusterConfig // configuration used when creating members
	Members []*member      // members currently tracked by the cluster
}
161
162func schemeFromTLSInfo(tls *transport.TLSInfo) string {
163	if tls == nil {
164		return URLScheme
165	}
166	return URLSchemeTLS
167}
168
169func (c *cluster) fillClusterForMembers() error {
170	if c.cfg.DiscoveryURL != "" {
171		// cluster will be discovered
172		return nil
173	}
174
175	addrs := make([]string, 0)
176	for _, m := range c.Members {
177		scheme := schemeFromTLSInfo(m.PeerTLSInfo)
178		for _, l := range m.PeerListeners {
179			addrs = append(addrs, fmt.Sprintf("%s=%s://%s", m.Name, scheme, l.Addr().String()))
180		}
181	}
182	clusterStr := strings.Join(addrs, ",")
183	var err error
184	for _, m := range c.Members {
185		m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
186		if err != nil {
187			return err
188		}
189	}
190	return nil
191}
192
193func newCluster(t testing.TB, cfg *ClusterConfig) *cluster {
194	c := &cluster{cfg: cfg}
195	ms := make([]*member, cfg.Size)
196	for i := 0; i < cfg.Size; i++ {
197		ms[i] = c.mustNewMember(t)
198	}
199	c.Members = ms
200	if err := c.fillClusterForMembers(); err != nil {
201		t.Fatal(err)
202	}
203
204	return c
205}
206
207// NewCluster returns an unlaunched cluster of the given size which has been
208// set to use static bootstrap.
209func NewCluster(t testing.TB, size int) *cluster {
210	return newCluster(t, &ClusterConfig{Size: size})
211}
212
213// NewClusterByConfig returns an unlaunched cluster defined by a cluster configuration
214func NewClusterByConfig(t testing.TB, cfg *ClusterConfig) *cluster {
215	return newCluster(t, cfg)
216}
217
218func (c *cluster) Launch(t testing.TB) {
219	errc := make(chan error)
220	for _, m := range c.Members {
221		// Members are launched in separate goroutines because if they boot
222		// using discovery url, they have to wait for others to register to continue.
223		go func(m *member) {
224			errc <- m.Launch()
225		}(m)
226	}
227	for range c.Members {
228		if err := <-errc; err != nil {
229			t.Fatalf("error setting up member: %v", err)
230		}
231	}
232	// wait cluster to be stable to receive future client requests
233	c.waitMembersMatch(t, c.HTTPMembers())
234	c.waitVersion()
235}
236
237func (c *cluster) URL(i int) string {
238	return c.Members[i].ClientURLs[0].String()
239}
240
241// URLs returns a list of all active client URLs in the cluster
242func (c *cluster) URLs() []string {
243	return getMembersURLs(c.Members)
244}
245
246func getMembersURLs(members []*member) []string {
247	urls := make([]string, 0)
248	for _, m := range members {
249		select {
250		case <-m.s.StopNotify():
251			continue
252		default:
253		}
254		for _, u := range m.ClientURLs {
255			urls = append(urls, u.String())
256		}
257	}
258	return urls
259}
260
261// HTTPMembers returns a list of all active members as client.Members
262func (c *cluster) HTTPMembers() []client.Member {
263	ms := []client.Member{}
264	for _, m := range c.Members {
265		pScheme := schemeFromTLSInfo(m.PeerTLSInfo)
266		cScheme := schemeFromTLSInfo(m.ClientTLSInfo)
267		cm := client.Member{Name: m.Name}
268		for _, ln := range m.PeerListeners {
269			cm.PeerURLs = append(cm.PeerURLs, pScheme+"://"+ln.Addr().String())
270		}
271		for _, ln := range m.ClientListeners {
272			cm.ClientURLs = append(cm.ClientURLs, cScheme+"://"+ln.Addr().String())
273		}
274		ms = append(ms, cm)
275	}
276	return ms
277}
278
279func (c *cluster) mustNewMember(t testing.TB) *member {
280	m := mustNewMember(t,
281		memberConfig{
282			name:                     c.name(rand.Int()),
283			authToken:                c.cfg.AuthToken,
284			peerTLS:                  c.cfg.PeerTLS,
285			clientTLS:                c.cfg.ClientTLS,
286			quotaBackendBytes:        c.cfg.QuotaBackendBytes,
287			maxTxnOps:                c.cfg.MaxTxnOps,
288			maxRequestBytes:          c.cfg.MaxRequestBytes,
289			snapshotCount:            c.cfg.SnapshotCount,
290			snapshotCatchUpEntries:   c.cfg.SnapshotCatchUpEntries,
291			grpcKeepAliveMinTime:     c.cfg.GRPCKeepAliveMinTime,
292			grpcKeepAliveInterval:    c.cfg.GRPCKeepAliveInterval,
293			grpcKeepAliveTimeout:     c.cfg.GRPCKeepAliveTimeout,
294			clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize,
295			clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
296			useIP:                    c.cfg.UseIP,
297			enableLeaseCheckpoint:    c.cfg.EnableLeaseCheckpoint,
298			leaseCheckpointInterval:  c.cfg.LeaseCheckpointInterval,
299		})
300	m.DiscoveryURL = c.cfg.DiscoveryURL
301	if c.cfg.UseGRPC {
302		if err := m.listenGRPC(); err != nil {
303			t.Fatal(err)
304		}
305	}
306	return m
307}
308
309func (c *cluster) addMember(t testing.TB) {
310	m := c.mustNewMember(t)
311
312	scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
313
314	// send add request to the cluster
315	var err error
316	for i := 0; i < len(c.Members); i++ {
317		clientURL := c.URL(i)
318		peerURL := scheme + "://" + m.PeerListeners[0].Addr().String()
319		if err = c.addMemberByURL(t, clientURL, peerURL); err == nil {
320			break
321		}
322	}
323	if err != nil {
324		t.Fatalf("add member failed on all members error: %v", err)
325	}
326
327	m.InitialPeerURLsMap = types.URLsMap{}
328	for _, mm := range c.Members {
329		m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
330	}
331	m.InitialPeerURLsMap[m.Name] = m.PeerURLs
332	m.NewCluster = false
333	if err := m.Launch(); err != nil {
334		t.Fatal(err)
335	}
336	c.Members = append(c.Members, m)
337	// wait cluster to be stable to receive future client requests
338	c.waitMembersMatch(t, c.HTTPMembers())
339}
340
341func (c *cluster) addMemberByURL(t testing.TB, clientURL, peerURL string) error {
342	cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
343	ma := client.NewMembersAPI(cc)
344	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
345	_, err := ma.Add(ctx, peerURL)
346	cancel()
347	if err != nil {
348		return err
349	}
350
351	// wait for the add node entry applied in the cluster
352	members := append(c.HTTPMembers(), client.Member{PeerURLs: []string{peerURL}, ClientURLs: []string{}})
353	c.waitMembersMatch(t, members)
354	return nil
355}
356
357func (c *cluster) AddMember(t testing.TB) {
358	c.addMember(t)
359}
360
361func (c *cluster) RemoveMember(t testing.TB, id uint64) {
362	if err := c.removeMember(t, id); err != nil {
363		t.Fatal(err)
364	}
365}
366
367func (c *cluster) removeMember(t testing.TB, id uint64) error {
368	// send remove request to the cluster
369	cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
370	ma := client.NewMembersAPI(cc)
371	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
372	err := ma.Remove(ctx, types.ID(id).String())
373	cancel()
374	if err != nil {
375		return err
376	}
377	newMembers := make([]*member, 0)
378	for _, m := range c.Members {
379		if uint64(m.s.ID()) != id {
380			newMembers = append(newMembers, m)
381		} else {
382			select {
383			case <-m.s.StopNotify():
384				m.Terminate(t)
385			// 1s stop delay + election timeout + 1s disk and network delay + connection write timeout
386			// TODO: remove connection write timeout by selecting on http response closeNotifier
387			// blocking on https://github.com/golang/go/issues/9524
388			case <-time.After(time.Second + time.Duration(electionTicks)*tickDuration + time.Second + rafthttp.ConnWriteTimeout):
389				t.Fatalf("failed to remove member %s in time", m.s.ID())
390			}
391		}
392	}
393	c.Members = newMembers
394	c.waitMembersMatch(t, c.HTTPMembers())
395	return nil
396}
397
398func (c *cluster) Terminate(t testing.TB) {
399	var wg sync.WaitGroup
400	wg.Add(len(c.Members))
401	for _, m := range c.Members {
402		go func(mm *member) {
403			defer wg.Done()
404			mm.Terminate(t)
405		}(m)
406	}
407	wg.Wait()
408}
409
410func (c *cluster) waitMembersMatch(t testing.TB, membs []client.Member) {
411	for _, u := range c.URLs() {
412		cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
413		ma := client.NewMembersAPI(cc)
414		for {
415			ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
416			ms, err := ma.List(ctx)
417			cancel()
418			if err == nil && isMembersEqual(ms, membs) {
419				break
420			}
421			time.Sleep(tickDuration)
422		}
423	}
424}
425
426func (c *cluster) WaitLeader(t testing.TB) int { return c.waitLeader(t, c.Members) }
427
// waitLeader waits until given members agree on the same leader.
// It returns the index of the leader within membs, or -1 if the agreed
// leader ID does not belong to any entry of membs when the final scan runs.
func (c *cluster) waitLeader(t testing.TB, membs []*member) int {
	// IDs of the given members; only one of these is accepted as leader.
	possibleLead := make(map[uint64]bool)
	var lead uint64
	for _, m := range membs {
		possibleLead[uint64(m.s.ID())] = true
	}
	// NOTE(review): the HTTP client is built with nil TLS info rather than
	// c.cfg.ClientTLS — confirm this is intended for TLS-enabled clusters.
	cc := MustNewHTTPClient(t, getMembersURLs(membs), nil)
	kapi := client.NewKeysAPI(cc)

	// ensure leader is up via linearizable get
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second)
		_, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true})
		cancel()
		// a "Key not found" response still shows the quorum read was served
		if err == nil || strings.Contains(err.Error(), "Key not found") {
			break
		}
	}

	// Scan the members repeatedly until every non-stopped member reports the
	// same non-zero leader and that leader is one of membs.
	for lead == 0 || !possibleLead[lead] {
		lead = 0
		for _, m := range membs {
			// skip members whose server has already stopped
			select {
			case <-m.s.StopNotify():
				continue
			default:
			}
			// disagreement with an earlier member: back off and rescan
			if lead != 0 && lead != m.s.Lead() {
				lead = 0
				time.Sleep(10 * tickDuration)
				break
			}
			lead = m.s.Lead()
		}
	}

	// translate the leader ID into an index of membs
	for i, m := range membs {
		if uint64(m.s.ID()) == lead {
			return i
		}
	}

	return -1
}
473
474func (c *cluster) WaitNoLeader() { c.waitNoLeader(c.Members) }
475
476// waitNoLeader waits until given members lose leader.
477func (c *cluster) waitNoLeader(membs []*member) {
478	noLeader := false
479	for !noLeader {
480		noLeader = true
481		for _, m := range membs {
482			select {
483			case <-m.s.StopNotify():
484				continue
485			default:
486			}
487			if m.s.Lead() != 0 {
488				noLeader = false
489				time.Sleep(10 * tickDuration)
490				break
491			}
492		}
493	}
494}
495
496func (c *cluster) waitVersion() {
497	for _, m := range c.Members {
498		for {
499			if m.s.ClusterVersion() != nil {
500				break
501			}
502			time.Sleep(tickDuration)
503		}
504	}
505}
506
507func (c *cluster) name(i int) string {
508	return fmt.Sprint(i)
509}
510
511// isMembersEqual checks whether two members equal except ID field.
512// The given wmembs should always set ID field to empty string.
513func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
514	sort.Sort(SortableMemberSliceByPeerURLs(membs))
515	sort.Sort(SortableMemberSliceByPeerURLs(wmembs))
516	for i := range membs {
517		membs[i].ID = ""
518	}
519	return reflect.DeepEqual(membs, wmembs)
520}
521
522func newLocalListener(t testing.TB) net.Listener {
523	c := atomic.AddInt64(&localListenCount, 1)
524	// Go 1.8+ allows only numbers in port
525	addr := fmt.Sprintf("127.0.0.1:%05d%05d", c+basePort, os.Getpid())
526	return NewListenerWithAddr(t, addr)
527}
528
529func NewListenerWithAddr(t testing.TB, addr string) net.Listener {
530	l, err := transport.NewUnixListener(addr)
531	if err != nil {
532		t.Fatal(err)
533	}
534	return l
535}
536
// member is a single etcd server under test together with its listeners,
// gRPC servers, and the bookkeeping needed to stop it.
type member struct {
	// ServerConfig is the configuration handed to etcdserver.NewServer when
	// the member is launched.
	etcdserver.ServerConfig
	// Listeners served on launch; grpcListener is set only once listenGRPC
	// has been called.
	PeerListeners, ClientListeners []net.Listener
	grpcListener                   net.Listener
	// PeerTLSInfo enables peer TLS when set
	PeerTLSInfo *transport.TLSInfo
	// ClientTLSInfo enables client TLS when set
	ClientTLSInfo *transport.TLSInfo
	DialOptions   []grpc.DialOption

	// Runtime state filled in on launch: the pauseable peer handler, the
	// running server, and the functions that shut down the HTTP servers.
	raftHandler   *testutil.PauseableHandler
	s             *etcdserver.EtcdServer
	serverClosers []func()

	// gRPC state: server options derived from the member config, the client
	// and peer gRPC servers, the address clients dial, and the bridge created
	// alongside the gRPC listener.
	grpcServerOpts []grpc.ServerOption
	grpcServer     *grpc.Server
	grpcServerPeer *grpc.Server
	grpcAddr       string
	grpcBridge     *bridge

	// serverClient is a clientv3 that directly calls the etcdserver.
	serverClient *clientv3.Client

	// The message size limits and useIP are copied from the member config.
	// NOTE(review): keepDataDirTerminate presumably preserves the data dir
	// on Terminate — confirm against Terminate.
	keepDataDirTerminate     bool
	clientMaxCallSendMsgSize int
	clientMaxCallRecvMsgSize int
	useIP                    bool

	// NOTE(review): presumably marks the member as a raft learner — its use
	// is not visible in this part of the file.
	isLearner bool
}
567
568func (m *member) GRPCAddr() string { return m.grpcAddr }
569
// memberConfig carries the per-member settings consumed by mustNewMember.
// Zero values for maxTxnOps, maxRequestBytes, snapshotCount and
// snapshotCatchUpEntries select the corresponding defaults, and an empty
// authToken selects the "simple" token.
type memberConfig struct {
	name                     string
	peerTLS                  *transport.TLSInfo
	clientTLS                *transport.TLSInfo
	authToken                string
	quotaBackendBytes        int64
	maxTxnOps                uint
	maxRequestBytes          uint
	snapshotCount            uint64
	snapshotCatchUpEntries   uint64
	grpcKeepAliveMinTime     time.Duration
	grpcKeepAliveInterval    time.Duration
	grpcKeepAliveTimeout     time.Duration
	clientMaxCallSendMsgSize int
	clientMaxCallRecvMsgSize int
	useIP                    bool
	enableLeaseCheckpoint    bool
	leaseCheckpointInterval  time.Duration
}
589
// mustNewMember returns an initialized, unlaunched member built from mcfg.
// It opens one peer and one client listener, creates a temporary data
// directory, and fills in the server configuration. The peer and client URL
// schemes are chosen by schemeFromTLSInfo, so setting peerTLS or clientTLS
// switches the corresponding URLs to URLSchemeTLS. Any failure fails the
// test through t.Fatal.
func mustNewMember(t testing.TB, mcfg memberConfig) *member {
	var err error
	m := &member{}

	peerScheme := schemeFromTLSInfo(mcfg.peerTLS)
	clientScheme := schemeFromTLSInfo(mcfg.clientTLS)

	// peer listener and the matching advertised peer URL
	pln := newLocalListener(t)
	m.PeerListeners = []net.Listener{pln}
	m.PeerURLs, err = types.NewURLs([]string{peerScheme + "://" + pln.Addr().String()})
	if err != nil {
		t.Fatal(err)
	}
	m.PeerTLSInfo = mcfg.peerTLS

	// client listener and the matching advertised client URL
	cln := newLocalListener(t)
	m.ClientListeners = []net.Listener{cln}
	m.ClientURLs, err = types.NewURLs([]string{clientScheme + "://" + cln.Addr().String()})
	if err != nil {
		t.Fatal(err)
	}
	m.ClientTLSInfo = mcfg.clientTLS

	m.Name = mcfg.name

	m.DataDir, err = ioutil.TempDir(os.TempDir(), "etcd")
	if err != nil {
		t.Fatal(err)
	}
	// start out as a single-member cluster made of this member only
	clusterStr := fmt.Sprintf("%s=%s://%s", mcfg.name, peerScheme, pln.Addr().String())
	m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
	if err != nil {
		t.Fatal(err)
	}
	m.InitialClusterToken = clusterName
	m.NewCluster = true
	m.BootstrapTimeout = 10 * time.Millisecond
	if m.PeerTLSInfo != nil {
		m.ServerConfig.PeerTLSInfo = *m.PeerTLSInfo
	}
	m.ElectionTicks = electionTicks
	m.InitialElectionTickAdvance = true
	m.TickMs = uint(tickDuration / time.Millisecond)
	m.QuotaBackendBytes = mcfg.quotaBackendBytes
	// zero-valued limits fall back to the package defaults
	m.MaxTxnOps = mcfg.maxTxnOps
	if m.MaxTxnOps == 0 {
		m.MaxTxnOps = embed.DefaultMaxTxnOps
	}
	m.MaxRequestBytes = mcfg.maxRequestBytes
	if m.MaxRequestBytes == 0 {
		m.MaxRequestBytes = embed.DefaultMaxRequestBytes
	}
	m.SnapshotCount = etcdserver.DefaultSnapshotCount
	if mcfg.snapshotCount != 0 {
		m.SnapshotCount = mcfg.snapshotCount
	}
	m.SnapshotCatchUpEntries = etcdserver.DefaultSnapshotCatchUpEntries
	if mcfg.snapshotCatchUpEntries != 0 {
		m.SnapshotCatchUpEntries = mcfg.snapshotCatchUpEntries
	}

	// for the purpose of integration testing, simple token is enough
	m.AuthToken = "simple"
	if mcfg.authToken != "" {
		m.AuthToken = mcfg.authToken
	}

	m.BcryptCost = uint(bcrypt.MinCost) // use min bcrypt cost to speed up integration testing

	// gRPC keepalive options are added only for positive durations; the
	// server parameters need both an interval and a timeout
	m.grpcServerOpts = []grpc.ServerOption{}
	if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             mcfg.grpcKeepAliveMinTime,
			PermitWithoutStream: false,
		}))
	}
	if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
		mcfg.grpcKeepAliveTimeout > time.Duration(0) {
		m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
			Time:    mcfg.grpcKeepAliveInterval,
			Timeout: mcfg.grpcKeepAliveTimeout,
		}))
	}
	m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize
	m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize
	m.useIP = mcfg.useIP
	m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
	m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval

	m.InitialCorruptCheck = true

	// server logs are discarded unless CLUSTER_DEBUG is set, in which case
	// they go to stderr
	lcfg := logutil.DefaultZapLoggerConfig
	m.LoggerConfig = &lcfg
	m.LoggerConfig.OutputPaths = []string{"/dev/null"}
	m.LoggerConfig.ErrorOutputPaths = []string{"/dev/null"}
	if os.Getenv("CLUSTER_DEBUG") != "" {
		m.LoggerConfig.OutputPaths = []string{"stderr"}
		m.LoggerConfig.ErrorOutputPaths = []string{"stderr"}
	}
	m.Logger, err = m.LoggerConfig.Build()
	if err != nil {
		t.Fatal(err)
	}
	return m
}
697
698// listenGRPC starts a grpc server over a unix domain socket on the member
699func (m *member) listenGRPC() error {
700	// prefix with localhost so cert has right domain
701	m.grpcAddr = "localhost:" + m.Name
702	if m.useIP { // for IP-only TLS certs
703		m.grpcAddr = "127.0.0.1:" + m.Name
704	}
705	l, err := transport.NewUnixListener(m.grpcAddr)
706	if err != nil {
707		return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcAddr, err)
708	}
709	m.grpcBridge, err = newBridge(m.grpcAddr)
710	if err != nil {
711		l.Close()
712		return err
713	}
714	m.grpcAddr = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + m.grpcBridge.inaddr
715	m.grpcListener = l
716	return nil
717}
718
719func (m *member) ElectionTimeout() time.Duration {
720	return time.Duration(m.s.Cfg.ElectionTicks*int(m.s.Cfg.TickMs)) * time.Millisecond
721}
722
723func (m *member) ID() types.ID { return m.s.ID() }
724
725func (m *member) DropConnections()    { m.grpcBridge.Reset() }
726func (m *member) PauseConnections()   { m.grpcBridge.Pause() }
727func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
728func (m *member) Blackhole()          { m.grpcBridge.Blackhole() }
729func (m *member) Unblackhole()        { m.grpcBridge.Unblackhole() }
730
731// NewClientV3 creates a new grpc client connection to the member
732func NewClientV3(m *member) (*clientv3.Client, error) {
733	if m.grpcAddr == "" {
734		return nil, fmt.Errorf("member not configured for grpc")
735	}
736
737	cfg := clientv3.Config{
738		Endpoints:          []string{m.grpcAddr},
739		DialTimeout:        5 * time.Second,
740		DialOptions:        []grpc.DialOption{grpc.WithBlock()},
741		MaxCallSendMsgSize: m.clientMaxCallSendMsgSize,
742		MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize,
743	}
744
745	if m.ClientTLSInfo != nil {
746		tls, err := m.ClientTLSInfo.ClientConfig()
747		if err != nil {
748			return nil, err
749		}
750		cfg.TLS = tls
751	}
752	if m.DialOptions != nil {
753		cfg.DialOptions = append(cfg.DialOptions, m.DialOptions...)
754	}
755	return newClientV3(cfg)
756}
757
758// Clone returns a member with the same server configuration. The returned
759// member will not set PeerListeners and ClientListeners.
760func (m *member) Clone(t testing.TB) *member {
761	mm := &member{}
762	mm.ServerConfig = m.ServerConfig
763
764	var err error
765	clientURLStrs := m.ClientURLs.StringSlice()
766	mm.ClientURLs, err = types.NewURLs(clientURLStrs)
767	if err != nil {
768		// this should never fail
769		panic(err)
770	}
771	peerURLStrs := m.PeerURLs.StringSlice()
772	mm.PeerURLs, err = types.NewURLs(peerURLStrs)
773	if err != nil {
774		// this should never fail
775		panic(err)
776	}
777	clusterStr := m.InitialPeerURLsMap.String()
778	mm.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
779	if err != nil {
780		// this should never fail
781		panic(err)
782	}
783	mm.InitialClusterToken = m.InitialClusterToken
784	mm.ElectionTicks = m.ElectionTicks
785	mm.PeerTLSInfo = m.PeerTLSInfo
786	mm.ClientTLSInfo = m.ClientTLSInfo
787	return mm
788}
789
790// Launch starts a member based on ServerConfig, PeerListeners
791// and ClientListeners.
792func (m *member) Launch() error {
793	lg.Info(
794		"launching a member",
795		zap.String("name", m.Name),
796		zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
797		zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
798		zap.String("grpc-address", m.grpcAddr),
799	)
800	var err error
801	if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
802		return fmt.Errorf("failed to initialize the etcd server: %v", err)
803	}
804	m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
805	m.s.Start()
806
807	var peerTLScfg *tls.Config
808	if m.PeerTLSInfo != nil && !m.PeerTLSInfo.Empty() {
809		if peerTLScfg, err = m.PeerTLSInfo.ServerConfig(); err != nil {
810			return err
811		}
812	}
813
814	if m.grpcListener != nil {
815		var (
816			tlscfg *tls.Config
817		)
818		if m.ClientTLSInfo != nil && !m.ClientTLSInfo.Empty() {
819			tlscfg, err = m.ClientTLSInfo.ServerConfig()
820			if err != nil {
821				return err
822			}
823		}
824		m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
825		m.grpcServerPeer = v3rpc.Server(m.s, peerTLScfg)
826		m.serverClient = v3client.New(m.s)
827		lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
828		epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
829		go m.grpcServer.Serve(m.grpcListener)
830	}
831
832	m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.Logger, m.s)}
833
834	h := (http.Handler)(m.raftHandler)
835	if m.grpcListener != nil {
836		h = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
837			if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
838				m.grpcServerPeer.ServeHTTP(w, r)
839			} else {
840				m.raftHandler.ServeHTTP(w, r)
841			}
842		})
843	}
844
845	for _, ln := range m.PeerListeners {
846		cm := cmux.New(ln)
847		// don't hang on matcher after closing listener
848		cm.SetReadTimeout(time.Second)
849
850		if m.grpcServer != nil {
851			grpcl := cm.Match(cmux.HTTP2())
852			go m.grpcServerPeer.Serve(grpcl)
853		}
854
855		// serve http1/http2 rafthttp/grpc
856		ll := cm.Match(cmux.Any())
857		if peerTLScfg != nil {
858			if ll, err = transport.NewTLSListener(ll, m.PeerTLSInfo); err != nil {
859				return err
860			}
861		}
862		hs := &httptest.Server{
863			Listener: ll,
864			Config: &http.Server{
865				Handler:   h,
866				TLSConfig: peerTLScfg,
867				ErrorLog:  log.New(ioutil.Discard, "net/http", 0),
868			},
869			TLS: peerTLScfg,
870		}
871		hs.Start()
872
873		donec := make(chan struct{})
874		go func() {
875			defer close(donec)
876			cm.Serve()
877		}()
878		closer := func() {
879			ll.Close()
880			hs.CloseClientConnections()
881			hs.Close()
882			<-donec
883		}
884		m.serverClosers = append(m.serverClosers, closer)
885	}
886	for _, ln := range m.ClientListeners {
887		hs := &httptest.Server{
888			Listener: ln,
889			Config: &http.Server{
890				Handler: v2http.NewClientHandler(
891					m.Logger,
892					m.s,
893					m.ServerConfig.ReqTimeout(),
894				),
895				ErrorLog: log.New(ioutil.Discard, "net/http", 0),
896			},
897		}
898		if m.ClientTLSInfo == nil {
899			hs.Start()
900		} else {
901			info := m.ClientTLSInfo
902			hs.TLS, err = info.ServerConfig()
903			if err != nil {
904				return err
905			}
906
907			// baseConfig is called on initial TLS handshake start.
908			//
909			// Previously,
910			// 1. Server has non-empty (*tls.Config).Certificates on client hello
911			// 2. Server calls (*tls.Config).GetCertificate iff:
912			//    - Server's (*tls.Config).Certificates is not empty, or
913			//    - Client supplies SNI; non-empty (*tls.ClientHelloInfo).ServerName
914			//
915			// When (*tls.Config).Certificates is always populated on initial handshake,
916			// client is expected to provide a valid matching SNI to pass the TLS
917			// verification, thus trigger server (*tls.Config).GetCertificate to reload
918			// TLS assets. However, a cert whose SAN field does not include domain names
919			// but only IP addresses, has empty (*tls.ClientHelloInfo).ServerName, thus
920			// it was never able to trigger TLS reload on initial handshake; first
921			// ceritifcate object was being used, never being updated.
922			//
923			// Now, (*tls.Config).Certificates is created empty on initial TLS client
924			// handshake, in order to trigger (*tls.Config).GetCertificate and populate
925			// rest of the certificates on every new TLS connection, even when client
926			// SNI is empty (e.g. cert only includes IPs).
927			//
928			// This introduces another problem with "httptest.Server":
929			// when server initial certificates are empty, certificates
930			// are overwritten by Go's internal test certs, which have
931			// different SAN fields (e.g. example.com). To work around,
932			// re-overwrite (*tls.Config).Certificates before starting
933			// test server.
934			tlsCert, err := tlsutil.NewCert(info.CertFile, info.KeyFile, nil)
935			if err != nil {
936				return err
937			}
938			hs.TLS.Certificates = []tls.Certificate{*tlsCert}
939
940			hs.StartTLS()
941		}
942		closer := func() {
943			ln.Close()
944			hs.CloseClientConnections()
945			hs.Close()
946		}
947		m.serverClosers = append(m.serverClosers, closer)
948	}
949
950	lg.Info(
951		"launched a member",
952		zap.String("name", m.Name),
953		zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
954		zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
955		zap.String("grpc-address", m.grpcAddr),
956	)
957	return nil
958}
959
960func (m *member) WaitOK(t testing.TB) {
961	m.WaitStarted(t)
962	for m.s.Leader() == 0 {
963		time.Sleep(tickDuration)
964	}
965}
966
967func (m *member) WaitStarted(t testing.TB) {
968	cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
969	kapi := client.NewKeysAPI(cc)
970	for {
971		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
972		_, err := kapi.Get(ctx, "/", nil)
973		if err != nil {
974			time.Sleep(tickDuration)
975			continue
976		}
977		cancel()
978		break
979	}
980}
981
982func WaitClientV3(t testing.TB, kv clientv3.KV) {
983	timeout := time.Now().Add(requestTimeout)
984	var err error
985	for time.Now().Before(timeout) {
986		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
987		_, err = kv.Get(ctx, "/")
988		cancel()
989		if err == nil {
990			return
991		}
992		time.Sleep(tickDuration)
993	}
994	if err != nil {
995		t.Fatalf("timed out waiting for client: %v", err)
996	}
997}
998
999func (m *member) URL() string { return m.ClientURLs[0].String() }
1000
// Pause pauses the member's peer (raft) HTTP handler and then tells the
// server to pause sending.
func (m *member) Pause() {
	m.raftHandler.Pause()
	m.s.PauseSending()
}
1005
// Resume undoes Pause: it resumes the member's peer (raft) HTTP handler and
// then tells the server to resume sending.
func (m *member) Resume() {
	m.raftHandler.Resume()
	m.s.ResumeSending()
}
1010
// Close stops the member's etcdserver and closes its connections
//
// The order is: gRPC bridge, in-process client, client gRPC server
// (graceful, forced after two seconds), peer gRPC server, the etcd server
// itself, and finally the closers registered for the HTTP servers.
func (m *member) Close() {
	if m.grpcBridge != nil {
		m.grpcBridge.Close()
		m.grpcBridge = nil
	}
	if m.serverClient != nil {
		m.serverClient.Close()
		m.serverClient = nil
	}
	if m.grpcServer != nil {
		// ch is closed once GracefulStop returns
		ch := make(chan struct{})
		go func() {
			defer close(ch)
			// close listeners to stop accepting new connections,
			// will block on any existing transports
			m.grpcServer.GracefulStop()
		}()
		// wait until all pending RPCs are finished
		select {
		case <-ch:
		case <-time.After(2 * time.Second):
			// took too long, manually close open transports
			// e.g. watch streams
			m.grpcServer.Stop()
			<-ch
		}
		m.grpcServer = nil
		m.grpcServerPeer.GracefulStop()
		m.grpcServerPeer.Stop()
		m.grpcServerPeer = nil
	}
	m.s.HardStop()
	// run the closers registered for the peer and client HTTP servers
	for _, f := range m.serverClosers {
		f()
	}
}
1048
1049// Stop stops the member, but the data dir of the member is preserved.
1050func (m *member) Stop(t testing.TB) {
1051	lg.Info(
1052		"stopping a member",
1053		zap.String("name", m.Name),
1054		zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
1055		zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
1056		zap.String("grpc-address", m.grpcAddr),
1057	)
1058	m.Close()
1059	m.serverClosers = nil
1060	lg.Info(
1061		"stopped a member",
1062		zap.String("name", m.Name),
1063		zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
1064		zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
1065		zap.String("grpc-address", m.grpcAddr),
1066	)
1067}
1068
1069// checkLeaderTransition waits for leader transition, returning the new leader ID.
1070func checkLeaderTransition(m *member, oldLead uint64) uint64 {
1071	interval := time.Duration(m.s.Cfg.TickMs) * time.Millisecond
1072	for m.s.Lead() == 0 || (m.s.Lead() == oldLead) {
1073		time.Sleep(interval)
1074	}
1075	return m.s.Lead()
1076}
1077
// StopNotify returns the channel provided by the underlying server's
// StopNotify; it unblocks when a member stop completes.
func (m *member) StopNotify() <-chan struct{} {
	return m.s.StopNotify()
}
1082
1083// Restart starts the member using the preserved data dir.
1084func (m *member) Restart(t testing.TB) error {
1085	lg.Info(
1086		"restarting a member",
1087		zap.String("name", m.Name),
1088		zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
1089		zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
1090		zap.String("grpc-address", m.grpcAddr),
1091	)
1092	newPeerListeners := make([]net.Listener, 0)
1093	for _, ln := range m.PeerListeners {
1094		newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
1095	}
1096	m.PeerListeners = newPeerListeners
1097	newClientListeners := make([]net.Listener, 0)
1098	for _, ln := range m.ClientListeners {
1099		newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
1100	}
1101	m.ClientListeners = newClientListeners
1102
1103	if m.grpcListener != nil {
1104		if err := m.listenGRPC(); err != nil {
1105			t.Fatal(err)
1106		}
1107	}
1108
1109	err := m.Launch()
1110	lg.Info(
1111		"restarted a member",
1112		zap.String("name", m.Name),
1113		zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
1114		zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
1115		zap.String("grpc-address", m.grpcAddr),
1116		zap.Error(err),
1117	)
1118	return err
1119}
1120
1121// Terminate stops the member and removes the data dir.
1122func (m *member) Terminate(t testing.TB) {
1123	lg.Info(
1124		"terminating a member",
1125		zap.String("name", m.Name),
1126		zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
1127		zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
1128		zap.String("grpc-address", m.grpcAddr),
1129	)
1130	m.Close()
1131	if !m.keepDataDirTerminate {
1132		if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
1133			t.Fatal(err)
1134		}
1135	}
1136	lg.Info(
1137		"terminated a member",
1138		zap.String("name", m.Name),
1139		zap.Strings("advertise-peer-urls", m.PeerURLs.StringSlice()),
1140		zap.Strings("listen-client-urls", m.ClientURLs.StringSlice()),
1141		zap.String("grpc-address", m.grpcAddr),
1142	)
1143}
1144
1145// Metric gets the metric value for a member
1146func (m *member) Metric(metricName string, expectLabels ...string) (string, error) {
1147	cfgtls := transport.TLSInfo{}
1148	tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second)
1149	if err != nil {
1150		return "", err
1151	}
1152	cli := &http.Client{Transport: tr}
1153	resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics")
1154	if err != nil {
1155		return "", err
1156	}
1157	defer resp.Body.Close()
1158	b, rerr := ioutil.ReadAll(resp.Body)
1159	if rerr != nil {
1160		return "", rerr
1161	}
1162	lines := strings.Split(string(b), "\n")
1163	for _, l := range lines {
1164		if !strings.HasPrefix(l, metricName) {
1165			continue
1166		}
1167		ok := true
1168		for _, lv := range expectLabels {
1169			if !strings.Contains(l, lv) {
1170				ok = false
1171				break
1172			}
1173		}
1174		if !ok {
1175			continue
1176		}
1177		return strings.Split(l, " ")[1], nil
1178	}
1179	return "", nil
1180}
1181
1182// InjectPartition drops connections from m to others, vice versa.
1183func (m *member) InjectPartition(t testing.TB, others ...*member) {
1184	for _, other := range others {
1185		m.s.CutPeer(other.s.ID())
1186		other.s.CutPeer(m.s.ID())
1187	}
1188}
1189
1190// RecoverPartition recovers connections from m to others, vice versa.
1191func (m *member) RecoverPartition(t testing.TB, others ...*member) {
1192	for _, other := range others {
1193		m.s.MendPeer(other.s.ID())
1194		other.s.MendPeer(m.s.ID())
1195	}
1196}
1197
// ReadyNotify returns the channel provided by the underlying server's
// ReadyNotify.
func (m *member) ReadyNotify() <-chan struct{} {
	return m.s.ReadyNotify()
}
1201
1202func MustNewHTTPClient(t testing.TB, eps []string, tls *transport.TLSInfo) client.Client {
1203	cfgtls := transport.TLSInfo{}
1204	if tls != nil {
1205		cfgtls = *tls
1206	}
1207	cfg := client.Config{Transport: mustNewTransport(t, cfgtls), Endpoints: eps}
1208	c, err := client.New(cfg)
1209	if err != nil {
1210		t.Fatal(err)
1211	}
1212	return c
1213}
1214
1215func mustNewTransport(t testing.TB, tlsInfo transport.TLSInfo) *http.Transport {
1216	// tick in integration test is short, so 1s dial timeout could play well.
1217	tr, err := transport.NewTimeoutTransport(tlsInfo, time.Second, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
1218	if err != nil {
1219		t.Fatal(err)
1220	}
1221	return tr
1222}
1223
1224type SortableMemberSliceByPeerURLs []client.Member
1225
1226func (p SortableMemberSliceByPeerURLs) Len() int { return len(p) }
1227func (p SortableMemberSliceByPeerURLs) Less(i, j int) bool {
1228	return p[i].PeerURLs[0] < p[j].PeerURLs[0]
1229}
1230func (p SortableMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
1231
// ClusterV3 embeds a cluster and additionally keeps the clientv3 clients
// that NewClusterV3 creates for the cluster's members.
type ClusterV3 struct {
	*cluster

	// mu is held by TakeClient and Terminate while they access clients.
	// clients has one entry per member when NewClusterV3 created clients;
	// TakeClient sets an entry to nil.
	mu      sync.Mutex
	clients []*clientv3.Client
}
1238
1239// NewClusterV3 returns a launched cluster with a grpc client connection
1240// for each cluster member.
1241func NewClusterV3(t testing.TB, cfg *ClusterConfig) *ClusterV3 {
1242	cfg.UseGRPC = true
1243	if os.Getenv("CLIENT_DEBUG") != "" {
1244		clientv3.SetLogger(grpclog.NewLoggerV2WithVerbosity(os.Stderr, os.Stderr, os.Stderr, 4))
1245	}
1246	clus := &ClusterV3{
1247		cluster: NewClusterByConfig(t, cfg),
1248	}
1249	clus.Launch(t)
1250
1251	if !cfg.SkipCreatingClient {
1252		for _, m := range clus.Members {
1253			client, err := NewClientV3(m)
1254			if err != nil {
1255				t.Fatalf("cannot create client: %v", err)
1256			}
1257			clus.clients = append(clus.clients, client)
1258		}
1259	}
1260
1261	return clus
1262}
1263
1264func (c *ClusterV3) TakeClient(idx int) {
1265	c.mu.Lock()
1266	c.clients[idx] = nil
1267	c.mu.Unlock()
1268}
1269
1270func (c *ClusterV3) Terminate(t testing.TB) {
1271	c.mu.Lock()
1272	for _, client := range c.clients {
1273		if client == nil {
1274			continue
1275		}
1276		if err := client.Close(); err != nil {
1277			t.Error(err)
1278		}
1279	}
1280	c.mu.Unlock()
1281	c.cluster.Terminate(t)
1282}
1283
1284func (c *ClusterV3) RandClient() *clientv3.Client {
1285	return c.clients[rand.Intn(len(c.clients))]
1286}
1287
// Client returns the client entry at index i. The client list is read
// without taking c.mu, and the entry is nil if TakeClient was called for i.
func (c *ClusterV3) Client(i int) *clientv3.Client {
	return c.clients[i]
}
1291
// grpcAPI groups the gRPC service clients for a client's connection, one
// field per service.
type grpcAPI struct {
	// Cluster is the cluster API for the client's connection.
	Cluster pb.ClusterClient
	// KV is the keyvalue API for the client's connection.
	KV pb.KVClient
	// Lease is the lease API for the client's connection.
	Lease pb.LeaseClient
	// Watch is the watch API for the client's connection.
	Watch pb.WatchClient
	// Maintenance is the maintenance API for the client's connection.
	Maintenance pb.MaintenanceClient
	// Auth is the authentication API for the client's connection.
	Auth pb.AuthClient
	// Lock is the lock API for the client's connection.
	Lock lockpb.LockClient
	// Election is the election API for the client's connection.
	Election epb.ElectionClient
}
1310
1311// GetLearnerMembers returns the list of learner members in cluster using MemberList API.
1312func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) {
1313	cli := c.Client(0)
1314	resp, err := cli.MemberList(context.Background())
1315	if err != nil {
1316		return nil, fmt.Errorf("failed to list member %v", err)
1317	}
1318	var learners []*pb.Member
1319	for _, m := range resp.Members {
1320		if m.IsLearner {
1321			learners = append(learners, m)
1322		}
1323	}
1324	return learners, nil
1325}
1326
1327// AddAndLaunchLearnerMember creates a leaner member, adds it to cluster
1328// via v3 MemberAdd API, and then launches the new member.
1329func (c *ClusterV3) AddAndLaunchLearnerMember(t testing.TB) {
1330	m := c.mustNewMember(t)
1331	m.isLearner = true
1332
1333	scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
1334	peerURLs := []string{scheme + "://" + m.PeerListeners[0].Addr().String()}
1335
1336	cli := c.Client(0)
1337	_, err := cli.MemberAddAsLearner(context.Background(), peerURLs)
1338	if err != nil {
1339		t.Fatalf("failed to add learner member %v", err)
1340	}
1341
1342	m.InitialPeerURLsMap = types.URLsMap{}
1343	for _, mm := range c.Members {
1344		m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
1345	}
1346	m.InitialPeerURLsMap[m.Name] = m.PeerURLs
1347	m.NewCluster = false
1348
1349	if err := m.Launch(); err != nil {
1350		t.Fatal(err)
1351	}
1352
1353	c.Members = append(c.Members, m)
1354
1355	c.waitMembersMatch(t)
1356}
1357
1358// getMembers returns a list of members in cluster, in format of etcdserverpb.Member
1359func (c *ClusterV3) getMembers() []*pb.Member {
1360	var mems []*pb.Member
1361	for _, m := range c.Members {
1362		mem := &pb.Member{
1363			Name:       m.Name,
1364			PeerURLs:   m.PeerURLs.StringSlice(),
1365			ClientURLs: m.ClientURLs.StringSlice(),
1366			IsLearner:  m.isLearner,
1367		}
1368		mems = append(mems, mem)
1369	}
1370	return mems
1371}
1372
1373// waitMembersMatch waits until v3rpc MemberList returns the 'same' members info as the
1374// local 'c.Members', which is the local recording of members in the testing cluster. With
1375// the exception that the local recording c.Members does not have info on Member.ID, which
1376// is generated when the member is been added to cluster.
1377//
1378// Note:
1379// A successful match means the Member.clientURLs are matched. This means member has already
1380// finished publishing its server attributes to cluster. Publishing attributes is a cluster-wide
1381// write request (in v2 server). Therefore, at this point, any raft log entries prior to this
1382// would have already been applied.
1383//
1384// If a new member was added to an existing cluster, at this point, it has finished publishing
1385// its own server attributes to the cluster. And therefore by the same argument, it has already
1386// applied the raft log entries (especially those of type raftpb.ConfChangeType). At this point,
1387// the new member has the correct view of the cluster configuration.
1388//
1389// Special note on learner member:
1390// Learner member is only added to a cluster via v3rpc MemberAdd API (as of v3.4). When starting
1391// the learner member, its initial view of the cluster created by peerURLs map does not have info
1392// on whether or not the new member itself is learner. But at this point, a successful match does
1393// indicate that the new learner member has applied the raftpb.ConfChangeAddLearnerNode entry
1394// which was used to add the learner itself to the cluster, and therefore it has the correct info
1395// on learner.
1396func (c *ClusterV3) waitMembersMatch(t testing.TB) {
1397	wMembers := c.getMembers()
1398	sort.Sort(SortableProtoMemberSliceByPeerURLs(wMembers))
1399	cli := c.Client(0)
1400	for {
1401		resp, err := cli.MemberList(context.Background())
1402		if err != nil {
1403			t.Fatalf("failed to list member %v", err)
1404		}
1405
1406		if len(resp.Members) != len(wMembers) {
1407			continue
1408		}
1409		sort.Sort(SortableProtoMemberSliceByPeerURLs(resp.Members))
1410		for _, m := range resp.Members {
1411			m.ID = 0
1412		}
1413		if reflect.DeepEqual(resp.Members, wMembers) {
1414			return
1415		}
1416
1417		time.Sleep(tickDuration)
1418	}
1419}
1420
1421type SortableProtoMemberSliceByPeerURLs []*pb.Member
1422
1423func (p SortableProtoMemberSliceByPeerURLs) Len() int { return len(p) }
1424func (p SortableProtoMemberSliceByPeerURLs) Less(i, j int) bool {
1425	return p[i].PeerURLs[0] < p[j].PeerURLs[0]
1426}
1427func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
1428
1429// MustNewMember creates a new member instance based on the response of V3 Member Add API.
1430func (c *ClusterV3) MustNewMember(t testing.TB, resp *clientv3.MemberAddResponse) *member {
1431	m := c.mustNewMember(t)
1432	m.isLearner = resp.Member.IsLearner
1433	m.NewCluster = false
1434
1435	m.InitialPeerURLsMap = types.URLsMap{}
1436	for _, mm := range c.Members {
1437		m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
1438	}
1439	m.InitialPeerURLsMap[m.Name] = types.MustNewURLs(resp.Member.PeerURLs)
1440
1441	return m
1442}
1443