// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/coreos/etcd/client"
	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/pkg/testutil"

	"github.com/coreos/pkg/capnslog"
)

func init() {
	// enable microsecond-level timestamps in logs for integration test debugging
	log.SetFlags(log.Ltime | log.Lmicroseconds | log.Lshortfile)
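	// ETCD_ELECTION_TIMEOUT_TICKS, if set, overrides the election tick count
	// used by test members (e.g. ETCD_ELECTION_TIMEOUT_TICKS=64).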
	if t := os.Getenv("ETCD_ELECTION_TIMEOUT_TICKS"); t != "" {
		if i, err := strconv.ParseInt(t, 10, 64); err == nil {
			electionTicks = int(i)
		}
	}
}

func TestClusterOf1(t *testing.T) { testCluster(t, 1) }
func TestClusterOf3(t *testing.T) { testCluster(t, 3) }

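// testCluster launches a cluster of the given size and verifies that it can
// make progress.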
func testCluster(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, size)
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

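// TestTLSClusterOf3 ensures a 3-member cluster with peer TLS enabled can make
// progress.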
func TestTLSClusterOf3(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

func TestClusterOf1UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 1) }
func TestClusterOf3UsingDiscovery(t *testing.T) { testClusterUsingDiscovery(t, 3) }

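// testClusterUsingDiscovery bootstraps a cluster of the given size through a
// single-member discovery cluster and verifies that it can make progress.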
func testClusterUsingDiscovery(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	dc := NewCluster(t, 1)
	dc.Launch(t)
	defer dc.Terminate(t)
	// initialize the discovery token space
	dcc := MustNewHTTPClient(t, dc.URLs(), nil)
	dkapi := client.NewKeysAPI(dcc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
		t.Fatal(err)
	}
	cancel()

	c := NewClusterByConfig(
		t,
		&ClusterConfig{Size: size, DiscoveryURL: dc.URL(0) + "/v2/keys"},
	)
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

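// TestTLSClusterOf3UsingDiscovery bootstraps a 3-member cluster with peer TLS
// through a single-member discovery cluster and verifies that it can make progress.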
func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
	defer testutil.AfterTest(t)
	dc := NewCluster(t, 1)
	dc.Launch(t)
	defer dc.Terminate(t)
	// initialize the discovery token space
	dcc := MustNewHTTPClient(t, dc.URLs(), nil)
	dkapi := client.NewKeysAPI(dcc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
		t.Fatal(err)
	}
	cancel()

	c := NewClusterByConfig(t,
		&ClusterConfig{
			Size:         3,
			PeerTLS:      &testTLSInfo,
			DiscoveryURL: dc.URL(0) + "/v2/keys"},
	)
	c.Launch(t)
	defer c.Terminate(t)
	clusterMustProgress(t, c.Members)
}

func TestDoubleClusterSizeOf1(t *testing.T) { testDoubleClusterSize(t, 1) }
func TestDoubleClusterSizeOf3(t *testing.T) { testDoubleClusterSize(t, 3) }

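// testDoubleClusterSize doubles the cluster by adding 'size' new members and
// verifies that the enlarged cluster can make progress.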
func testDoubleClusterSize(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, size)
	c.Launch(t)
	defer c.Terminate(t)

	for i := 0; i < size; i++ {
		c.AddMember(t)
	}
	clusterMustProgress(t, c.Members)
}

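// TestDoubleTLSClusterSizeOf3 doubles a 3-member cluster with peer TLS enabled
// and verifies that the enlarged cluster can make progress.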
func TestDoubleTLSClusterSizeOf3(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewClusterByConfig(t, &ClusterConfig{Size: 3, PeerTLS: &testTLSInfo})
	c.Launch(t)
	defer c.Terminate(t)

	for i := 0; i < 3; i++ {
		c.AddMember(t)
	}
	clusterMustProgress(t, c.Members)
}

func TestDecreaseClusterSizeOf3(t *testing.T) { testDecreaseClusterSize(t, 3) }
func TestDecreaseClusterSizeOf5(t *testing.T) { testDecreaseClusterSize(t, 5) }

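// testDecreaseClusterSize removes members one by one until a single member is
// left, then verifies that the remaining cluster can make progress.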
func testDecreaseClusterSize(t *testing.T, size int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, size)
	c.Launch(t)
	defer c.Terminate(t)

	// TODO: remove the last but one member
	for i := 0; i < size-1; i++ {
		id := c.Members[len(c.Members)-1].s.ID()
		// removal may hit a second leader election on slow machines
		if err := c.removeMember(t, uint64(id)); err != nil {
			if strings.Contains(err.Error(), "no leader") {
				t.Logf("got leader error (%v)", err)
				i--
				continue
			}
			t.Fatal(err)
		}
		c.waitLeader(t, c.Members)
	}
	clusterMustProgress(t, c.Members)
}

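// TestForceNewCluster ensures that a member restarted with ForceNewCluster set
// keeps the previously committed data and can make progress as a one-member cluster.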
func TestForceNewCluster(t *testing.T) {
	c := NewCluster(t, 3)
	c.Launch(t)
	cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
	kapi := client.NewKeysAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	resp, err := kapi.Create(ctx, "/foo", "bar")
	if err != nil {
		t.Fatalf("unexpected create error: %v", err)
	}
	cancel()
	// ensure the create has been applied on this member
	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
	if _, err = kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
		t.Fatalf("unexpected watch error: %v", err)
	}
	cancel()

	c.Members[0].Stop(t)
	c.Members[1].Terminate(t)
	c.Members[2].Terminate(t)
	c.Members[0].ForceNewCluster = true
	err = c.Members[0].Restart(t)
	if err != nil {
		t.Fatalf("unexpected ForceRestart error: %v", err)
	}
	defer c.Members[0].Terminate(t)
	c.waitLeader(t, c.Members[:1])

	// use a new HTTP client to establish a new connection
	cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
	kapi = client.NewKeysAPI(cc)
	// ensure the force restart keeps the old data and the new cluster can make progress
	ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
	if _, err := kapi.Watcher("/foo", &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(ctx); err != nil {
		t.Fatalf("unexpected watch error: %v", err)
	}
	cancel()
	clusterMustProgress(t, c.Members[:1])
}

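// TestAddMemberAfterClusterFullRotation replaces every original member with a
// new one, adds one more member on top, and verifies that the cluster still
// makes progress.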
func TestAddMemberAfterClusterFullRotation(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 3)
	c.Launch(t)
	defer c.Terminate(t)

	// remove all three original members and add in three new members.
	for i := 0; i < 3; i++ {
		c.RemoveMember(t, uint64(c.Members[0].s.ID()))
		c.waitLeader(t, c.Members)

		c.AddMember(t)
		c.waitLeader(t, c.Members)
	}

	c.AddMember(t)
	c.waitLeader(t, c.Members)

	clusterMustProgress(t, c.Members)
}

// Ensure we can remove a member, then add a new one back immediately.
func TestIssue2681(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 5)
	c.Launch(t)
	defer c.Terminate(t)

	c.RemoveMember(t, uint64(c.Members[4].s.ID()))
	c.waitLeader(t, c.Members)

	c.AddMember(t)
	c.waitLeader(t, c.Members)
	clusterMustProgress(t, c.Members)
}

// Ensure we can remove a member after a snapshot, then add a new one back.
func TestIssue2746(t *testing.T) { testIssue2746(t, 5) }

// With 3 nodes, TestIssue2746 sometimes had a shutdown with an inflight snapshot.
func TestIssue2746WithThree(t *testing.T) { testIssue2746(t, 3) }

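// testIssue2746 lowers SnapCount to force snapshots, removes the last member,
// then adds a new member back and verifies that the cluster makes progress.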
func testIssue2746(t *testing.T, members int) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, members)

	for _, m := range c.Members {
		m.SnapCount = 10
	}

	c.Launch(t)
	defer c.Terminate(t)

	// force a snapshot
	for i := 0; i < 20; i++ {
		clusterMustProgress(t, c.Members)
	}

	c.RemoveMember(t, uint64(c.Members[members-1].s.ID()))
	c.waitLeader(t, c.Members)

	c.AddMember(t)
	c.waitLeader(t, c.Members)
	clusterMustProgress(t, c.Members)
}

// Ensure etcd will not panic when removing a just-started member.
func TestIssue2904(t *testing.T) {
	defer testutil.AfterTest(t)
	// start a 1-member cluster to ensure member 0 is the leader of the cluster.
	c := NewCluster(t, 1)
	c.Launch(t)
	defer c.Terminate(t)

	c.AddMember(t)
	c.Members[1].Stop(t)

	// send a request to remove member 1 from the cluster.
	cc := MustNewHTTPClient(t, c.URLs(), nil)
	ma := client.NewMembersAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	// the proposal is not committed because member 1 is stopped, but the
	// proposal is appended to the leader's raft log.
	ma.Remove(ctx, c.Members[1].s.ID().String())
	cancel()

	// restart the member and expect it to send an UpdateAttributes request.
	// the log in the leader is like this:
	// [..., remove 1, ..., update attr 1, ...]
	c.Members[1].Restart(t)
	// when the member comes back, it acknowledges the proposal to remove
	// itself and applies it.
	<-c.Members[1].s.StopNotify()

	// terminate the removed member
	c.Members[1].Terminate(t)
	c.Members = c.Members[:1]
	// wait for the member to be removed.
	c.waitMembersMatch(t, c.HTTPMembers())
}

// TestIssue3699 tests minority failure during cluster reconfiguration; it used
// to deadlock.
func TestIssue3699(t *testing.T) {
	// start a cluster of 3 nodes a, b, c
	defer testutil.AfterTest(t)
	c := NewCluster(t, 3)
	c.Launch(t)
	defer c.Terminate(t)

	// make node a unavailable
	c.Members[0].Stop(t)

	// add node d
	c.AddMember(t)

	// electing node d as leader makes node a unable to participate
	leaderID := c.waitLeader(t, c.Members)
	for leaderID != 3 {
		c.Members[leaderID].Stop(t)
		<-c.Members[leaderID].s.StopNotify()
		// do not restart the killed member immediately.
		// the member will advance its election timeout after restart,
		// so it will have a better chance to become the leader again.
		time.Sleep(time.Duration(electionTicks * int(tickDuration)))
		c.Members[leaderID].Restart(t)
		leaderID = c.waitLeader(t, c.Members)
	}

	// bring back node a
	// node a will remain useless as long as d is the leader.
	if err := c.Members[0].Restart(t); err != nil {
		t.Fatal(err)
	}
	select {
	// waiting for ReadyNotify can take several seconds
	case <-time.After(10 * time.Second):
		t.Fatalf("waited too long for ready notification")
	case <-c.Members[0].s.StopNotify():
		t.Fatalf("should not be stopped")
	case <-c.Members[0].s.ReadyNotify():
	}
	// must call waitLeader so goroutines don't leak on terminate
	c.waitLeader(t, c.Members)

	// try to participate in the cluster
	cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
	kapi := client.NewKeysAPI(cc)
	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil {
		t.Fatalf("unexpected error on Set (%v)", err)
	}
	cancel()
}

// TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members.
func TestRejectUnhealthyAdd(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 3)
	for _, m := range c.Members {
		m.ServerConfig.StrictReconfigCheck = true
	}
	c.Launch(t)
	defer c.Terminate(t)

	// make the cluster unhealthy and wait for the downed peer
	c.Members[0].Stop(t)
	c.WaitLeader(t)

	// all attempts to add a member should fail
	for i := 1; i < len(c.Members); i++ {
		err := c.addMemberByURL(t, c.URL(i), "unix://foo:12345")
		if err == nil {
			t.Fatalf("should have failed adding peer")
		}
		// TODO: client should return descriptive error codes for internal errors
		if !strings.Contains(err.Error(), "has no leader") {
			t.Errorf("unexpected error (%v)", err)
		}
	}

	// make the cluster healthy again
	c.Members[0].Restart(t)
	c.WaitLeader(t)
	time.Sleep(2 * etcdserver.HealthInterval)

	// adding a member should succeed now that the cluster is healthy
	var err error
	for i := 1; i < len(c.Members); i++ {
		if err = c.addMemberByURL(t, c.URL(i), "unix://foo:12345"); err == nil {
			break
		}
	}
	if err != nil {
		t.Fatalf("should have added peer to healthy cluster (%v)", err)
	}
}

// TestRejectUnhealthyRemove ensures an unhealthy cluster rejects removing members
// if quorum would be lost.
func TestRejectUnhealthyRemove(t *testing.T) {
	defer testutil.AfterTest(t)
	c := NewCluster(t, 5)
	for _, m := range c.Members {
		m.ServerConfig.StrictReconfigCheck = true
	}
	c.Launch(t)
	defer c.Terminate(t)

	// make the cluster unhealthy and wait for the downed peers (3 up, 2 down)
	c.Members[0].Stop(t)
	c.Members[1].Stop(t)
	c.WaitLeader(t)

	// reject removing an active member since (3,2)-(1,0) => (2,2) lacks quorum
	err := c.removeMember(t, uint64(c.Members[2].s.ID()))
	if err == nil {
		t.Fatalf("should reject quorum breaking remove")
	}
	// TODO: client should return more descriptive error codes for internal errors
	if !strings.Contains(err.Error(), "has no leader") {
		t.Errorf("unexpected error (%v)", err)
	}

	// member stopped after launch; wait for missing heartbeats
	time.Sleep(time.Duration(electionTicks * int(tickDuration)))

	// permit removing a dead member since (3,2)-(0,1) => (3,1) has quorum
	if err = c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
		t.Fatalf("should accept removing down member")
	}

	// bring the cluster to (4,1)
	c.Members[0].Restart(t)

	// the restarted member must be connected for a HealthInterval before remove is accepted
	time.Sleep((3 * etcdserver.HealthInterval) / 2)

	// accept removing a member since (4,1)-(1,0) => (3,1) has quorum
	if err = c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
		t.Fatalf("expected to remove member, got error %v", err)
	}
}

// TestRestartRemoved ensures that a removed member exits on restart when
// 'initial-cluster-state' is set to 'new' and the old data directory still exists
// (see https://github.com/coreos/etcd/issues/7512 for more).
func TestRestartRemoved(t *testing.T) {
	defer testutil.AfterTest(t)

	capnslog.SetGlobalLogLevel(capnslog.INFO)
	defer capnslog.SetGlobalLogLevel(defaultLogLevel)

	// 1. start single-member cluster
	c := NewCluster(t, 1)
	for _, m := range c.Members {
		m.ServerConfig.StrictReconfigCheck = true
	}
	c.Launch(t)
	defer c.Terminate(t)

	// 2. add a new member
	c.AddMember(t)
	c.WaitLeader(t)

	oldm := c.Members[0]
	oldm.keepDataDirTerminate = true

	// 3. remove the first member, shutting it down without deleting its data
	if err := c.removeMember(t, uint64(c.Members[0].s.ID())); err != nil {
		t.Fatalf("expected to remove member, got error %v", err)
	}
	c.WaitLeader(t)

	// 4. restart the first member with 'initial-cluster-state=new'
	// wrong config, expects exit within ReqTimeout
	oldm.ServerConfig.NewCluster = false
	if err := oldm.Restart(t); err != nil {
		t.Fatalf("unexpected restart error: %v", err)
	}
	defer func() {
		oldm.Close()
		os.RemoveAll(oldm.ServerConfig.DataDir)
	}()
	select {
	case <-oldm.s.StopNotify():
	case <-time.After(time.Minute):
		t.Fatalf("removed member didn't exit within %v", time.Minute)
	}
}

// clusterMustProgress ensures that the cluster can make progress. It creates
// a random key first, then checks that the new key can be fetched from all
// client URLs of the cluster.
func clusterMustProgress(t *testing.T, membs []*member) {
	cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil)
	kapi := client.NewKeysAPI(cc)
	key := fmt.Sprintf("foo%d", rand.Int())
	var (
		err  error
		resp *client.Response
	)
	// retry in case of leader loss induced by slow CI
	for i := 0; i < 3; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
		resp, err = kapi.Create(ctx, "/"+key, "bar")
		cancel()
		if err == nil {
			break
		}
		t.Logf("failed to create key on %q (%v)", membs[0].URL(), err)
	}
	if err != nil {
		t.Fatalf("create on %s error: %v", membs[0].URL(), err)
	}

	for i, m := range membs {
		u := m.URL()
		mcc := MustNewHTTPClient(t, []string{u}, nil)
		mkapi := client.NewKeysAPI(mcc)
		mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
		if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
			t.Fatalf("#%d: watch on %s error: %v", i, u, err)
		}
		mcancel()
	}
}

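// TestSpeedyTerminate ensures that Terminate returns promptly even when members
// were just stopped and restarted and pending requests may time out.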
func TestSpeedyTerminate(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
	// Stop/Restart so requests will time out on lost leaders
	for i := 0; i < 3; i++ {
		clus.Members[i].Stop(t)
		clus.Members[i].Restart(t)
	}
	donec := make(chan struct{})
	go func() {
		defer close(donec)
		clus.Terminate(t)
	}()
	select {
	case <-time.After(10 * time.Second):
		t.Fatalf("cluster took too long to terminate")
	case <-donec:
	}
}
556