1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package e2e
16
17import (
18	"fmt"
19	"os"
20	"strings"
21	"testing"
22	"time"
23
24	"go.etcd.io/etcd/pkg/fileutil"
25	"go.etcd.io/etcd/pkg/flags"
26	"go.etcd.io/etcd/pkg/testutil"
27	"go.etcd.io/etcd/version"
28)
29
30func TestCtlV3Version(t *testing.T) { testCtl(t, versionTest) }
31
32func TestClusterVersion(t *testing.T) {
33	tests := []struct {
34		name         string
35		rollingStart bool
36	}{
37		{
38			name:         "When start servers at the same time",
39			rollingStart: false,
40		},
41		{
42			name:         "When start servers one by one",
43			rollingStart: true,
44		},
45	}
46
47	for _, tt := range tests {
48		t.Run(tt.name, func(t *testing.T) {
49			binary := binDir + "/etcd"
50			if !fileutil.Exist(binary) {
51				t.Skipf("%q does not exist", binary)
52			}
53			defer testutil.AfterTest(t)
54			cfg := configNoTLS
55			cfg.execPath = binary
56			cfg.snapshotCount = 3
57			cfg.baseScheme = "unix" // to avoid port conflict
58			cfg.rollingStart = tt.rollingStart
59
60			epc, err := newEtcdProcessCluster(&cfg)
61			if err != nil {
62				t.Fatalf("could not start etcd process cluster (%v)", err)
63			}
64			defer func() {
65				if errC := epc.Close(); errC != nil {
66					t.Fatalf("error closing etcd processes (%v)", errC)
67				}
68			}()
69
70			ctx := ctlCtx{
71				t:   t,
72				cfg: cfg,
73				epc: epc,
74			}
75			cv := version.Cluster(version.Version)
76			clusterVersionTest(ctx, `"etcdcluster":"`+cv)
77		})
78	}
79}
80
81func versionTest(cx ctlCtx) {
82	if err := ctlV3Version(cx); err != nil {
83		cx.t.Fatalf("versionTest ctlV3Version error (%v)", err)
84	}
85}
86
87func clusterVersionTest(cx ctlCtx, expected string) {
88	var err error
89	for i := 0; i < 7; i++ {
90		if err = cURLGet(cx.epc, cURLReq{endpoint: "/version", expected: expected}); err != nil {
91			cx.t.Logf("#%d: v3 is not ready yet (%v)", i, err)
92			time.Sleep(time.Second)
93			continue
94		}
95		break
96	}
97	if err != nil {
98		cx.t.Fatalf("failed cluster version test expected %v got (%v)", expected, err)
99	}
100}
101
102func ctlV3Version(cx ctlCtx) error {
103	cmdArgs := append(cx.PrefixArgs(), "version")
104	return spawnWithExpect(cmdArgs, version.Version)
105}
106
107// TestCtlV3DialWithHTTPScheme ensures that client handles endpoints with HTTPS scheme.
108func TestCtlV3DialWithHTTPScheme(t *testing.T) {
109	testCtl(t, dialWithSchemeTest, withCfg(configClientTLS))
110}
111
112func dialWithSchemeTest(cx ctlCtx) {
113	cmdArgs := append(cx.prefixArgs(cx.epc.EndpointsV3()), "put", "foo", "bar")
114	if err := spawnWithExpect(cmdArgs, "OK"); err != nil {
115		cx.t.Fatal(err)
116	}
117}
118
119type ctlCtx struct {
120	t                 *testing.T
121	apiPrefix         string
122	cfg               etcdProcessClusterConfig
123	quotaBackendBytes int64
124	corruptFunc       func(string) error
125	noStrictReconfig  bool
126
127	epc *etcdProcessCluster
128
129	envMap map[string]struct{}
130
131	dialTimeout time.Duration
132
133	quorum      bool // if true, set up 3-node cluster and linearizable read
134	interactive bool
135
136	user string
137	pass string
138
139	initialCorruptCheck bool
140
141	// for compaction
142	compactPhysical bool
143}
144
145type ctlOption func(*ctlCtx)
146
147func (cx *ctlCtx) applyOpts(opts []ctlOption) {
148	for _, opt := range opts {
149		opt(cx)
150	}
151	cx.initialCorruptCheck = true
152}
153
154func withCfg(cfg etcdProcessClusterConfig) ctlOption {
155	return func(cx *ctlCtx) { cx.cfg = cfg }
156}
157
158func withDialTimeout(timeout time.Duration) ctlOption {
159	return func(cx *ctlCtx) { cx.dialTimeout = timeout }
160}
161
162func withQuorum() ctlOption {
163	return func(cx *ctlCtx) { cx.quorum = true }
164}
165
166func withInteractive() ctlOption {
167	return func(cx *ctlCtx) { cx.interactive = true }
168}
169
170func withQuota(b int64) ctlOption {
171	return func(cx *ctlCtx) { cx.quotaBackendBytes = b }
172}
173
174func withCompactPhysical() ctlOption {
175	return func(cx *ctlCtx) { cx.compactPhysical = true }
176}
177
178func withInitialCorruptCheck() ctlOption {
179	return func(cx *ctlCtx) { cx.initialCorruptCheck = true }
180}
181
182func withCorruptFunc(f func(string) error) ctlOption {
183	return func(cx *ctlCtx) { cx.corruptFunc = f }
184}
185
186func withNoStrictReconfig() ctlOption {
187	return func(cx *ctlCtx) { cx.noStrictReconfig = true }
188}
189
190func withApiPrefix(p string) ctlOption {
191	return func(cx *ctlCtx) { cx.apiPrefix = p }
192}
193
194func withFlagByEnv() ctlOption {
195	return func(cx *ctlCtx) { cx.envMap = make(map[string]struct{}) }
196}
197
198func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
199	defer testutil.AfterTest(t)
200
201	ret := ctlCtx{
202		t:           t,
203		cfg:         configAutoTLS,
204		dialTimeout: 7 * time.Second,
205	}
206	ret.applyOpts(opts)
207
208	mustEtcdctl(t)
209	if !ret.quorum {
210		ret.cfg = *configStandalone(ret.cfg)
211	}
212	if ret.quotaBackendBytes > 0 {
213		ret.cfg.quotaBackendBytes = ret.quotaBackendBytes
214	}
215	ret.cfg.noStrictReconfig = ret.noStrictReconfig
216	if ret.initialCorruptCheck {
217		ret.cfg.initialCorruptCheck = ret.initialCorruptCheck
218	}
219
220	epc, err := newEtcdProcessCluster(&ret.cfg)
221	if err != nil {
222		t.Fatalf("could not start etcd process cluster (%v)", err)
223	}
224	ret.epc = epc
225
226	defer func() {
227		if ret.envMap != nil {
228			for k := range ret.envMap {
229				os.Unsetenv(k)
230			}
231		}
232		if errC := ret.epc.Close(); errC != nil {
233			t.Fatalf("error closing etcd processes (%v)", errC)
234		}
235	}()
236
237	donec := make(chan struct{})
238	go func() {
239		defer close(donec)
240		testFunc(ret)
241	}()
242
243	timeout := 2*ret.dialTimeout + time.Second
244	if ret.dialTimeout == 0 {
245		timeout = 30 * time.Second
246	}
247	select {
248	case <-time.After(timeout):
249		testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout))
250	case <-donec:
251	}
252}
253
254func (cx *ctlCtx) prefixArgs(eps []string) []string {
255	fmap := make(map[string]string)
256	fmap["endpoints"] = strings.Join(eps, ",")
257	fmap["dial-timeout"] = cx.dialTimeout.String()
258	if cx.epc.cfg.clientTLS == clientTLS {
259		if cx.epc.cfg.isClientAutoTLS {
260			fmap["insecure-transport"] = "false"
261			fmap["insecure-skip-tls-verify"] = "true"
262		} else if cx.epc.cfg.isClientCRL {
263			fmap["cacert"] = caPath
264			fmap["cert"] = revokedCertPath
265			fmap["key"] = revokedPrivateKeyPath
266		} else {
267			fmap["cacert"] = caPath
268			fmap["cert"] = certPath
269			fmap["key"] = privateKeyPath
270		}
271	}
272	if cx.user != "" {
273		fmap["user"] = cx.user + ":" + cx.pass
274	}
275
276	useEnv := cx.envMap != nil
277
278	cmdArgs := []string{ctlBinPath + "3"}
279	for k, v := range fmap {
280		if useEnv {
281			ek := flags.FlagToEnv("ETCDCTL", k)
282			os.Setenv(ek, v)
283			cx.envMap[ek] = struct{}{}
284		} else {
285			cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%s", k, v))
286		}
287	}
288	return cmdArgs
289}
290
291// PrefixArgs prefixes etcdctl command.
292// Make sure to unset environment variables after tests.
293func (cx *ctlCtx) PrefixArgs() []string {
294	return cx.prefixArgs(cx.epc.EndpointsV3())
295}
296
297func isGRPCTimedout(err error) bool {
298	return strings.Contains(err.Error(), "grpc: timed out trying to connect")
299}
300
301func (cx *ctlCtx) memberToRemove() (ep string, memberID string, clusterID string) {
302	n1 := cx.cfg.clusterSize
303	if n1 < 2 {
304		cx.t.Fatalf("%d-node is too small to test 'member remove'", n1)
305	}
306
307	resp, err := getMemberList(*cx)
308	if err != nil {
309		cx.t.Fatal(err)
310	}
311	if n1 != len(resp.Members) {
312		cx.t.Fatalf("expected %d, got %d", n1, len(resp.Members))
313	}
314
315	ep = resp.Members[0].ClientURLs[0]
316	clusterID = fmt.Sprintf("%x", resp.Header.ClusterId)
317	memberID = fmt.Sprintf("%x", resp.Members[1].ID)
318
319	return ep, memberID, clusterID
320}
321