1// Copyright 2017 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// +build cluster_proxy 16 17package e2e 18 19import ( 20 "fmt" 21 "net" 22 "net/url" 23 "os" 24 "strconv" 25 "strings" 26 27 "github.com/coreos/etcd/pkg/expect" 28) 29 30type proxyEtcdProcess struct { 31 etcdProc etcdProcess 32 proxyV2 *proxyV2Proc 33 proxyV3 *proxyV3Proc 34} 35 36func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) { 37 return newProxyEtcdProcess(cfg) 38} 39 40func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error) { 41 ep, err := newEtcdServerProcess(cfg) 42 if err != nil { 43 return nil, err 44 } 45 pep := &proxyEtcdProcess{ 46 etcdProc: ep, 47 proxyV2: newProxyV2Proc(cfg), 48 proxyV3: newProxyV3Proc(cfg), 49 } 50 return pep, nil 51} 52 53func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() } 54 55func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() } 56func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() } 57func (p *proxyEtcdProcess) EndpointsMetrics() []string { 58 panic("not implemented; proxy doesn't provide health information") 59} 60 61func (p *proxyEtcdProcess) Start() error { 62 if err := p.etcdProc.Start(); err != nil { 63 return err 64 } 65 if err := p.proxyV2.Start(); err != nil { 66 return err 67 } 68 return p.proxyV3.Start() 69} 70 71func (p *proxyEtcdProcess) Restart() error { 72 if err := p.etcdProc.Restart(); err != nil { 73 return err 74 } 75 if err := p.proxyV2.Restart(); err != nil { 76 return err 77 } 78 return p.proxyV3.Restart() 79} 80 81func (p *proxyEtcdProcess) Stop() error { 82 err := p.proxyV2.Stop() 83 if v3err := p.proxyV3.Stop(); err == nil { 84 err = v3err 85 } 86 if eerr := p.etcdProc.Stop(); eerr != nil && err == nil { 87 // fails on go-grpc issue #1384 88 if !strings.Contains(eerr.Error(), "exit status 2") { 89 err = eerr 90 } 91 } 92 return err 93} 94 95func (p *proxyEtcdProcess) Close() error { 96 err := p.proxyV2.Close() 97 if v3err := p.proxyV3.Close(); err == nil { 98 err = v3err 99 } 100 if eerr := p.etcdProc.Close(); eerr != nil && err == nil { 101 // fails on go-grpc issue #1384 102 if !strings.Contains(eerr.Error(), "exit status 2") { 103 err = eerr 104 } 105 } 106 return err 107} 108 109func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal { 110 p.proxyV3.WithStopSignal(sig) 111 p.proxyV3.WithStopSignal(sig) 112 return p.etcdProc.WithStopSignal(sig) 113} 114 115type proxyProc struct { 116 execPath string 117 args []string 118 ep string 119 murl string 120 donec chan struct{} 121 122 proc *expect.ExpectProcess 123} 124 125func (pp *proxyProc) endpoints() []string { return []string{pp.ep} } 126 127func (pp *proxyProc) start() error { 128 if pp.proc != nil { 129 panic("already started") 130 } 131 proc, err := spawnCmd(append([]string{pp.execPath}, pp.args...)) 132 if err != nil { 133 return err 134 } 135 pp.proc = proc 136 return nil 137} 138 139func (pp *proxyProc) waitReady(readyStr string) error { 140 defer close(pp.donec) 141 return waitReadyExpectProc(pp.proc, []string{readyStr}) 142} 143 144func (pp *proxyProc) Stop() error { 145 if pp.proc == nil { 146 return nil 147 } 148 if err := pp.proc.Stop(); err != nil && !strings.Contains(err.Error(), "exit status 1") { 149 // v2proxy exits with status 1 on auto tls; not sure why 150 return err 151 } 152 pp.proc = nil 153 <-pp.donec 154 pp.donec = make(chan struct{}) 155 return nil 156} 157 158func (pp *proxyProc) WithStopSignal(sig os.Signal) os.Signal { 159 ret := pp.proc.StopSignal 160 pp.proc.StopSignal = sig 161 return ret 162} 163 164func (pp *proxyProc) Close() error { return pp.Stop() } 165 166type proxyV2Proc struct { 167 proxyProc 168 dataDir string 169} 170 171func proxyListenURL(cfg *etcdServerProcessConfig, portOffset int) string { 172 u, err := url.Parse(cfg.acurl) 173 if err != nil { 174 panic(err) 175 } 176 host, port, _ := net.SplitHostPort(u.Host) 177 p, _ := strconv.ParseInt(port, 10, 16) 178 u.Host = fmt.Sprintf("%s:%d", host, int(p)+portOffset) 179 return u.String() 180} 181 182func newProxyV2Proc(cfg *etcdServerProcessConfig) *proxyV2Proc { 183 listenAddr := proxyListenURL(cfg, 2) 184 name := fmt.Sprintf("testname-proxy-%p", cfg) 185 args := []string{ 186 "--name", name, 187 "--proxy", "on", 188 "--listen-client-urls", listenAddr, 189 "--initial-cluster", cfg.name + "=" + cfg.purl.String(), 190 } 191 return &proxyV2Proc{ 192 proxyProc{ 193 execPath: cfg.execPath, 194 args: append(args, cfg.tlsArgs...), 195 ep: listenAddr, 196 donec: make(chan struct{}), 197 }, 198 name + ".etcd", 199 } 200} 201 202func (v2p *proxyV2Proc) Start() error { 203 os.RemoveAll(v2p.dataDir) 204 if err := v2p.start(); err != nil { 205 return err 206 } 207 return v2p.waitReady("httpproxy: endpoints found") 208} 209 210func (v2p *proxyV2Proc) Restart() error { 211 if err := v2p.Stop(); err != nil { 212 return err 213 } 214 return v2p.Start() 215} 216 217func (v2p *proxyV2Proc) Stop() error { 218 if err := v2p.proxyProc.Stop(); err != nil { 219 return err 220 } 221 // v2 proxy caches members; avoid reuse of directory 222 return os.RemoveAll(v2p.dataDir) 223} 224 225type proxyV3Proc struct { 226 proxyProc 227} 228 229func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc { 230 listenAddr := proxyListenURL(cfg, 3) 231 args := []string{ 232 "grpc-proxy", 233 "start", 234 "--listen-addr", strings.Split(listenAddr, "/")[2], 235 "--endpoints", cfg.acurl, 236 // pass-through member RPCs 237 "--advertise-client-url", "", 238 } 239 murl := "" 240 if cfg.murl != "" { 241 murl = proxyListenURL(cfg, 4) 242 args = append(args, "--metrics-addr", murl) 243 } 244 tlsArgs := []string{} 245 for i := 0; i < len(cfg.tlsArgs); i++ { 246 switch cfg.tlsArgs[i] { 247 case "--cert-file": 248 tlsArgs = append(tlsArgs, "--cert", cfg.tlsArgs[i+1], "--cert-file", cfg.tlsArgs[i+1]) 249 i++ 250 case "--key-file": 251 tlsArgs = append(tlsArgs, "--key", cfg.tlsArgs[i+1], "--key-file", cfg.tlsArgs[i+1]) 252 i++ 253 case "--ca-file": 254 tlsArgs = append(tlsArgs, "--cacert", cfg.tlsArgs[i+1], "--trusted-ca-file", cfg.tlsArgs[i+1]) 255 i++ 256 case "--auto-tls": 257 tlsArgs = append(tlsArgs, "--auto-tls", "--insecure-skip-tls-verify") 258 case "--peer-ca-file", "--peer-cert-file", "--peer-key-file": 259 i++ // skip arg 260 case "--client-cert-auth", "--peer-auto-tls": 261 default: 262 tlsArgs = append(tlsArgs, cfg.tlsArgs[i]) 263 } 264 } 265 return &proxyV3Proc{ 266 proxyProc{ 267 execPath: cfg.execPath, 268 args: append(args, tlsArgs...), 269 ep: listenAddr, 270 murl: murl, 271 donec: make(chan struct{}), 272 }, 273 } 274} 275 276func (v3p *proxyV3Proc) Restart() error { 277 if err := v3p.Stop(); err != nil { 278 return err 279 } 280 return v3p.Start() 281} 282 283func (v3p *proxyV3Proc) Start() error { 284 if err := v3p.start(); err != nil { 285 return err 286 } 287 return v3p.waitReady("listening for grpc-proxy client requests") 288} 289