1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package v2discovery provides an implementation of the cluster discovery that 16// is used by etcd with v2 client. 17package v2discovery 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "math" 24 "net/http" 25 "net/url" 26 "path" 27 "sort" 28 "strconv" 29 "strings" 30 "time" 31 32 "go.etcd.io/etcd/client" 33 "go.etcd.io/etcd/pkg/transport" 34 "go.etcd.io/etcd/pkg/types" 35 36 "github.com/coreos/pkg/capnslog" 37 "github.com/jonboulle/clockwork" 38 "go.uber.org/zap" 39) 40 41var ( 42 plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "discovery") 43 44 ErrInvalidURL = errors.New("discovery: invalid URL") 45 ErrBadSizeKey = errors.New("discovery: size key is bad") 46 ErrSizeNotFound = errors.New("discovery: size key not found") 47 ErrTokenNotFound = errors.New("discovery: token not found") 48 ErrDuplicateID = errors.New("discovery: found duplicate id") 49 ErrDuplicateName = errors.New("discovery: found duplicate name") 50 ErrFullCluster = errors.New("discovery: cluster is full") 51 ErrTooManyRetries = errors.New("discovery: too many retries") 52 ErrBadDiscoveryEndpoint = errors.New("discovery: bad discovery endpoint") 53) 54 55var ( 56 // Number of retries discovery will attempt before giving up and erroring out. 57 nRetries = uint(math.MaxUint32) 58 maxExpoentialRetries = uint(8) 59) 60 61// JoinCluster will connect to the discovery service at the given url, and 62// register the server represented by the given id and config to the cluster 63func JoinCluster(lg *zap.Logger, durl, dproxyurl string, id types.ID, config string) (string, error) { 64 d, err := newDiscovery(lg, durl, dproxyurl, id) 65 if err != nil { 66 return "", err 67 } 68 return d.joinCluster(config) 69} 70 71// GetCluster will connect to the discovery service at the given url and 72// retrieve a string describing the cluster 73func GetCluster(lg *zap.Logger, durl, dproxyurl string) (string, error) { 74 d, err := newDiscovery(lg, durl, dproxyurl, 0) 75 if err != nil { 76 return "", err 77 } 78 return d.getCluster() 79} 80 81type discovery struct { 82 lg *zap.Logger 83 cluster string 84 id types.ID 85 c client.KeysAPI 86 retries uint 87 url *url.URL 88 89 clock clockwork.Clock 90} 91 92// newProxyFunc builds a proxy function from the given string, which should 93// represent a URL that can be used as a proxy. It performs basic 94// sanitization of the URL and returns any error encountered. 95func newProxyFunc(lg *zap.Logger, proxy string) (func(*http.Request) (*url.URL, error), error) { 96 if proxy == "" { 97 return nil, nil 98 } 99 // Do a small amount of URL sanitization to help the user 100 // Derived from net/http.ProxyFromEnvironment 101 proxyURL, err := url.Parse(proxy) 102 if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") { 103 // proxy was bogus. Try prepending "http://" to it and 104 // see if that parses correctly. If not, we ignore the 105 // error and complain about the original one 106 var err2 error 107 proxyURL, err2 = url.Parse("http://" + proxy) 108 if err2 == nil { 109 err = nil 110 } 111 } 112 if err != nil { 113 return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err) 114 } 115 116 if lg != nil { 117 lg.Info("running proxy with discovery", zap.String("proxy-url", proxyURL.String())) 118 } else { 119 plog.Infof("using proxy %q", proxyURL.String()) 120 } 121 return http.ProxyURL(proxyURL), nil 122} 123 124func newDiscovery(lg *zap.Logger, durl, dproxyurl string, id types.ID) (*discovery, error) { 125 u, err := url.Parse(durl) 126 if err != nil { 127 return nil, err 128 } 129 token := u.Path 130 u.Path = "" 131 pf, err := newProxyFunc(lg, dproxyurl) 132 if err != nil { 133 return nil, err 134 } 135 136 // TODO: add ResponseHeaderTimeout back when watch on discovery service writes header early 137 tr, err := transport.NewTransport(transport.TLSInfo{}, 30*time.Second) 138 if err != nil { 139 return nil, err 140 } 141 tr.Proxy = pf 142 cfg := client.Config{ 143 Transport: tr, 144 Endpoints: []string{u.String()}, 145 } 146 c, err := client.New(cfg) 147 if err != nil { 148 return nil, err 149 } 150 dc := client.NewKeysAPIWithPrefix(c, "") 151 return &discovery{ 152 lg: lg, 153 cluster: token, 154 c: dc, 155 id: id, 156 url: u, 157 clock: clockwork.NewRealClock(), 158 }, nil 159} 160 161func (d *discovery) joinCluster(config string) (string, error) { 162 // fast path: if the cluster is full, return the error 163 // do not need to register to the cluster in this case. 164 if _, _, _, err := d.checkCluster(); err != nil { 165 return "", err 166 } 167 168 if err := d.createSelf(config); err != nil { 169 // Fails, even on a timeout, if createSelf times out. 170 // TODO(barakmich): Retrying the same node might want to succeed here 171 // (ie, createSelf should be idempotent for discovery). 172 return "", err 173 } 174 175 nodes, size, index, err := d.checkCluster() 176 if err != nil { 177 return "", err 178 } 179 180 all, err := d.waitNodes(nodes, size, index) 181 if err != nil { 182 return "", err 183 } 184 185 return nodesToCluster(all, size) 186} 187 188func (d *discovery) getCluster() (string, error) { 189 nodes, size, index, err := d.checkCluster() 190 if err != nil { 191 if err == ErrFullCluster { 192 return nodesToCluster(nodes, size) 193 } 194 return "", err 195 } 196 197 all, err := d.waitNodes(nodes, size, index) 198 if err != nil { 199 return "", err 200 } 201 return nodesToCluster(all, size) 202} 203 204func (d *discovery) createSelf(contents string) error { 205 ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) 206 resp, err := d.c.Create(ctx, d.selfKey(), contents) 207 cancel() 208 if err != nil { 209 if eerr, ok := err.(client.Error); ok && eerr.Code == client.ErrorCodeNodeExist { 210 return ErrDuplicateID 211 } 212 return err 213 } 214 215 // ensure self appears on the server we connected to 216 w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{AfterIndex: resp.Node.CreatedIndex - 1}) 217 _, err = w.Next(context.Background()) 218 return err 219} 220 221func (d *discovery) checkCluster() ([]*client.Node, uint64, uint64, error) { 222 configKey := path.Join("/", d.cluster, "_config") 223 ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout) 224 // find cluster size 225 resp, err := d.c.Get(ctx, path.Join(configKey, "size"), nil) 226 cancel() 227 if err != nil { 228 if eerr, ok := err.(*client.Error); ok && eerr.Code == client.ErrorCodeKeyNotFound { 229 return nil, 0, 0, ErrSizeNotFound 230 } 231 if err == client.ErrInvalidJSON { 232 return nil, 0, 0, ErrBadDiscoveryEndpoint 233 } 234 if ce, ok := err.(*client.ClusterError); ok { 235 if d.lg != nil { 236 d.lg.Warn( 237 "failed to get from discovery server", 238 zap.String("discovery-url", d.url.String()), 239 zap.String("path", path.Join(configKey, "size")), 240 zap.Error(err), 241 zap.String("err-detail", ce.Detail()), 242 ) 243 } else { 244 plog.Error(ce.Detail()) 245 } 246 return d.checkClusterRetry() 247 } 248 return nil, 0, 0, err 249 } 250 size, err := strconv.ParseUint(resp.Node.Value, 10, 0) 251 if err != nil { 252 return nil, 0, 0, ErrBadSizeKey 253 } 254 255 ctx, cancel = context.WithTimeout(context.Background(), client.DefaultRequestTimeout) 256 resp, err = d.c.Get(ctx, d.cluster, nil) 257 cancel() 258 if err != nil { 259 if ce, ok := err.(*client.ClusterError); ok { 260 if d.lg != nil { 261 d.lg.Warn( 262 "failed to get from discovery server", 263 zap.String("discovery-url", d.url.String()), 264 zap.String("path", d.cluster), 265 zap.Error(err), 266 zap.String("err-detail", ce.Detail()), 267 ) 268 } else { 269 plog.Error(ce.Detail()) 270 } 271 return d.checkClusterRetry() 272 } 273 return nil, 0, 0, err 274 } 275 var nodes []*client.Node 276 // append non-config keys to nodes 277 for _, n := range resp.Node.Nodes { 278 if path.Base(n.Key) != path.Base(configKey) { 279 nodes = append(nodes, n) 280 } 281 } 282 283 snodes := sortableNodes{nodes} 284 sort.Sort(snodes) 285 286 // find self position 287 for i := range nodes { 288 if path.Base(nodes[i].Key) == path.Base(d.selfKey()) { 289 break 290 } 291 if uint64(i) >= size-1 { 292 return nodes[:size], size, resp.Index, ErrFullCluster 293 } 294 } 295 return nodes, size, resp.Index, nil 296} 297 298func (d *discovery) logAndBackoffForRetry(step string) { 299 d.retries++ 300 // logAndBackoffForRetry stops exponential backoff when the retries are more than maxExpoentialRetries and is set to a constant backoff afterward. 301 retries := d.retries 302 if retries > maxExpoentialRetries { 303 retries = maxExpoentialRetries 304 } 305 retryTimeInSecond := time.Duration(0x1<<retries) * time.Second 306 if d.lg != nil { 307 d.lg.Info( 308 "retry connecting to discovery service", 309 zap.String("url", d.url.String()), 310 zap.String("reason", step), 311 zap.Duration("backoff", retryTimeInSecond), 312 ) 313 } else { 314 plog.Infof("%s: error connecting to %s, retrying in %s", step, d.url, retryTimeInSecond) 315 } 316 d.clock.Sleep(retryTimeInSecond) 317} 318 319func (d *discovery) checkClusterRetry() ([]*client.Node, uint64, uint64, error) { 320 if d.retries < nRetries { 321 d.logAndBackoffForRetry("cluster status check") 322 return d.checkCluster() 323 } 324 return nil, 0, 0, ErrTooManyRetries 325} 326 327func (d *discovery) waitNodesRetry() ([]*client.Node, error) { 328 if d.retries < nRetries { 329 d.logAndBackoffForRetry("waiting for other nodes") 330 nodes, n, index, err := d.checkCluster() 331 if err != nil { 332 return nil, err 333 } 334 return d.waitNodes(nodes, n, index) 335 } 336 return nil, ErrTooManyRetries 337} 338 339func (d *discovery) waitNodes(nodes []*client.Node, size uint64, index uint64) ([]*client.Node, error) { 340 if uint64(len(nodes)) > size { 341 nodes = nodes[:size] 342 } 343 // watch from the next index 344 w := d.c.Watcher(d.cluster, &client.WatcherOptions{AfterIndex: index, Recursive: true}) 345 all := make([]*client.Node, len(nodes)) 346 copy(all, nodes) 347 for _, n := range all { 348 if path.Base(n.Key) == path.Base(d.selfKey()) { 349 if d.lg != nil { 350 d.lg.Info( 351 "found self from discovery server", 352 zap.String("discovery-url", d.url.String()), 353 zap.String("self", path.Base(d.selfKey())), 354 ) 355 } else { 356 plog.Noticef("found self %s in the cluster", path.Base(d.selfKey())) 357 } 358 } else { 359 if d.lg != nil { 360 d.lg.Info( 361 "found peer from discovery server", 362 zap.String("discovery-url", d.url.String()), 363 zap.String("peer", path.Base(n.Key)), 364 ) 365 } else { 366 plog.Noticef("found peer %s in the cluster", path.Base(n.Key)) 367 } 368 } 369 } 370 371 // wait for others 372 for uint64(len(all)) < size { 373 if d.lg != nil { 374 d.lg.Info( 375 "found peers from discovery server; waiting for more", 376 zap.String("discovery-url", d.url.String()), 377 zap.Int("found-peers", len(all)), 378 zap.Int("needed-peers", int(size-uint64(len(all)))), 379 ) 380 } else { 381 plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-uint64(len(all))) 382 } 383 resp, err := w.Next(context.Background()) 384 if err != nil { 385 if ce, ok := err.(*client.ClusterError); ok { 386 plog.Error(ce.Detail()) 387 return d.waitNodesRetry() 388 } 389 return nil, err 390 } 391 if d.lg != nil { 392 d.lg.Info( 393 "found peer from discovery server", 394 zap.String("discovery-url", d.url.String()), 395 zap.String("peer", path.Base(resp.Node.Key)), 396 ) 397 } else { 398 plog.Noticef("found peer %s in the cluster", path.Base(resp.Node.Key)) 399 } 400 all = append(all, resp.Node) 401 } 402 if d.lg != nil { 403 d.lg.Info( 404 "found all needed peers from discovery server", 405 zap.String("discovery-url", d.url.String()), 406 zap.Int("found-peers", len(all)), 407 ) 408 } else { 409 plog.Noticef("found %d needed peer(s)", len(all)) 410 } 411 return all, nil 412} 413 414func (d *discovery) selfKey() string { 415 return path.Join("/", d.cluster, d.id.String()) 416} 417 418func nodesToCluster(ns []*client.Node, size uint64) (string, error) { 419 s := make([]string, len(ns)) 420 for i, n := range ns { 421 s[i] = n.Value 422 } 423 us := strings.Join(s, ",") 424 m, err := types.NewURLsMap(us) 425 if err != nil { 426 return us, ErrInvalidURL 427 } 428 if uint64(m.Len()) != size { 429 return us, ErrDuplicateName 430 } 431 return us, nil 432} 433 434type sortableNodes struct{ Nodes []*client.Node } 435 436func (ns sortableNodes) Len() int { return len(ns.Nodes) } 437func (ns sortableNodes) Less(i, j int) bool { 438 return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex 439} 440func (ns sortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] } 441