1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package e2e 16 17import ( 18 "fmt" 19 "io/ioutil" 20 "net/url" 21 "os" 22 "strings" 23 24 "github.com/coreos/etcd/etcdserver" 25) 26 27const etcdProcessBasePort = 20000 28 29type clientConnType int 30 31const ( 32 clientNonTLS clientConnType = iota 33 clientTLS 34 clientTLSAndNonTLS 35) 36 37var ( 38 configNoTLS = etcdProcessClusterConfig{ 39 clusterSize: 3, 40 initialToken: "new", 41 } 42 configAutoTLS = etcdProcessClusterConfig{ 43 clusterSize: 3, 44 isPeerTLS: true, 45 isPeerAutoTLS: true, 46 initialToken: "new", 47 } 48 configTLS = etcdProcessClusterConfig{ 49 clusterSize: 3, 50 clientTLS: clientTLS, 51 isPeerTLS: true, 52 initialToken: "new", 53 } 54 configClientTLS = etcdProcessClusterConfig{ 55 clusterSize: 3, 56 clientTLS: clientTLS, 57 initialToken: "new", 58 } 59 configClientBoth = etcdProcessClusterConfig{ 60 clusterSize: 1, 61 clientTLS: clientTLSAndNonTLS, 62 initialToken: "new", 63 } 64 configClientAutoTLS = etcdProcessClusterConfig{ 65 clusterSize: 1, 66 isClientAutoTLS: true, 67 clientTLS: clientTLS, 68 initialToken: "new", 69 } 70 configPeerTLS = etcdProcessClusterConfig{ 71 clusterSize: 3, 72 isPeerTLS: true, 73 initialToken: "new", 74 } 75 configClientTLSCertAuth = etcdProcessClusterConfig{ 76 clusterSize: 1, 77 clientTLS: clientTLS, 78 initialToken: "new", 79 clientCertAuthEnabled: true, 80 } 81) 82 83func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig { 84 ret := cfg 85 ret.clusterSize = 1 86 return &ret 87} 88 89type etcdProcessCluster struct { 90 cfg *etcdProcessClusterConfig 91 procs []etcdProcess 92} 93 94type etcdProcessClusterConfig struct { 95 execPath string 96 dataDirPath string 97 keepDataDir bool 98 99 clusterSize int 100 101 baseScheme string 102 basePort int 103 104 metricsURLScheme string 105 106 snapCount int // default is 10000 107 108 clientTLS clientConnType 109 clientCertAuthEnabled bool 110 isPeerTLS bool 111 isPeerAutoTLS bool 112 isClientAutoTLS bool 113 isClientCRL bool 114 115 cipherSuites []string 116 117 forceNewCluster bool 118 initialToken string 119 quotaBackendBytes int64 120 noStrictReconfig bool 121 initialCorruptCheck bool 122} 123 124// newEtcdProcessCluster launches a new cluster from etcd processes, returning 125// a new etcdProcessCluster once all nodes are ready to accept client requests. 126func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) { 127 etcdCfgs := cfg.etcdServerProcessConfigs() 128 epc := &etcdProcessCluster{ 129 cfg: cfg, 130 procs: make([]etcdProcess, cfg.clusterSize), 131 } 132 133 // launch etcd processes 134 for i := range etcdCfgs { 135 proc, err := newEtcdProcess(etcdCfgs[i]) 136 if err != nil { 137 epc.Close() 138 return nil, err 139 } 140 epc.procs[i] = proc 141 } 142 143 if err := epc.Start(); err != nil { 144 return nil, err 145 } 146 return epc, nil 147} 148 149func (cfg *etcdProcessClusterConfig) clientScheme() string { 150 if cfg.clientTLS == clientTLS { 151 return "https" 152 } 153 return "http" 154} 155 156func (cfg *etcdProcessClusterConfig) peerScheme() string { 157 peerScheme := cfg.baseScheme 158 if peerScheme == "" { 159 peerScheme = "http" 160 } 161 if cfg.isPeerTLS { 162 peerScheme += "s" 163 } 164 return peerScheme 165} 166 167func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig { 168 if cfg.basePort == 0 { 169 cfg.basePort = etcdProcessBasePort 170 } 171 if cfg.execPath == "" { 172 cfg.execPath = binPath 173 } 174 if cfg.snapCount == 0 { 175 cfg.snapCount = etcdserver.DefaultSnapCount 176 } 177 178 etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize) 179 initialCluster := make([]string, cfg.clusterSize) 180 for i := 0; i < cfg.clusterSize; i++ { 181 var curls []string 182 var curl, curltls string 183 port := cfg.basePort + 5*i 184 curlHost := fmt.Sprintf("localhost:%d", port) 185 186 switch cfg.clientTLS { 187 case clientNonTLS, clientTLS: 188 curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String() 189 curls = []string{curl} 190 case clientTLSAndNonTLS: 191 curl = (&url.URL{Scheme: "http", Host: curlHost}).String() 192 curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() 193 curls = []string{curl, curltls} 194 } 195 196 purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} 197 name := fmt.Sprintf("testname%d", i) 198 dataDirPath := cfg.dataDirPath 199 if cfg.dataDirPath == "" { 200 var derr error 201 dataDirPath, derr = ioutil.TempDir("", name+".etcd") 202 if derr != nil { 203 panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr)) 204 } 205 } 206 initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String()) 207 208 args := []string{ 209 "--name", name, 210 "--listen-client-urls", strings.Join(curls, ","), 211 "--advertise-client-urls", strings.Join(curls, ","), 212 "--listen-peer-urls", purl.String(), 213 "--initial-advertise-peer-urls", purl.String(), 214 "--initial-cluster-token", cfg.initialToken, 215 "--data-dir", dataDirPath, 216 "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount), 217 } 218 args = addV2Args(args) 219 if cfg.forceNewCluster { 220 args = append(args, "--force-new-cluster") 221 } 222 if cfg.quotaBackendBytes > 0 { 223 args = append(args, 224 "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes), 225 ) 226 } 227 if cfg.noStrictReconfig { 228 args = append(args, "--strict-reconfig-check=false") 229 } 230 if cfg.initialCorruptCheck { 231 args = append(args, "--experimental-initial-corrupt-check") 232 } 233 var murl string 234 if cfg.metricsURLScheme != "" { 235 murl = (&url.URL{ 236 Scheme: cfg.metricsURLScheme, 237 Host: fmt.Sprintf("localhost:%d", port+2), 238 }).String() 239 args = append(args, "--listen-metrics-urls", murl) 240 } 241 242 args = append(args, cfg.tlsArgs()...) 243 etcdCfgs[i] = &etcdServerProcessConfig{ 244 execPath: cfg.execPath, 245 args: args, 246 tlsArgs: cfg.tlsArgs(), 247 dataDirPath: dataDirPath, 248 keepDataDir: cfg.keepDataDir, 249 name: name, 250 purl: purl, 251 acurl: curl, 252 murl: murl, 253 initialToken: cfg.initialToken, 254 } 255 } 256 257 initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} 258 for i := range etcdCfgs { 259 etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",") 260 etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...) 261 } 262 263 return etcdCfgs 264} 265 266func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { 267 if cfg.clientTLS != clientNonTLS { 268 if cfg.isClientAutoTLS { 269 args = append(args, "--auto-tls") 270 } else { 271 tlsClientArgs := []string{ 272 "--cert-file", certPath, 273 "--key-file", privateKeyPath, 274 "--ca-file", caPath, 275 } 276 args = append(args, tlsClientArgs...) 277 278 if cfg.clientCertAuthEnabled { 279 args = append(args, "--client-cert-auth") 280 } 281 } 282 } 283 284 if cfg.isPeerTLS { 285 if cfg.isPeerAutoTLS { 286 args = append(args, "--peer-auto-tls") 287 } else { 288 tlsPeerArgs := []string{ 289 "--peer-cert-file", certPath, 290 "--peer-key-file", privateKeyPath, 291 "--peer-ca-file", caPath, 292 } 293 args = append(args, tlsPeerArgs...) 294 } 295 } 296 297 if cfg.isClientCRL { 298 args = append(args, "--client-crl-file", crlPath, "--client-cert-auth") 299 } 300 301 if len(cfg.cipherSuites) > 0 { 302 args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ",")) 303 } 304 305 return args 306} 307 308func (epc *etcdProcessCluster) EndpointsV2() []string { 309 return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() }) 310} 311 312func (epc *etcdProcessCluster) EndpointsV3() []string { 313 return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() }) 314} 315 316func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) { 317 for _, p := range epc.procs { 318 ret = append(ret, f(p)...) 319 } 320 return ret 321} 322 323func (epc *etcdProcessCluster) Start() error { 324 return epc.start(func(ep etcdProcess) error { return ep.Start() }) 325} 326 327func (epc *etcdProcessCluster) Restart() error { 328 return epc.start(func(ep etcdProcess) error { return ep.Restart() }) 329} 330 331func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error { 332 readyC := make(chan error, len(epc.procs)) 333 for i := range epc.procs { 334 go func(n int) { readyC <- f(epc.procs[n]) }(i) 335 } 336 for range epc.procs { 337 if err := <-readyC; err != nil { 338 epc.Close() 339 return err 340 } 341 } 342 return nil 343} 344 345func (epc *etcdProcessCluster) Stop() (err error) { 346 for _, p := range epc.procs { 347 if p == nil { 348 continue 349 } 350 if curErr := p.Stop(); curErr != nil { 351 if err != nil { 352 err = fmt.Errorf("%v; %v", err, curErr) 353 } else { 354 err = curErr 355 } 356 } 357 } 358 return err 359} 360 361func (epc *etcdProcessCluster) Close() error { 362 err := epc.Stop() 363 for _, p := range epc.procs { 364 // p is nil when newEtcdProcess fails in the middle 365 // Close still gets called to clean up test data 366 if p == nil { 367 continue 368 } 369 if cerr := p.Close(); cerr != nil { 370 err = cerr 371 } 372 } 373 return err 374} 375 376func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) { 377 for _, p := range epc.procs { 378 ret = p.WithStopSignal(sig) 379 } 380 return ret 381} 382