1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package e2e
16
17import (
18	"fmt"
19	"io/ioutil"
20	"net/url"
21	"os"
22	"strings"
23
24	"github.com/coreos/etcd/etcdserver"
25)
26
// etcdProcessBasePort is the base port substituted by
// etcdServerProcessConfigs when etcdProcessClusterConfig.basePort is zero.
const etcdProcessBasePort = 20000

// clientConnType selects which URL scheme(s) a member's client endpoint is
// configured with.
type clientConnType int

const (
	// clientNonTLS configures a single "http" client URL per member.
	clientNonTLS clientConnType = iota
	// clientTLS configures a single "https" client URL per member.
	clientTLS
	// clientTLSAndNonTLS configures both an "http" and an "https" client URL
	// on the same host:port for each member.
	clientTLSAndNonTLS
)
36
// Predefined cluster configurations. Every one of them uses "new" as the
// initial cluster token; fields that are not listed keep their zero value.
var (
	// configNoTLS: three members, no TLS options set.
	configNoTLS = etcdProcessClusterConfig{
		clusterSize:  3,
		initialToken: "new",
	}
	// configAutoTLS: three members, peer TLS enabled with isPeerAutoTLS
	// (tlsArgs emits --peer-auto-tls instead of certificate file flags).
	configAutoTLS = etcdProcessClusterConfig{
		clusterSize:   3,
		isPeerTLS:     true,
		isPeerAutoTLS: true,
		initialToken:  "new",
	}
	// configTLS: three members, TLS on both client and peer URLs.
	configTLS = etcdProcessClusterConfig{
		clusterSize:  3,
		clientTLS:    clientTLS,
		isPeerTLS:    true,
		initialToken: "new",
	}
	// configClientTLS: three members, TLS on client URLs only.
	configClientTLS = etcdProcessClusterConfig{
		clusterSize:  3,
		clientTLS:    clientTLS,
		initialToken: "new",
	}
	// configClientBoth: single member exposing both http and https client URLs.
	configClientBoth = etcdProcessClusterConfig{
		clusterSize:  1,
		clientTLS:    clientTLSAndNonTLS,
		initialToken: "new",
	}
	// configClientAutoTLS: single member, client TLS with isClientAutoTLS
	// (tlsArgs emits --auto-tls instead of certificate file flags).
	configClientAutoTLS = etcdProcessClusterConfig{
		clusterSize:     1,
		isClientAutoTLS: true,
		clientTLS:       clientTLS,
		initialToken:    "new",
	}
	// configPeerTLS: three members, TLS on peer URLs only.
	configPeerTLS = etcdProcessClusterConfig{
		clusterSize:  3,
		isPeerTLS:    true,
		initialToken: "new",
	}
	// configClientTLSCertAuth: single member, client TLS with
	// --client-cert-auth enabled.
	configClientTLSCertAuth = etcdProcessClusterConfig{
		clusterSize:           1,
		clientTLS:             clientTLS,
		initialToken:          "new",
		clientCertAuthEnabled: true,
	}
)
82
83func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig {
84	ret := cfg
85	ret.clusterSize = 1
86	return &ret
87}
88
// etcdProcessCluster groups the etcd processes launched from one
// etcdProcessClusterConfig.
type etcdProcessCluster struct {
	// cfg is the configuration the cluster was created from.
	cfg   *etcdProcessClusterConfig
	// procs holds one entry per member (length cfg.clusterSize). Entries can
	// be nil if newEtcdProcess failed partway through construction.
	procs []etcdProcess
}
93
// etcdProcessClusterConfig describes how etcdServerProcessConfigs builds the
// per-member process configuration and command-line flags.
type etcdProcessClusterConfig struct {
	// execPath is the binary to run; empty is replaced with binPath.
	execPath    string
	// dataDirPath is passed as --data-dir; when empty a temp directory is
	// created for each member.
	dataDirPath string
	// keepDataDir is copied into each member's process config.
	keepDataDir bool

	// clusterSize is the number of member configs to generate.
	clusterSize int

	// baseScheme is the peer URL scheme prefix ("http" when empty); an "s" is
	// appended when isPeerTLS is set.
	baseScheme string
	// basePort is the first port used; zero is replaced with
	// etcdProcessBasePort. Member i uses basePort+5*i for client URLs, +1 for
	// the peer URL and +2 for the metrics URL.
	basePort   int

	// metricsURLScheme, when non-empty, adds --listen-metrics-urls using this
	// scheme.
	metricsURLScheme string

	// snapCount is passed as --snapshot-count; zero is replaced with
	// etcdserver.DefaultSnapCount.
	snapCount int // default is 10000

	// clientTLS selects the client URL scheme(s); any value other than
	// clientNonTLS also makes tlsArgs emit client TLS flags.
	clientTLS             clientConnType
	// clientCertAuthEnabled adds --client-cert-auth (only when client TLS is
	// on and isClientAutoTLS is off).
	clientCertAuthEnabled bool
	// isPeerTLS enables peer TLS flags and the "s" suffix on the peer scheme.
	isPeerTLS             bool
	// isPeerAutoTLS makes peer TLS use --peer-auto-tls instead of cert files.
	isPeerAutoTLS         bool
	// isClientAutoTLS makes client TLS use --auto-tls instead of cert files.
	isClientAutoTLS       bool
	// isClientCRL adds --client-crl-file (crlPath) and --client-cert-auth.
	isClientCRL           bool

	// cipherSuites, when non-empty, is comma-joined into --cipher-suites.
	cipherSuites []string

	// forceNewCluster adds --force-new-cluster.
	forceNewCluster     bool
	// initialToken is passed as --initial-cluster-token.
	initialToken        string
	// quotaBackendBytes, when > 0, is passed as --quota-backend-bytes.
	quotaBackendBytes   int64
	// noStrictReconfig adds --strict-reconfig-check=false.
	noStrictReconfig    bool
	// initialCorruptCheck adds --experimental-initial-corrupt-check.
	initialCorruptCheck bool
}
123
124// newEtcdProcessCluster launches a new cluster from etcd processes, returning
125// a new etcdProcessCluster once all nodes are ready to accept client requests.
126func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
127	etcdCfgs := cfg.etcdServerProcessConfigs()
128	epc := &etcdProcessCluster{
129		cfg:   cfg,
130		procs: make([]etcdProcess, cfg.clusterSize),
131	}
132
133	// launch etcd processes
134	for i := range etcdCfgs {
135		proc, err := newEtcdProcess(etcdCfgs[i])
136		if err != nil {
137			epc.Close()
138			return nil, err
139		}
140		epc.procs[i] = proc
141	}
142
143	if err := epc.Start(); err != nil {
144		return nil, err
145	}
146	return epc, nil
147}
148
149func (cfg *etcdProcessClusterConfig) clientScheme() string {
150	if cfg.clientTLS == clientTLS {
151		return "https"
152	}
153	return "http"
154}
155
156func (cfg *etcdProcessClusterConfig) peerScheme() string {
157	peerScheme := cfg.baseScheme
158	if peerScheme == "" {
159		peerScheme = "http"
160	}
161	if cfg.isPeerTLS {
162		peerScheme += "s"
163	}
164	return peerScheme
165}
166
167func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig {
168	if cfg.basePort == 0 {
169		cfg.basePort = etcdProcessBasePort
170	}
171	if cfg.execPath == "" {
172		cfg.execPath = binPath
173	}
174	if cfg.snapCount == 0 {
175		cfg.snapCount = etcdserver.DefaultSnapCount
176	}
177
178	etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize)
179	initialCluster := make([]string, cfg.clusterSize)
180	for i := 0; i < cfg.clusterSize; i++ {
181		var curls []string
182		var curl, curltls string
183		port := cfg.basePort + 5*i
184		curlHost := fmt.Sprintf("localhost:%d", port)
185
186		switch cfg.clientTLS {
187		case clientNonTLS, clientTLS:
188			curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String()
189			curls = []string{curl}
190		case clientTLSAndNonTLS:
191			curl = (&url.URL{Scheme: "http", Host: curlHost}).String()
192			curltls = (&url.URL{Scheme: "https", Host: curlHost}).String()
193			curls = []string{curl, curltls}
194		}
195
196		purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
197		name := fmt.Sprintf("testname%d", i)
198		dataDirPath := cfg.dataDirPath
199		if cfg.dataDirPath == "" {
200			var derr error
201			dataDirPath, derr = ioutil.TempDir("", name+".etcd")
202			if derr != nil {
203				panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr))
204			}
205		}
206		initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String())
207
208		args := []string{
209			"--name", name,
210			"--listen-client-urls", strings.Join(curls, ","),
211			"--advertise-client-urls", strings.Join(curls, ","),
212			"--listen-peer-urls", purl.String(),
213			"--initial-advertise-peer-urls", purl.String(),
214			"--initial-cluster-token", cfg.initialToken,
215			"--data-dir", dataDirPath,
216			"--snapshot-count", fmt.Sprintf("%d", cfg.snapCount),
217		}
218		args = addV2Args(args)
219		if cfg.forceNewCluster {
220			args = append(args, "--force-new-cluster")
221		}
222		if cfg.quotaBackendBytes > 0 {
223			args = append(args,
224				"--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes),
225			)
226		}
227		if cfg.noStrictReconfig {
228			args = append(args, "--strict-reconfig-check=false")
229		}
230		if cfg.initialCorruptCheck {
231			args = append(args, "--experimental-initial-corrupt-check")
232		}
233		var murl string
234		if cfg.metricsURLScheme != "" {
235			murl = (&url.URL{
236				Scheme: cfg.metricsURLScheme,
237				Host:   fmt.Sprintf("localhost:%d", port+2),
238			}).String()
239			args = append(args, "--listen-metrics-urls", murl)
240		}
241
242		args = append(args, cfg.tlsArgs()...)
243		etcdCfgs[i] = &etcdServerProcessConfig{
244			execPath:     cfg.execPath,
245			args:         args,
246			tlsArgs:      cfg.tlsArgs(),
247			dataDirPath:  dataDirPath,
248			keepDataDir:  cfg.keepDataDir,
249			name:         name,
250			purl:         purl,
251			acurl:        curl,
252			murl:         murl,
253			initialToken: cfg.initialToken,
254		}
255	}
256
257	initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
258	for i := range etcdCfgs {
259		etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",")
260		etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
261	}
262
263	return etcdCfgs
264}
265
266func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) {
267	if cfg.clientTLS != clientNonTLS {
268		if cfg.isClientAutoTLS {
269			args = append(args, "--auto-tls")
270		} else {
271			tlsClientArgs := []string{
272				"--cert-file", certPath,
273				"--key-file", privateKeyPath,
274				"--ca-file", caPath,
275			}
276			args = append(args, tlsClientArgs...)
277
278			if cfg.clientCertAuthEnabled {
279				args = append(args, "--client-cert-auth")
280			}
281		}
282	}
283
284	if cfg.isPeerTLS {
285		if cfg.isPeerAutoTLS {
286			args = append(args, "--peer-auto-tls")
287		} else {
288			tlsPeerArgs := []string{
289				"--peer-cert-file", certPath,
290				"--peer-key-file", privateKeyPath,
291				"--peer-ca-file", caPath,
292			}
293			args = append(args, tlsPeerArgs...)
294		}
295	}
296
297	if cfg.isClientCRL {
298		args = append(args, "--client-crl-file", crlPath, "--client-cert-auth")
299	}
300
301	if len(cfg.cipherSuites) > 0 {
302		args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ","))
303	}
304
305	return args
306}
307
308func (epc *etcdProcessCluster) EndpointsV2() []string {
309	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() })
310}
311
312func (epc *etcdProcessCluster) EndpointsV3() []string {
313	return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() })
314}
315
316func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) {
317	for _, p := range epc.procs {
318		ret = append(ret, f(p)...)
319	}
320	return ret
321}
322
323func (epc *etcdProcessCluster) Start() error {
324	return epc.start(func(ep etcdProcess) error { return ep.Start() })
325}
326
327func (epc *etcdProcessCluster) Restart() error {
328	return epc.start(func(ep etcdProcess) error { return ep.Restart() })
329}
330
331func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error {
332	readyC := make(chan error, len(epc.procs))
333	for i := range epc.procs {
334		go func(n int) { readyC <- f(epc.procs[n]) }(i)
335	}
336	for range epc.procs {
337		if err := <-readyC; err != nil {
338			epc.Close()
339			return err
340		}
341	}
342	return nil
343}
344
345func (epc *etcdProcessCluster) Stop() (err error) {
346	for _, p := range epc.procs {
347		if p == nil {
348			continue
349		}
350		if curErr := p.Stop(); curErr != nil {
351			if err != nil {
352				err = fmt.Errorf("%v; %v", err, curErr)
353			} else {
354				err = curErr
355			}
356		}
357	}
358	return err
359}
360
361func (epc *etcdProcessCluster) Close() error {
362	err := epc.Stop()
363	for _, p := range epc.procs {
364		// p is nil when newEtcdProcess fails in the middle
365		// Close still gets called to clean up test data
366		if p == nil {
367			continue
368		}
369		if cerr := p.Close(); cerr != nil {
370			err = cerr
371		}
372	}
373	return err
374}
375
376func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
377	for _, p := range epc.procs {
378		ret = p.WithStopSignal(sig)
379	}
380	return ret
381}
382