1// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// +build cluster_proxy
16
17package e2e
18
19import (
20	"fmt"
21	"net"
22	"net/url"
23	"os"
24	"strconv"
25	"strings"
26
27	"github.com/coreos/etcd/pkg/expect"
28)
29
30type proxyEtcdProcess struct {
31	etcdProc etcdProcess
32	proxyV2  *proxyV2Proc
33	proxyV3  *proxyV3Proc
34}
35
36func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
37	return newProxyEtcdProcess(cfg)
38}
39
40func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error) {
41	ep, err := newEtcdServerProcess(cfg)
42	if err != nil {
43		return nil, err
44	}
45	pep := &proxyEtcdProcess{
46		etcdProc: ep,
47		proxyV2:  newProxyV2Proc(cfg),
48		proxyV3:  newProxyV3Proc(cfg),
49	}
50	return pep, nil
51}
52
53func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }
54
55func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
56func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }
57func (p *proxyEtcdProcess) EndpointsMetrics() []string {
58	panic("not implemented; proxy doesn't provide health information")
59}
60
61func (p *proxyEtcdProcess) Start() error {
62	if err := p.etcdProc.Start(); err != nil {
63		return err
64	}
65	if err := p.proxyV2.Start(); err != nil {
66		return err
67	}
68	return p.proxyV3.Start()
69}
70
71func (p *proxyEtcdProcess) Restart() error {
72	if err := p.etcdProc.Restart(); err != nil {
73		return err
74	}
75	if err := p.proxyV2.Restart(); err != nil {
76		return err
77	}
78	return p.proxyV3.Restart()
79}
80
81func (p *proxyEtcdProcess) Stop() error {
82	err := p.proxyV2.Stop()
83	if v3err := p.proxyV3.Stop(); err == nil {
84		err = v3err
85	}
86	if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
87		// fails on go-grpc issue #1384
88		if !strings.Contains(eerr.Error(), "exit status 2") {
89			err = eerr
90		}
91	}
92	return err
93}
94
95func (p *proxyEtcdProcess) Close() error {
96	err := p.proxyV2.Close()
97	if v3err := p.proxyV3.Close(); err == nil {
98		err = v3err
99	}
100	if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
101		// fails on go-grpc issue #1384
102		if !strings.Contains(eerr.Error(), "exit status 2") {
103			err = eerr
104		}
105	}
106	return err
107}
108
109func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
110	p.proxyV3.WithStopSignal(sig)
111	p.proxyV3.WithStopSignal(sig)
112	return p.etcdProc.WithStopSignal(sig)
113}
114
115type proxyProc struct {
116	execPath string
117	args     []string
118	ep       string
119	murl     string
120	donec    chan struct{}
121
122	proc *expect.ExpectProcess
123}
124
125func (pp *proxyProc) endpoints() []string { return []string{pp.ep} }
126
127func (pp *proxyProc) start() error {
128	if pp.proc != nil {
129		panic("already started")
130	}
131	proc, err := spawnCmd(append([]string{pp.execPath}, pp.args...))
132	if err != nil {
133		return err
134	}
135	pp.proc = proc
136	return nil
137}
138
139func (pp *proxyProc) waitReady(readyStr string) error {
140	defer close(pp.donec)
141	return waitReadyExpectProc(pp.proc, []string{readyStr})
142}
143
144func (pp *proxyProc) Stop() error {
145	if pp.proc == nil {
146		return nil
147	}
148	if err := pp.proc.Stop(); err != nil && !strings.Contains(err.Error(), "exit status 1") {
149		// v2proxy exits with status 1 on auto tls; not sure why
150		return err
151	}
152	pp.proc = nil
153	<-pp.donec
154	pp.donec = make(chan struct{})
155	return nil
156}
157
158func (pp *proxyProc) WithStopSignal(sig os.Signal) os.Signal {
159	ret := pp.proc.StopSignal
160	pp.proc.StopSignal = sig
161	return ret
162}
163
164func (pp *proxyProc) Close() error { return pp.Stop() }
165
// proxyV2Proc is the v2 proxy process: the shared proxyProc state plus the
// data directory that is removed before each start and after each stop.
//
// Field order matters: newProxyV2Proc builds this struct with a positional
// literal.
type proxyV2Proc struct {
	proxyProc
	// dataDir is the directory removed by Start and Stop.
	dataDir string
}
170
171func proxyListenURL(cfg *etcdServerProcessConfig, portOffset int) string {
172	u, err := url.Parse(cfg.acurl)
173	if err != nil {
174		panic(err)
175	}
176	host, port, _ := net.SplitHostPort(u.Host)
177	p, _ := strconv.ParseInt(port, 10, 16)
178	u.Host = fmt.Sprintf("%s:%d", host, int(p)+portOffset)
179	return u.String()
180}
181
182func newProxyV2Proc(cfg *etcdServerProcessConfig) *proxyV2Proc {
183	listenAddr := proxyListenURL(cfg, 2)
184	name := fmt.Sprintf("testname-proxy-%p", cfg)
185	args := []string{
186		"--name", name,
187		"--proxy", "on",
188		"--listen-client-urls", listenAddr,
189		"--initial-cluster", cfg.name + "=" + cfg.purl.String(),
190	}
191	return &proxyV2Proc{
192		proxyProc{
193			execPath: cfg.execPath,
194			args:     append(args, cfg.tlsArgs...),
195			ep:       listenAddr,
196			donec:    make(chan struct{}),
197		},
198		name + ".etcd",
199	}
200}
201
202func (v2p *proxyV2Proc) Start() error {
203	os.RemoveAll(v2p.dataDir)
204	if err := v2p.start(); err != nil {
205		return err
206	}
207	return v2p.waitReady("httpproxy: endpoints found")
208}
209
210func (v2p *proxyV2Proc) Restart() error {
211	if err := v2p.Stop(); err != nil {
212		return err
213	}
214	return v2p.Start()
215}
216
217func (v2p *proxyV2Proc) Stop() error {
218	if err := v2p.proxyProc.Stop(); err != nil {
219		return err
220	}
221	// v2 proxy caches members; avoid reuse of directory
222	return os.RemoveAll(v2p.dataDir)
223}
224
// proxyV3Proc is the v3 grpc-proxy process. It adds no state of its own to
// the embedded proxyProc; newProxyV3Proc builds it with a positional literal.
type proxyV3Proc struct {
	proxyProc
}
228
229func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc {
230	listenAddr := proxyListenURL(cfg, 3)
231	args := []string{
232		"grpc-proxy",
233		"start",
234		"--listen-addr", strings.Split(listenAddr, "/")[2],
235		"--endpoints", cfg.acurl,
236		// pass-through member RPCs
237		"--advertise-client-url", "",
238	}
239	murl := ""
240	if cfg.murl != "" {
241		murl = proxyListenURL(cfg, 4)
242		args = append(args, "--metrics-addr", murl)
243	}
244	tlsArgs := []string{}
245	for i := 0; i < len(cfg.tlsArgs); i++ {
246		switch cfg.tlsArgs[i] {
247		case "--cert-file":
248			tlsArgs = append(tlsArgs, "--cert", cfg.tlsArgs[i+1], "--cert-file", cfg.tlsArgs[i+1])
249			i++
250		case "--key-file":
251			tlsArgs = append(tlsArgs, "--key", cfg.tlsArgs[i+1], "--key-file", cfg.tlsArgs[i+1])
252			i++
253		case "--ca-file":
254			tlsArgs = append(tlsArgs, "--cacert", cfg.tlsArgs[i+1], "--trusted-ca-file", cfg.tlsArgs[i+1])
255			i++
256		case "--auto-tls":
257			tlsArgs = append(tlsArgs, "--auto-tls", "--insecure-skip-tls-verify")
258		case "--peer-ca-file", "--peer-cert-file", "--peer-key-file":
259			i++ // skip arg
260		case "--client-cert-auth", "--peer-auto-tls":
261		default:
262			tlsArgs = append(tlsArgs, cfg.tlsArgs[i])
263		}
264	}
265	return &proxyV3Proc{
266		proxyProc{
267			execPath: cfg.execPath,
268			args:     append(args, tlsArgs...),
269			ep:       listenAddr,
270			murl:     murl,
271			donec:    make(chan struct{}),
272		},
273	}
274}
275
276func (v3p *proxyV3Proc) Restart() error {
277	if err := v3p.Stop(); err != nil {
278		return err
279	}
280	return v3p.Start()
281}
282
283func (v3p *proxyV3Proc) Start() error {
284	if err := v3p.start(); err != nil {
285		return err
286	}
287	return v3p.waitReady("listening for grpc-proxy client requests")
288}
289