1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package e2e
16
17import (
18	"fmt"
19	"io/ioutil"
20	"net/url"
21	"os"
22	"strings"
23	"time"
24
25	"go.etcd.io/etcd/etcdserver"
26)
27
28const etcdProcessBasePort = 20000
29
30type clientConnType int
31
32const (
33	clientNonTLS clientConnType = iota
34	clientTLS
35	clientTLSAndNonTLS
36)
37
38var (
39	configNoTLS = etcdProcessClusterConfig{
40		clusterSize:  3,
41		initialToken: "new",
42	}
43	configAutoTLS = etcdProcessClusterConfig{
44		clusterSize:   3,
45		isPeerTLS:     true,
46		isPeerAutoTLS: true,
47		initialToken:  "new",
48	}
49	configTLS = etcdProcessClusterConfig{
50		clusterSize:  3,
51		clientTLS:    clientTLS,
52		isPeerTLS:    true,
53		initialToken: "new",
54	}
55	configClientTLS = etcdProcessClusterConfig{
56		clusterSize:  3,
57		clientTLS:    clientTLS,
58		initialToken: "new",
59	}
60	configClientBoth = etcdProcessClusterConfig{
61		clusterSize:  1,
62		clientTLS:    clientTLSAndNonTLS,
63		initialToken: "new",
64	}
65	configClientAutoTLS = etcdProcessClusterConfig{
66		clusterSize:     1,
67		isClientAutoTLS: true,
68		clientTLS:       clientTLS,
69		initialToken:    "new",
70	}
71	configPeerTLS = etcdProcessClusterConfig{
72		clusterSize:  3,
73		isPeerTLS:    true,
74		initialToken: "new",
75	}
76	configClientTLSCertAuth = etcdProcessClusterConfig{
77		clusterSize:           1,
78		clientTLS:             clientTLS,
79		initialToken:          "new",
80		clientCertAuthEnabled: true,
81	}
82	configClientTLSCertAuthWithNoCN = etcdProcessClusterConfig{
83		clusterSize:           1,
84		clientTLS:             clientTLS,
85		initialToken:          "new",
86		clientCertAuthEnabled: true,
87		noCN:                  true,
88	}
89	configJWT = etcdProcessClusterConfig{
90		clusterSize:   1,
91		initialToken:  "new",
92		authTokenOpts: "jwt,pub-key=../../integration/fixtures/server.crt,priv-key=../../integration/fixtures/server.key.insecure,sign-method=RS256,ttl=1s",
93	}
94)
95
96func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
97	ret := cfg
98	ret.clusterSize = 1
99	return &ret
100}
101
102type etcdProcessCluster struct {
103	cfg   *etcdProcessClusterConfig
104	procs []etcdProcess
105}
106
107type etcdProcessClusterConfig struct {
108	execPath    string
109	dataDirPath string
110	keepDataDir bool
111
112	clusterSize int
113
114	baseScheme string
115	basePort   int
116
117	metricsURLScheme string
118
119	snapshotCount int // default is 10000
120
121	clientTLS             clientConnType
122	clientCertAuthEnabled bool
123	isPeerTLS             bool
124	isPeerAutoTLS         bool
125	isClientAutoTLS       bool
126	isClientCRL           bool
127	noCN                  bool
128
129	cipherSuites []string
130
131	forceNewCluster     bool
132	initialToken        string
133	quotaBackendBytes   int64
134	noStrictReconfig    bool
135	enableV2            bool
136	initialCorruptCheck bool
137	authTokenOpts       string
138
139	rollingStart bool
140}
141
142// newEtcdProcessCluster launches a new cluster from etcd processes, returning
143// a new etcdProcessCluster once all nodes are ready to accept client requests.
144func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
145	etcdCfgs := cfg.etcdServerProcessConfigs()
146	epc := &etcdProcessCluster{
147		cfg:   cfg,
148		procs: make([]etcdProcess, cfg.clusterSize),
149	}
150
151	// launch etcd processes
152	for i := range etcdCfgs {
153		proc, err := newEtcdProcess(etcdCfgs[i])
154		if err != nil {
155			epc.Close()
156			return nil, err
157		}
158		epc.procs[i] = proc
159	}
160
161	if cfg.rollingStart {
162		if err := epc.RollingStart(); err != nil {
163			return nil, err
164		}
165	} else {
166		if err := epc.Start(); err != nil {
167			return nil, err
168		}
169	}
170	return epc, nil
171}
172
173func (cfg *etcdProcessClusterConfig) clientScheme() string {
174	if cfg.clientTLS == clientTLS {
175		return "https"
176	}
177	return "http"
178}
179
180func (cfg *etcdProcessClusterConfig) peerScheme() string {
181	peerScheme := cfg.baseScheme
182	if peerScheme == "" {
183		peerScheme = "http"
184	}
185	if cfg.isPeerTLS {
186		peerScheme += "s"
187	}
188	return peerScheme
189}
190
191func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
192	if cfg.basePort == 0 {
193		cfg.basePort = etcdProcessBasePort
194	}
195	if cfg.execPath == "" {
196		cfg.execPath = binPath
197	}
198	if cfg.snapshotCount == 0 {
199		cfg.snapshotCount = etcdserver.DefaultSnapshotCount
200	}
201
202	etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
203	initialCluster := make([]string, cfg.clusterSize)
204	for i := 0; i < cfg.clusterSize; i++ {
205		var curls []string
206		var curl, curltls string
207		port := cfg.basePort + 5*i
208		curlHost := fmt.Sprintf("localhost:%d", port)
209
210		switch cfg.clientTLS {
211		case clientNonTLS, clientTLS:
212			curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
213			curls = []string{curl}
214		case clientTLSAndNonTLS:
215			curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
216			curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
217			curls = []string{curl, curltls}
218		}
219
220		purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
221		name := fmt.Sprintf("testname%d", i)
222		dataDirPath := cfg.dataDirPath
223		if cfg.dataDirPath == "" {
224			var derr error
225			dataDirPath, derr = ioutil.TempDir("", name+".etcd")
226			if derr != nil {
227				panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
228			}
229		}
230		initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
231
232		args := []string{
233			"--name", name,
234			"--listen-client-urls", strings.Join(curls, ","),
235			"--advertise-client-urls", strings.Join(curls, ","),
236			"--listen-peer-urls", purl.String(),
237			"--initial-advertise-peer-urls", purl.String(),
238			"--initial-cluster-token", cfg.initialToken,
239			"--data-dir", dataDirPath,
240			"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
241		}
242		args = addV2Args(args)
243		if cfg.forceNewCluster {
244			args = append(args, "--force-new-cluster")
245		}
246		if cfg.quotaBackendBytes > 0 {
247			args = append(args,
248				"--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
249			)
250		}
251		if cfg.noStrictReconfig {
252			args = append(args, "--strict-reconfig-check=false")
253		}
254		if cfg.enableV2 {
255			args = append(args, "--enable-v2")
256		}
257		if cfg.initialCorruptCheck {
258			args = append(args, "--experimental-initial-corrupt-check")
259		}
260		var murl string
261		if cfg.metricsURLScheme != "" {
262			murl = (&url.URL{
263				Scheme: cfg.metricsURLScheme,
264				Host:   fmt.Sprintf("localhost:%d", port+2),
265			}).String()
266			args = append(args, "--listen-metrics-urls", murl)
267		}
268
269		args = append(args, cfg.tlsArgs()...)
270
271		if cfg.authTokenOpts != "" {
272			args = append(args, "--auth-token", cfg.authTokenOpts)
273		}
274
275		etcdCfgs[i] = &etcdServerProcessConfig{
276			execPath:     cfg.execPath,
277			args:         args,
278			tlsArgs:      cfg.tlsArgs(),
279			dataDirPath:  dataDirPath,
280			keepDataDir:  cfg.keepDataDir,
281			name:         name,
282			purl:         purl,
283			acurl:        curl,
284			murl:         murl,
285			initialToken: cfg.initialToken,
286		}
287	}
288
289	initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
290	for i := range etcdCfgs {
291		etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
292		etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
293	}
294
295	return etcdCfgs
296}
297
298func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
299	if cfg.clientTLS != clientNonTLS {
300		if cfg.isClientAutoTLS {
301			args = append(args, "--auto-tls")
302		} else {
303			tlsClientArgs := []string{
304				"--cert-file", certPath,
305				"--key-file", privateKeyPath,
306				"--trusted-ca-file", caPath,
307			}
308			args = append(args, tlsClientArgs...)
309
310			if cfg.clientCertAuthEnabled {
311				args = append(args, "--client-cert-auth")
312			}
313		}
314	}
315
316	if cfg.isPeerTLS {
317		if cfg.isPeerAutoTLS {
318			args = append(args, "--peer-auto-tls")
319		} else {
320			tlsPeerArgs := []string{
321				"--peer-cert-file", certPath,
322				"--peer-key-file", privateKeyPath,
323				"--peer-trusted-ca-file", caPath,
324			}
325			args = append(args, tlsPeerArgs...)
326		}
327	}
328
329	if cfg.isClientCRL {
330		args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
331	}
332
333	if len(cfg.cipherSuites) > 0 {
334		args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ","))
335	}
336
337	return args
338}
339
340func (epc *etcdProcessCluster) EndpointsV2() []string {
341	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
342}
343
344func (epc *etcdProcessCluster) EndpointsV3() []string {
345	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
346}
347
348func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
349	for _, p := range epc.procs {
350		ret = append(ret, f(p)...)
351	}
352	return ret
353}
354
355func (epc *etcdProcessCluster) Start() error {
356	return epc.start(func(ep etcdProcess) error { return ep.Start() })
357}
358
359func (epc *etcdProcessCluster) RollingStart() error {
360	return epc.rollingStart(func(ep etcdProcess) error { return ep.Start() })
361}
362
363func (epc *etcdProcessCluster) Restart() error {
364	return epc.start(func(ep etcdProcess) error { return ep.Restart() })
365}
366
367func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
368	readyC := make(chan error, len(epc.procs))
369	for i := range epc.procs {
370		go func(n int) { readyC <- f(epc.procs[n]) }(i)
371	}
372	for range epc.procs {
373		if err := <-readyC; err != nil {
374			epc.Close()
375			return err
376		}
377	}
378	return nil
379}
380
381func (epc *etcdProcessCluster) rollingStart(f func(ep etcdProcess) error) error {
382	readyC := make(chan error, len(epc.procs))
383	for i := range epc.procs {
384		go func(n int) { readyC <- f(epc.procs[n]) }(i)
385		// make sure the servers do not start at the same time
386		time.Sleep(time.Second)
387	}
388	for range epc.procs {
389		if err := <-readyC; err != nil {
390			epc.Close()
391			return err
392		}
393	}
394	return nil
395}
396
397func (epc *etcdProcessCluster) Stop() (err error) {
398	for _, p := range epc.procs {
399		if p == nil {
400			continue
401		}
402		if curErr := p.Stop(); curErr != nil {
403			if err != nil {
404				err = fmt.Errorf("%v; %v", err, curErr)
405			} else {
406				err = curErr
407			}
408		}
409	}
410	return err
411}
412
413func (epc *etcdProcessCluster) Close() error {
414	err := epc.Stop()
415	for _, p := range epc.procs {
416		// p is nil when newEtcdProcess fails in the middle
417		// Close still gets called to clean up test data
418		if p == nil {
419			continue
420		}
421		if cerr := p.Close(); cerr != nil {
422			err = cerr
423		}
424	}
425	return err
426}
427
428func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
429	for _, p := range epc.procs {
430		ret = p.WithStopSignal(sig)
431	}
432	return ret
433}
434