// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"go.etcd.io/etcd/client"
	"go.etcd.io/etcd/etcdserver"
	"go.etcd.io/etcd/pkg/testutil"
)

func init() {
	// enable microsecond-resolution timestamps in the log for integration test debugging
	log.SetFlags(log.Ltime | log.Lmicroseconds | log.Lshortfile)
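	// allow the election timeout (in ticks) to be overridden through the
	// ETCD_ELECTION_TIMEOUT_TICKS environment variable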
	if t := os.Getenv("ETCD_ELECTION_TIMEOUT_TICKS"); t != "" {
		if i, err := strconv.ParseInt(t, 10, 64); err == nil {
			electionTicks = int(i)
		}
	}
}

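// TestClusterOf1 and TestClusterOf3 ensure that a freshly launched cluster of
// the given size can make progress.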
func TestClusterOf1(t *testing.T) { testCluster(t, 1) }
func TestClusterOf3(t *testing.T) { testCluster(t, 3) }

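// testCluster launches a cluster of the given size and verifies that it can
// make progress.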
func testCluster(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, size)
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

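// TestTLSClusterOf3 ensures a 3-member cluster with peer TLS can make progress.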
func TestTLSClusterOf3(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3) }

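// testClusterUsingDiscovery bootstraps a cluster of the given size through a
// single-member discovery cluster and verifies that it can make progress.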
func testClusterUsingDiscovery(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	dc := NewCluster(t, 1)
	dc.Launch(t)
	defer dc.Terminate(t)
	// init discovery token space
	dcc := MustNewHTTPClient(t, dc.URLs(), nil)
	dkapi := client.NewKeysAPI(dcc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
		t.Fatal(err)
	}
	cancel()

	c := NewClusterByConfig(
		t,
		&ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"},
	)
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

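// TestTLSClusterOf3UsingDiscovery bootstraps a 3-member cluster with peer TLS
// through a single-member discovery cluster and verifies that it can make progress.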
func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
	defer testutil.AfterTest(t)
	dc := NewCluster(t, 1)
	dc.Launch(t)
	defer dc.Terminate(t)
	// init discovery token space
	dcc := MustNewHTTPClient(t, dc.URLs(), nil)
	dkapi := client.NewKeysAPI(dcc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
		t.Fatal(err)
	}
	cancel()

	c := NewClusterByConfig(t,
		&ClusterConfig{
			Size:         3,
			PeerTLS:      &testTLSInfo,
			DiscoveryURL: dc.URL(0) + "/v2/keys"},
	)
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) }
func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) }

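// testDoubleClusterSize launches a cluster of the given size, doubles its
// membership by adding new members, and verifies that it can make progress.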
func testDoubleClusterSize(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, size)
	c.Launch(t)
	defer c.Terminate(t)

	for i := 0; i < size; i++ {
		c.AddMember(t)
	}
	clusterMustProgress(t, c.Members)
}

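// TestDoubleTLSClusterSizeOf3 doubles a 3-member cluster with peer TLS and
// verifies that it can make progress.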
func TestDoubleTLSClusterSizeOf3(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
	c.Launch(t)
	defer c.Terminate(t)

	for i := 0; i < 3; i++ {
		c.AddMember(t)
	}
	clusterMustProgress(t, c.Members)
}

func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) }

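// testDecreaseClusterSize launches a cluster of the given size, removes
// members until a single member remains, and verifies that the remaining
// cluster can make progress.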
func testDecreaseClusterSize(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, size)
	c.Launch(t)
	defer c.Terminate(t)

	// TODO: remove the last but one member
	for i := 0; i < size-1; i++ {
		id := c.Members[len(c.Members)-1].s.ID()
		// may hit second leader election on slow machines
		if err := c.removeMember(t, uint64(id)); err != nil {
			if strings.Contains(err.Error(), "no leader") {
				t.Logf("got leader error (%v)", err)
				i--
				continue
			}
			t.Fatal(err)
		}
		c.waitLeader(t, c.Members)
	}
	clusterMustProgress(t, c.Members)
}

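// TestForceNewCluster ensures that restarting a member with ForceNewCluster
// preserves previously committed data and that the resulting single-member
// cluster can make progress.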
func TestForceNewCluster(t *testing.T) {
	c := NewCluster(t, 3)
	c.Launch(t)
	cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
	kapi := client.NewKeysAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	resp, err := kapi.Create(ctx, "/foo", "bar")
	if err != nil {
		t.Fatalf("unexpected create error: %v", err)
	}
	cancel()
	// ensure the create has been applied on this member
	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
	if _, err = kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
		t.Fatalf("unexpected watch error: %v", err)
	}
	cancel()

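	// stop the first member and terminate the other two, then force the first
	// member to restart as a single-member cluster from its existing data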
	c.Members[0].Stop(t)
	c.Members[1].Terminate(t)
	c.Members[2].Terminate(t)
	c.Members[0].ForceNewCluster = true
	err = c.Members[0].Restart(t)
	if err != nil {
		t.Fatalf("unexpected ForceRestart error: %v", err)
	}
	defer c.Members[0].Terminate(t)
	c.waitLeader(t, c.Members[:1])

	// use new http client to init new connection
	cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
	kapi = client.NewKeysAPI(cc)
	// ensure the force restart kept the old data and the new cluster can make progress
	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
	if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
		t.Fatalf("unexpected watch error: %v", err)
	}
	cancel()
	clusterMustProgress(t, c.Members[:1])
}

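// TestAddMemberAfterClusterFullRotation replaces every original member with a
// new one, adds one more member, and verifies that the cluster can make progress.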
func TestAddMemberAfterClusterFullRotation(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 3)
	c.Launch(t)
	defer c.Terminate(t)

	// remove all the previous three members and add in three new members.
	for i := 0; i < 3; i++ {
		c.RemoveMember(t, uint64(c.Members[0].s.ID()))
		c.waitLeader(t, c.Members)

		c.AddMember(t)
		c.waitLeader(t, c.Members)
	}

	c.AddMember(t)
	c.waitLeader(t, c.Members)

	clusterMustProgress(t, c.Members)
}

// Ensure we can remove a member then add a new one back immediately.
func TestIssue2681(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 5)
	c.Launch(t)
	defer c.Terminate(t)

	c.RemoveMember(t, uint64(c.Members[4].s.ID()))
	c.waitLeader(t, c.Members)

	c.AddMember(t)
	c.waitLeader(t, c.Members)
	clusterMustProgress(t, c.Members)
}

// Ensure we can remove a member after a snapshot then add a new one back.
func TestIssue2746(t *testing.T) { testIssue2746(t, 5) }

// With 3 nodes TestIssue2746 sometimes had a shutdown with an inflight snapshot.
func TestIssue2746WithThree(t *testing.T) { testIssue2746(t, 3) }

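// testIssue2746 lowers the snapshot count to force snapshots, removes a member,
// adds a new one, and verifies that the cluster can make progress.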
func testIssue2746(t *testing.T, members int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, members)

	for _, m := range c.Members {
		m.SnapshotCount = 10
	}

	c.Launch(t)
	defer c.Terminate(t)

	// force a snapshot
	for i := 0; i < 20; i++ {
		clusterMustProgress(t, c.Members)
	}

	c.RemoveMember(t, uint64(c.Members[members-1].s.ID()))
	c.waitLeader(t, c.Members)

	c.AddMember(t)
	c.waitLeader(t, c.Members)
	clusterMustProgress(t, c.Members)
}

// Ensure etcd will not panic when removing a just started member.
func TestIssue2904(t *testing.T) {
	defer testutil.AfterTest(t)
	// start 1-member cluster to ensure member 0 is the leader of the cluster.
	c := NewCluster(t, 1)
	c.Launch(t)
	defer c.Terminate(t)

	c.AddMember(t)
	c.Members[1].Stop(t)

	// send a request to the cluster to remove member 1.
	cc := MustNewHTTPClient(t, c.URLs(), nil)
	ma := client.NewMembersAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	// the proposal is not committed because member 1 is stopped, but it is
	// appended to the leader's raft log.
	ma.Remove(ctx, c.Members[1].s.ID().String())
	cancel()

	// restart the member and expect it to send an UpdateAttributes request.
	// the leader's log then looks like:
	// [..., remove 1, ..., update attr 1, ...]
	c.Members[1].Restart(t)
	// when the member comes back, it acknowledges the proposal to remove
	// itself and applies it.
	<-c.Members[1].s.StopNotify()

	// terminate the removed member
	c.Members[1].Terminate(t)
	c.Members = c.Members[:1]
	// wait for the member to be removed.
	c.waitMembersMatch(t, c.HTTPMembers())
}

// TestIssue3699 tests minority failure during cluster configuration; this
// scenario used to deadlock.
func TestIssue3699(t *testing.T) {
	// start a cluster of 3 nodes a, b, c
	defer testutil.AfterTest(t)
	c := NewCluster(t, 3)
	c.Launch(t)
	defer c.Terminate(t)

	// make node a unavailable
	c.Members[0].Stop(t)

	// add node d
	c.AddMember(t)

	// electing node d as leader makes node a unable to participate
	leaderID := c.waitLeader(t, c.Members)
	for leaderID != 3 {
		c.Members[leaderID].Stop(t)
		<-c.Members[leaderID].s.StopNotify()
		// do not restart the killed member immediately.
		// the member will advance its election timeout after restart,
		// so it will have a better chance to become the leader again.
		time.Sleep(time.Duration(electionTicks * int(tickDuration)))
		c.Members[leaderID].Restart(t)
		leaderID = c.waitLeader(t, c.Members)
	}

	// bring back node a
	// node a will remain useless as long as d is the leader.
	if err := c.Members[0].Restart(t); err != nil {
		t.Fatal(err)
	}
	select {
	// waiting for ReadyNotify can take several seconds
	case <-time.After(10 * time.Second):
		t.Fatalf("waited too long for ready notification")
	case <-c.Members[0].s.StopNotify():
		t.Fatalf("should not be stopped")
	case <-c.Members[0].s.ReadyNotify():
	}
	// must waitLeader so goroutines don't leak on terminate
	c.waitLeader(t, c.Members)

	// try to participate in cluster
	cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
	kapi := client.NewKeysAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil {
		t.Fatalf("unexpected error on Set (%v)", err)
	}
	cancel()
}

// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members.
func TestRejectUnhealthyAdd(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 3)
	for _, m := range c.Members {
		m.ServerConfig.StrictReconfigCheck = true
	}
	c.Launch(t)
	defer c.Terminate(t)

	// make cluster unhealthy and wait for downed peer
	c.Members[0].Stop(t)
	c.WaitLeader(t)

	// all attempts to add member should fail
	for i := 1; i < len(c.Members); i++ {
		err := c.addMemberByURL(t, c.URL(i), "unix://foo:12345")
		if err == nil {
			t.Fatalf("should have failed adding peer")
		}
		// TODO: client should return descriptive error codes for internal errors
		if !strings.Contains(err.Error(), "has no leader") {
			t.Errorf("unexpected error (%v)", err)
		}
	}

	// make cluster healthy
	c.Members[0].Restart(t)
	c.WaitLeader(t)
	time.Sleep(2 * etcdserver.HealthInterval)

	// add member should succeed now that it's healthy
	var err error
	for i := 1; i < len(c.Members); i++ {
		if err = c.addMemberByURL(t, c.URL(i), "unix://foo:12345"); err == nil {
			break
		}
	}
	if err != nil {
		t.Fatalf("should have added peer to healthy cluster (%v)", err)
	}
}

// TestRejectUnhealthyRemove ensures an unhealthy cluster rejects removing members
// if quorum will be lost.
func TestRejectUnhealthyRemove(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 5)
	for _, m := range c.Members {
		m.ServerConfig.StrictReconfigCheck = true
	}
	c.Launch(t)
	defer c.Terminate(t)

	// make cluster unhealthy and wait for downed peer; (3 up, 2 down)
	c.Members[0].Stop(t)
	c.Members[1].Stop(t)
	c.WaitLeader(t)

	// reject remove active member since (3,2)-(1,0) => (2,2) lacks quorum
	err := c.removeMember(t, uint64(c.Members[2].s.ID()))
	if err == nil {
		t.Fatalf("should reject quorum breaking remove")
	}
	// TODO: client should return more descriptive error codes for internal errors
	if !strings.Contains(err.Error(), "has no leader") {
		t.Errorf("unexpected error (%v)", err)
	}

	// member stopped after launch; wait for missing heartbeats
	time.Sleep(time.Duration(electionTicks * int(tickDuration)))

	// permit remove dead member since (3,2) - (0,1) => (3,1) has quorum
	if err = c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
		t.Fatalf("should accept removing down member")
	}

	// bring cluster to (4,1)
	c.Members[0].Restart(t)

	// restarted member must be connected for a HealthInterval before remove is accepted
	time.Sleep((3 * etcdserver.HealthInterval) / 2)

	// accept remove member since (4,1)-(1,0) => (3,1) has quorum
	if err = c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
		t.Fatalf("expected to remove member, got error %v", err)
	}
}

// TestRestartRemoved ensures that a removed member exits on restart when
// 'initial-cluster-state' is set to 'new' and the old data directory still
// exists (see https://github.com/etcd-io/etcd/issues/7512 for more).
func TestRestartRemoved(t *testing.T) {
	defer testutil.AfterTest(t)

	// 1. start single-member cluster
	c := NewCluster(t, 1)
	for _, m := range c.Members {
		m.ServerConfig.StrictReconfigCheck = true
	}
	c.Launch(t)
	defer c.Terminate(t)

	// 2. add a new member
	c.AddMember(t)
	c.WaitLeader(t)

	oldm := c.Members[0]
	oldm.keepDataDirTerminate = true

	// 3. remove first member, shut down without deleting data
	if err := c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
		t.Fatalf("expected to remove member, got error %v", err)
	}
	c.WaitLeader(t)

	// 4. restart first member with 'initial-cluster-state=new'
	// wrong config, expects exit within ReqTimeout
	oldm.ServerConfig.NewCluster = false
	if err := oldm.Restart(t); err != nil {
		t.Fatalf("unexpected ForceRestart error: %v", err)
	}
	defer func() {
		oldm.Close()
		os.RemoveAll(oldm.ServerConfig.DataDir)
	}()
	select {
	case <-oldm.s.StopNotify():
	case <-time.After(time.Minute):
		t.Fatalf("removed member didn't exit within %v", time.Minute)
	}
}

// clusterMustProgress ensures that the cluster can make progress. It creates a
// random key and then checks that the new key can be fetched from every member
// of the cluster.
func clusterMustProgress(t *testing.T, membs []*member) {
	cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil)
	kapi := client.NewKeysAPI(cc)
	key := fmt.Sprintf("foo%d", rand.Int())
	var (
		err  error
		resp *client.Response
	)
	// retry in case of leader loss induced by slow CI
	for i := 0; i < 3; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
		resp, err = kapi.Create(ctx, "/"+key, "bar")
		cancel()
		if err == nil {
			break
		}
		t.Logf("failed to create key on %q (%v)", membs[0].URL(), err)
	}
	if err != nil {
		t.Fatalf("create on %s error: %v", membs[0].URL(), err)
	}

	for i, m := range membs {
		u := m.URL()
		mcc := MustNewHTTPClient(t, []string{u}, nil)
		mkapi := client.NewKeysAPI(mcc)
		mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
		if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
			t.Fatalf("#%d: watch on %s error: %v", i, u, err)
		}
		mcancel()
	}
}

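// TestSpeedyTerminate ensures that terminating a cluster whose members have all
// been stopped and restarted still completes quickly.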
func TestSpeedyTerminate(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
	// Stop/Restart so requests will time out on lost leaders
	for i := 0; i < 3; i++ {
		clus.Members[i].Stop(t)
		clus.Members[i].Restart(t)
	}
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		clus.Terminate(t)
	}()
	select {
	case <-time.After(10 * time.Second):
		t.Fatalf("cluster took too long to terminate")
	case <-donec:
	}
}