1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package e2e 16 17import ( 18 "fmt" 19 "io/ioutil" 20 "net/url" 21 "os" 22 "strings" 23 "time" 24 25 "go.etcd.io/etcd/etcdserver" 26) 27 28const etcdProcessBasePort = 20000 29 30type clientConnType int 31 32const ( 33 clientNonTLS clientConnType = iota 34 clientTLS 35 clientTLSAndNonTLS 36) 37 38var ( 39 configNoTLS = etcdProcessClusterConfig{ 40 clusterSize: 3, 41 initialToken: "new", 42 } 43 configAutoTLS = etcdProcessClusterConfig{ 44 clusterSize: 3, 45 isPeerTLS: true, 46 isPeerAutoTLS: true, 47 initialToken: "new", 48 } 49 configTLS = etcdProcessClusterConfig{ 50 clusterSize: 3, 51 clientTLS: clientTLS, 52 isPeerTLS: true, 53 initialToken: "new", 54 } 55 configClientTLS = etcdProcessClusterConfig{ 56 clusterSize: 3, 57 clientTLS: clientTLS, 58 initialToken: "new", 59 } 60 configClientBoth = etcdProcessClusterConfig{ 61 clusterSize: 1, 62 clientTLS: clientTLSAndNonTLS, 63 initialToken: "new", 64 } 65 configClientAutoTLS = etcdProcessClusterConfig{ 66 clusterSize: 1, 67 isClientAutoTLS: true, 68 clientTLS: clientTLS, 69 initialToken: "new", 70 } 71 configPeerTLS = etcdProcessClusterConfig{ 72 clusterSize: 3, 73 isPeerTLS: true, 74 initialToken: "new", 75 } 76 configClientTLSCertAuth = etcdProcessClusterConfig{ 77 clusterSize: 1, 78 clientTLS: clientTLS, 79 initialToken: "new", 80 clientCertAuthEnabled: true, 81 } 82 configClientTLSCertAuthWithNoCN = etcdProcessClusterConfig{ 83 clusterSize: 1, 84 clientTLS: clientTLS, 85 initialToken: "new", 86 clientCertAuthEnabled: true, 87 noCN: true, 88 } 89 configJWT = etcdProcessClusterConfig{ 90 clusterSize: 1, 91 initialToken: "new", 92 authTokenOpts: "jwt,pub-key=../../integration/fixtures/server.crt,priv-key=../../integration/fixtures/server.key.insecure,sign-method=RS256,ttl=1s", 93 } 94) 95 96func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig { 97 ret := cfg 98 ret.clusterSize = 1 99 return &ret 100} 101 102type etcdProcessCluster struct { 103 cfg *etcdProcessClusterConfig 104 procs []etcdProcess 105} 106 107type etcdProcessClusterConfig struct { 108 execPath string 109 dataDirPath string 110 keepDataDir bool 111 112 clusterSize int 113 114 baseScheme string 115 basePort int 116 117 metricsURLScheme string 118 119 snapshotCount int // default is 10000 120 121 clientTLS clientConnType 122 clientCertAuthEnabled bool 123 isPeerTLS bool 124 isPeerAutoTLS bool 125 isClientAutoTLS bool 126 isClientCRL bool 127 noCN bool 128 129 cipherSuites []string 130 131 forceNewCluster bool 132 initialToken string 133 quotaBackendBytes int64 134 noStrictReconfig bool 135 enableV2 bool 136 initialCorruptCheck bool 137 authTokenOpts string 138 139 rollingStart bool 140} 141 142// newEtcdProcessCluster launches a new cluster from etcd processes, returning 143// a new etcdProcessCluster once all nodes are ready to accept client requests. 144func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) { 145 etcdCfgs := cfg.etcdServerProcessConfigs() 146 epc := &etcdProcessCluster{ 147 cfg: cfg, 148 procs: make([]etcdProcess, cfg.clusterSize), 149 } 150 151 // launch etcd processes 152 for i := range etcdCfgs { 153 proc, err := newEtcdProcess(etcdCfgs[i]) 154 if err != nil { 155 epc.Close() 156 return nil, err 157 } 158 epc.procs[i] = proc 159 } 160 161 if cfg.rollingStart { 162 if err := epc.RollingStart(); err != nil { 163 return nil, err 164 } 165 } else { 166 if err := epc.Start(); err != nil { 167 return nil, err 168 } 169 } 170 return epc, nil 171} 172 173func (cfg *etcdProcessClusterConfig) clientScheme() string { 174 if cfg.clientTLS == clientTLS { 175 return "https" 176 } 177 return "http" 178} 179 180func (cfg *etcdProcessClusterConfig) peerScheme() string { 181 peerScheme := cfg.baseScheme 182 if peerScheme == "" { 183 peerScheme = "http" 184 } 185 if cfg.isPeerTLS { 186 peerScheme += "s" 187 } 188 return peerScheme 189} 190 191func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig { 192 if cfg.basePort == 0 { 193 cfg.basePort = etcdProcessBasePort 194 } 195 if cfg.execPath == "" { 196 cfg.execPath = binPath 197 } 198 if cfg.snapshotCount == 0 { 199 cfg.snapshotCount = etcdserver.DefaultSnapshotCount 200 } 201 202 etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize) 203 initialCluster := make([]string, cfg.clusterSize) 204 for i := 0; i < cfg.clusterSize; i++ { 205 var curls []string 206 var curl, curltls string 207 port := cfg.basePort + 5*i 208 curlHost := fmt.Sprintf("localhost:%d", port) 209 210 switch cfg.clientTLS { 211 case clientNonTLS, clientTLS: 212 curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String() 213 curls = []string{curl} 214 case clientTLSAndNonTLS: 215 curl = (&url.URL{Scheme: "http", Host: curlHost}).String() 216 curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() 217 curls = []string{curl, curltls} 218 } 219 220 purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} 221 name := fmt.Sprintf("testname%d", i) 222 dataDirPath := cfg.dataDirPath 223 if cfg.dataDirPath == "" { 224 var derr error 225 dataDirPath, derr = ioutil.TempDir("", name+".etcd") 226 if derr != nil { 227 panic(fmt.Sprintf("could not get tempdir for datadir: %s", derr)) 228 } 229 } 230 initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String()) 231 232 args := []string{ 233 "--name", name, 234 "--listen-client-urls", strings.Join(curls, ","), 235 "--advertise-client-urls", strings.Join(curls, ","), 236 "--listen-peer-urls", purl.String(), 237 "--initial-advertise-peer-urls", purl.String(), 238 "--initial-cluster-token", cfg.initialToken, 239 "--data-dir", dataDirPath, 240 "--snapshot-count", fmt.Sprintf("%d", cfg.snapshotCount), 241 } 242 args = addV2Args(args) 243 if cfg.forceNewCluster { 244 args = append(args, "--force-new-cluster") 245 } 246 if cfg.quotaBackendBytes > 0 { 247 args = append(args, 248 "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes), 249 ) 250 } 251 if cfg.noStrictReconfig { 252 args = append(args, "--strict-reconfig-check=false") 253 } 254 if cfg.enableV2 { 255 args = append(args, "--enable-v2") 256 } 257 if cfg.initialCorruptCheck { 258 args = append(args, "--experimental-initial-corrupt-check") 259 } 260 var murl string 261 if cfg.metricsURLScheme != "" { 262 murl = (&url.URL{ 263 Scheme: cfg.metricsURLScheme, 264 Host: fmt.Sprintf("localhost:%d", port+2), 265 }).String() 266 args = append(args, "--listen-metrics-urls", murl) 267 } 268 269 args = append(args, cfg.tlsArgs()...) 270 271 if cfg.authTokenOpts != "" { 272 args = append(args, "--auth-token", cfg.authTokenOpts) 273 } 274 275 etcdCfgs[i] = &etcdServerProcessConfig{ 276 execPath: cfg.execPath, 277 args: args, 278 tlsArgs: cfg.tlsArgs(), 279 dataDirPath: dataDirPath, 280 keepDataDir: cfg.keepDataDir, 281 name: name, 282 purl: purl, 283 acurl: curl, 284 murl: murl, 285 initialToken: cfg.initialToken, 286 } 287 } 288 289 initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} 290 for i := range etcdCfgs { 291 etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",") 292 etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...) 293 } 294 295 return etcdCfgs 296} 297 298func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { 299 if cfg.clientTLS != clientNonTLS { 300 if cfg.isClientAutoTLS { 301 args = append(args, "--auto-tls") 302 } else { 303 tlsClientArgs := []string{ 304 "--cert-file", certPath, 305 "--key-file", privateKeyPath, 306 "--trusted-ca-file", caPath, 307 } 308 args = append(args, tlsClientArgs...) 309 310 if cfg.clientCertAuthEnabled { 311 args = append(args, "--client-cert-auth") 312 } 313 } 314 } 315 316 if cfg.isPeerTLS { 317 if cfg.isPeerAutoTLS { 318 args = append(args, "--peer-auto-tls") 319 } else { 320 tlsPeerArgs := []string{ 321 "--peer-cert-file", certPath, 322 "--peer-key-file", privateKeyPath, 323 "--peer-trusted-ca-file", caPath, 324 } 325 args = append(args, tlsPeerArgs...) 326 } 327 } 328 329 if cfg.isClientCRL { 330 args = append(args, "--client-crl-file", crlPath, "--client-cert-auth") 331 } 332 333 if len(cfg.cipherSuites) > 0 { 334 args = append(args, "--cipher-suites", strings.Join(cfg.cipherSuites, ",")) 335 } 336 337 return args 338} 339 340func (epc *etcdProcessCluster) EndpointsV2() []string { 341 return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() }) 342} 343 344func (epc *etcdProcessCluster) EndpointsV3() []string { 345 return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() }) 346} 347 348func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) { 349 for _, p := range epc.procs { 350 ret = append(ret, f(p)...) 351 } 352 return ret 353} 354 355func (epc *etcdProcessCluster) Start() error { 356 return epc.start(func(ep etcdProcess) error { return ep.Start() }) 357} 358 359func (epc *etcdProcessCluster) RollingStart() error { 360 return epc.rollingStart(func(ep etcdProcess) error { return ep.Start() }) 361} 362 363func (epc *etcdProcessCluster) Restart() error { 364 return epc.start(func(ep etcdProcess) error { return ep.Restart() }) 365} 366 367func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error { 368 readyC := make(chan error, len(epc.procs)) 369 for i := range epc.procs { 370 go func(n int) { readyC <- f(epc.procs[n]) }(i) 371 } 372 for range epc.procs { 373 if err := <-readyC; err != nil { 374 epc.Close() 375 return err 376 } 377 } 378 return nil 379} 380 381func (epc *etcdProcessCluster) rollingStart(f func(ep etcdProcess) error) error { 382 readyC := make(chan error, len(epc.procs)) 383 for i := range epc.procs { 384 go func(n int) { readyC <- f(epc.procs[n]) }(i) 385 // make sure the servers do not start at the same time 386 time.Sleep(time.Second) 387 } 388 for range epc.procs { 389 if err := <-readyC; err != nil { 390 epc.Close() 391 return err 392 } 393 } 394 return nil 395} 396 397func (epc *etcdProcessCluster) Stop() (err error) { 398 for _, p := range epc.procs { 399 if p == nil { 400 continue 401 } 402 if curErr := p.Stop(); curErr != nil { 403 if err != nil { 404 err = fmt.Errorf("%v; %v", err, curErr) 405 } else { 406 err = curErr 407 } 408 } 409 } 410 return err 411} 412 413func (epc *etcdProcessCluster) Close() error { 414 err := epc.Stop() 415 for _, p := range epc.procs { 416 // p is nil when newEtcdProcess fails in the middle 417 // Close still gets called to clean up test data 418 if p == nil { 419 continue 420 } 421 if cerr := p.Close(); cerr != nil { 422 err = cerr 423 } 424 } 425 return err 426} 427 428func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) { 429 for _, p := range epc.procs { 430 ret = p.WithStopSignal(sig) 431 } 432 return ret 433} 434