1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package e2e
16
17import (
18	"fmt"
19	"io/ioutil"
20	"net/url"
21	"os"
22	"strings"
23
24	"go.etcd.io/etcd/etcdserver"
25)
26
27const etcdProcessBasePort = 20000
28
29type clientConnType int
30
31const (
32	clientNonTLS clientConnType = iota
33	clientTLS
34	clientTLSAndNonTLS
35)
36
37var (
38	configNoTLS = etcdProcessClusterConfig{
39		clusterSize:  3,
40		initialToken: "new",
41	}
42	configAutoTLS = etcdProcessClusterConfig{
43		clusterSize:   3,
44		isPeerTLS:     true,
45		isPeerAutoTLS: true,
46		initialToken:  "new",
47	}
48	configTLS = etcdProcessClusterConfig{
49		clusterSize:  3,
50		clientTLS:    clientTLS,
51		isPeerTLS:    true,
52		initialToken: "new",
53	}
54	configClientTLS = etcdProcessClusterConfig{
55		clusterSize:  3,
56		clientTLS:    clientTLS,
57		initialToken: "new",
58	}
59	configClientBoth = etcdProcessClusterConfig{
60		clusterSize:  1,
61		clientTLS:    clientTLSAndNonTLS,
62		initialToken: "new",
63	}
64	configClientAutoTLS = etcdProcessClusterConfig{
65		clusterSize:     1,
66		isClientAutoTLS: true,
67		clientTLS:       clientTLS,
68		initialToken:    "new",
69	}
70	configPeerTLS = etcdProcessClusterConfig{
71		clusterSize:  3,
72		isPeerTLS:    true,
73		initialToken: "new",
74	}
75	configClientTLSCertAuth = etcdProcessClusterConfig{
76		clusterSize:           1,
77		clientTLS:             clientTLS,
78		initialToken:          "new",
79		clientCertAuthEnabled: true,
80	}
81	configClientTLSCertAuthWithNoCN = etcdProcessClusterConfig{
82		clusterSize:           1,
83		clientTLS:             clientTLS,
84		initialToken:          "new",
85		clientCertAuthEnabled: true,
86		noCN:                  true,
87	}
88	configJWT = etcdProcessClusterConfig{
89		clusterSize:   1,
90		initialToken:  "new",
91		authTokenOpts: "jwt,pub-key=../../integration/fixtures/server.crt,priv-key=../../integration/fixtures/server.key.insecure,sign-method=RS256,ttl=1s",
92	}
93)
94
95func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
96	ret := cfg
97	ret.clusterSize = 1
98	return &ret
99}
100
101type etcdProcessCluster struct {
102	cfg   *etcdProcessClusterConfig
103	procs []etcdProcess
104}
105
106type etcdProcessClusterConfig struct {
107	execPath    string
108	dataDirPath string
109	keepDataDir bool
110
111	clusterSize int
112
113	baseScheme string
114	basePort   int
115
116	metricsURLScheme string
117
118	snapshotCount int // default is 10000
119
120	clientTLS             clientConnType
121	clientCertAuthEnabled bool
122	isPeerTLS             bool
123	isPeerAutoTLS         bool
124	isClientAutoTLS       bool
125	isClientCRL           bool
126	noCN                  bool
127
128	cipherSuites []string
129
130	forceNewCluster     bool
131	initialToken        string
132	quotaBackendBytes   int64
133	noStrictReconfig    bool
134	enableV2            bool
135	initialCorruptCheck bool
136	authTokenOpts       string
137}
138
139// newEtcdProcessCluster launches a new cluster from etcd processes, returning
140// a new etcdProcessCluster once all nodes are ready to accept client requests.
141func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
142	etcdCfgs := cfg.etcdServerProcessConfigs()
143	epc := &etcdProcessCluster{
144		cfg:   cfg,
145		procs: make([]etcdProcess, cfg.clusterSize),
146	}
147
148	// launch etcd processes
149	for i := range etcdCfgs {
150		proc, err := newEtcdProcess(etcdCfgs[i])
151		if err != nil {
152			epc.Close()
153			return nil, err
154		}
155		epc.procs[i] = proc
156	}
157
158	if err := epc.Start(); err != nil {
159		return nil, err
160	}
161	return epc, nil
162}
163
164func (cfg *etcdProcessClusterConfig) clientScheme() string {
165	if cfg.clientTLS == clientTLS {
166		return "https"
167	}
168	return "http"
169}
170
171func (cfg *etcdProcessClusterConfig) peerScheme() string {
172	peerScheme := cfg.baseScheme
173	if peerScheme == "" {
174		peerScheme = "http"
175	}
176	if cfg.isPeerTLS {
177		peerScheme += "s"
178	}
179	return peerScheme
180}
181
182func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
183	if cfg.basePort == 0 {
184		cfg.basePort = etcdProcessBasePort
185	}
186	if cfg.execPath == "" {
187		cfg.execPath = binPath
188	}
189	if cfg.snapshotCount == 0 {
190		cfg.snapshotCount = etcdserver.DefaultSnapshotCount
191	}
192
193	etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
194	initialCluster := make([]string, cfg.clusterSize)
195	for i := 0; i < cfg.clusterSize; i++ {
196		var curls []string
197		var curl, curltls string
198		port := cfg.basePort + 5*i
199		curlHost := fmt.Sprintf("localhost:%d", port)
200
201		switch cfg.clientTLS {
202		case clientNonTLS, clientTLS:
203			curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
204			curls = []string{curl}
205		case clientTLSAndNonTLS:
206			curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
207			curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
208			curls = []string{curl, curltls}
209		}
210
211		purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
212		name := fmt.Sprintf("testname%d", i)
213		dataDirPath := cfg.dataDirPath
214		if cfg.dataDirPath == "" {
215			var derr error
216			dataDirPath, derr = ioutil.TempDir("", name+".etcd")
217			if derr != nil {
218				panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
219			}
220		}
221		initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
222
223		args := []string{
224			"--name", name,
225			"--listen-client-urls", strings.Join(curls, ","),
226			"--advertise-client-urls", strings.Join(curls, ","),
227			"--listen-peer-urls", purl.String(),
228			"--initial-advertise-peer-urls", purl.String(),
229			"--initial-cluster-token", cfg.initialToken,
230			"--data-dir", dataDirPath,
231			"--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount),
232		}
233		args = addV2Args(args)
234		if cfg.forceNewCluster {
235			args = append(args, "--force-new-cluster")
236		}
237		if cfg.quotaBackendBytes > 0 {
238			args = append(args,
239				"--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
240			)
241		}
242		if cfg.noStrictReconfig {
243			args = append(args, "--strict-reconfig-check=false")
244		}
245		if cfg.enableV2 {
246			args = append(args, "--enable-v2")
247		}
248		if cfg.initialCorruptCheck {
249			args = append(args, "--experimental-initial-corrupt-check")
250		}
251		var murl string
252		if cfg.metricsURLScheme != "" {
253			murl = (&url.URL{
254				Scheme: cfg.metricsURLScheme,
255				Host:   fmt.Sprintf("localhost:%d", port+2),
256			}).String()
257			args = append(args, "--listen-metrics-urls", murl)
258		}
259
260		args = append(args, cfg.tlsArgs()...)
261
262		if cfg.authTokenOpts != "" {
263			args = append(args, "--auth-token", cfg.authTokenOpts)
264		}
265
266		etcdCfgs[i] = &etcdServerProcessConfig{
267			execPath:     cfg.execPath,
268			args:         args,
269			tlsArgs:      cfg.tlsArgs(),
270			dataDirPath:  dataDirPath,
271			keepDataDir:  cfg.keepDataDir,
272			name:         name,
273			purl:         purl,
274			acurl:        curl,
275			murl:         murl,
276			initialToken: cfg.initialToken,
277		}
278	}
279
280	initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
281	for i := range etcdCfgs {
282		etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
283		etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
284	}
285
286	return etcdCfgs
287}
288
289func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
290	if cfg.clientTLS != clientNonTLS {
291		if cfg.isClientAutoTLS {
292			args = append(args, "--auto-tls")
293		} else {
294			tlsClientArgs := []string{
295				"--cert-file", certPath,
296				"--key-file", privateKeyPath,
297				"--trusted-ca-file", caPath,
298			}
299			args = append(args, tlsClientArgs...)
300
301			if cfg.clientCertAuthEnabled {
302				args = append(args, "--client-cert-auth")
303			}
304		}
305	}
306
307	if cfg.isPeerTLS {
308		if cfg.isPeerAutoTLS {
309			args = append(args, "--peer-auto-tls")
310		} else {
311			tlsPeerArgs := []string{
312				"--peer-cert-file", certPath,
313				"--peer-key-file", privateKeyPath,
314				"--peer-trusted-ca-file", caPath,
315			}
316			args = append(args, tlsPeerArgs...)
317		}
318	}
319
320	if cfg.isClientCRL {
321		args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
322	}
323
324	if len(cfg.cipherSuites) > 0 {
325		args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ","))
326	}
327
328	return args
329}
330
331func (epc *etcdProcessCluster) EndpointsV2() []string {
332	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
333}
334
335func (epc *etcdProcessCluster) EndpointsV3() []string {
336	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
337}
338
339func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
340	for _, p := range epc.procs {
341		ret = append(ret, f(p)...)
342	}
343	return ret
344}
345
346func (epc *etcdProcessCluster) Start() error {
347	return epc.start(func(ep etcdProcess) error { return ep.Start() })
348}
349
350func (epc *etcdProcessCluster) Restart() error {
351	return epc.start(func(ep etcdProcess) error { return ep.Restart() })
352}
353
354func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
355	readyC := make(chan error, len(epc.procs))
356	for i := range epc.procs {
357		go func(n int) { readyC <- f(epc.procs[n]) }(i)
358	}
359	for range epc.procs {
360		if err := <-readyC; err != nil {
361			epc.Close()
362			return err
363		}
364	}
365	return nil
366}
367
368func (epc *etcdProcessCluster) Stop() (err error) {
369	for _, p := range epc.procs {
370		if p == nil {
371			continue
372		}
373		if curErr := p.Stop(); curErr != nil {
374			if err != nil {
375				err = fmt.Errorf("%v; %v", err, curErr)
376			} else {
377				err = curErr
378			}
379		}
380	}
381	return err
382}
383
384func (epc *etcdProcessCluster) Close() error {
385	err := epc.Stop()
386	for _, p := range epc.procs {
387		// p is nil when newEtcdProcess fails in the middle
388		// Close still gets called to clean up test data
389		if p == nil {
390			continue
391		}
392		if cerr := p.Close(); cerr != nil {
393			err = cerr
394		}
395	}
396	return err
397}
398
399func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
400	for _, p := range epc.procs {
401		ret = p.WithStopSignal(sig)
402	}
403	return ret
404}
405