// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package v2discovery provides an implementation of the cluster discovery that
// is used by etcd with the v2 client.
package v2discovery

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net/http"
	"net/url"
	"path"
	"sort"
	"strconv"
	"strings"
	"time"

	"go.etcd.io/etcd/client"
	"go.etcd.io/etcd/pkg/transport"
	"go.etcd.io/etcd/pkg/types"

	"github.com/coreos/pkg/capnslog"
	"github.com/jonboulle/clockwork"
	"go.uber.org/zap"
)

var (
	plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "discovery")

	ErrInvalidURL           = errors.New("discovery: invalid URL")
	ErrBadSizeKey           = errors.New("discovery: size key is bad")
	ErrSizeNotFound         = errors.New("discovery: size key not found")
	ErrTokenNotFound        = errors.New("discovery: token not found")
	ErrDuplicateID          = errors.New("discovery: found duplicate id")
	ErrDuplicateName        = errors.New("discovery: found duplicate name")
	ErrFullCluster          = errors.New("discovery: cluster is full")
	ErrTooManyRetries       = errors.New("discovery: too many retries")
	ErrBadDiscoveryEndpoint = errors.New("discovery: bad discovery endpoint")
)

var (
	// Number of retries discovery will attempt before giving up and erroring out.
	nRetries              = uint(math.MaxUint32)
	maxExponentialRetries = uint(8)
)

// JoinCluster will connect to the discovery service at the given URL and
// register the server represented by the given id and config with the cluster.
func JoinCluster(lg *zap.Logger, durl, dproxyurl string, id types.ID, config string) (string, error) {
	d, err := newDiscovery(lg, durl, dproxyurl, id)
	if err != nil {
		return "", err
	}
	return d.joinCluster(config)
}

// GetCluster will connect to the discovery service at the given URL and
// retrieve a string describing the cluster.
func GetCluster(lg *zap.Logger, durl, dproxyurl string) (string, error) {
	d, err := newDiscovery(lg, durl, dproxyurl, 0)
	if err != nil {
		return "", err
	}
	return d.getCluster()
}

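// discovery tracks the state of a single registration against the discovery
// service: the cluster token, the local member ID, the keys API client used to
// reach the service, the discovery URL, and the retry count used for backoff.
// The clock abstracts time so the backoff sleep can be controlled (e.g. faked
// in tests).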
type discovery struct {
	lg      *zap.Logger
	cluster string
	id      types.ID
	c       client.KeysAPI
	retries uint
	url     *url.URL

	clock clockwork.Clock
}

// newProxyFunc builds a proxy function from the given string, which should
// represent a URL that can be used as a proxy. It performs basic
// sanitization of the URL and returns any error encountered.
func newProxyFunc(lg *zap.Logger, proxy string) (func(*http.Request) (*url.URL, error), error) {
	if proxy == "" {
		return nil, nil
	}
	// Do a small amount of URL sanitization to help the user
	// Derived from net/http.ProxyFromEnvironment
	proxyURL, err := url.Parse(proxy)
	if err != nil || !strings.HasPrefix(proxyURL.Scheme, "http") {
		// proxy was bogus. Try prepending "http://" to it and
		// see if that parses correctly. If not, we ignore the
		// error and complain about the original one
		var err2 error
		proxyURL, err2 = url.Parse("http://" + proxy)
		if err2 == nil {
			err = nil
		}
	}
	if err != nil {
		return nil, fmt.Errorf("invalid proxy address %q: %v", proxy, err)
	}

	if lg != nil {
		lg.Info("running proxy with discovery", zap.String("proxy-url", proxyURL.String()))
	} else {
		plog.Infof("using proxy %q", proxyURL.String())
	}
	return http.ProxyURL(proxyURL), nil
}

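// newDiscovery parses durl into a discovery endpoint and cluster token (the
// URL path), builds an HTTP transport that optionally routes through
// dproxyurl, and returns a discovery handle for the member with the given id.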
func newDiscovery(lg *zap.Logger, durl, dproxyurl string, id types.ID) (*discovery, error) {
	u, err := url.Parse(durl)
	if err != nil {
		return nil, err
	}
	token := u.Path
	u.Path = ""
	pf, err := newProxyFunc(lg, dproxyurl)
	if err != nil {
		return nil, err
	}

	// TODO: add ResponseHeaderTimeout back when watch on discovery service writes header early
	tr, err := transport.NewTransport(transport.TLSInfo{}, 30*time.Second)
	if err != nil {
		return nil, err
	}
	tr.Proxy = pf
	cfg := client.Config{
		Transport: tr,
		Endpoints: []string{u.String()},
	}
	c, err := client.New(cfg)
	if err != nil {
		return nil, err
	}
	dc := client.NewKeysAPIWithPrefix(c, "")
	return &discovery{
		lg:      lg,
		cluster: token,
		c:       dc,
		id:      id,
		url:     u,
		clock:   clockwork.NewRealClock(),
	}, nil
}

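// joinCluster registers this member under the cluster token with the given
// config value, waits until the expected number of members have registered,
// and returns the resulting cluster string.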
func (d *discovery) joinCluster(config string) (string, error) {
	// fast path: if the cluster is already full, return the error early;
	// there is no need to register to the cluster in this case.
	if _, _, _, err := d.checkCluster(); err != nil {
		return "", err
	}

	if err := d.createSelf(config); err != nil {
		// Fail the join even when the error is only a timeout of createSelf.
		// TODO(barakmich): Retrying the same node might want to succeed here
		// (i.e., createSelf should be idempotent for discovery).
		return "", err
	}

	nodes, size, index, err := d.checkCluster()
	if err != nil {
		return "", err
	}

	all, err := d.waitNodes(nodes, size, index)
	if err != nil {
		return "", err
	}

	return nodesToCluster(all, size)
}

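// getCluster waits for the cluster under the token to fill up (or finds it
// already full) and returns the cluster string without registering a member.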
func (d *discovery) getCluster() (string, error) {
	nodes, size, index, err := d.checkCluster()
	if err != nil {
		if err == ErrFullCluster {
			return nodesToCluster(nodes, size)
		}
		return "", err
	}

	all, err := d.waitNodes(nodes, size, index)
	if err != nil {
		return "", err
	}
	return nodesToCluster(all, size)
}

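// createSelf creates this member's key under the cluster token with the given
// contents, returning ErrDuplicateID if the key already exists, and then waits
// until the creation is observable on the server it is connected to.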
func (d *discovery) createSelf(contents string) error {
	ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
	resp, err := d.c.Create(ctx, d.selfKey(), contents)
	cancel()
	if err != nil {
		if eerr, ok := err.(client.Error); ok && eerr.Code == client.ErrorCodeNodeExist {
			return ErrDuplicateID
		}
		return err
	}

	// ensure self appears on the server we connected to
	w := d.c.Watcher(d.selfKey(), &client.WatcherOptions{AfterIndex: resp.Node.CreatedIndex - 1})
	_, err = w.Next(context.Background())
	return err
}

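// checkCluster fetches the expected cluster size and the registered member
// nodes, sorted by creation index. It returns the nodes, the size, and the
// response index to resume watching from; ErrFullCluster is returned when the
// cluster already has size members and this member is not among them.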
func (d *discovery) checkCluster() ([]*client.Node, uint64, uint64, error) {
	configKey := path.Join("/", d.cluster, "_config")
	ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
	// find cluster size
	resp, err := d.c.Get(ctx, path.Join(configKey, "size"), nil)
	cancel()
	if err != nil {
		if eerr, ok := err.(*client.Error); ok && eerr.Code == client.ErrorCodeKeyNotFound {
			return nil, 0, 0, ErrSizeNotFound
		}
		if err == client.ErrInvalidJSON {
			return nil, 0, 0, ErrBadDiscoveryEndpoint
		}
		if ce, ok := err.(*client.ClusterError); ok {
			if d.lg != nil {
				d.lg.Warn(
					"failed to get from discovery server",
					zap.String("discovery-url", d.url.String()),
					zap.String("path", path.Join(configKey, "size")),
					zap.Error(err),
					zap.String("err-detail", ce.Detail()),
				)
			} else {
				plog.Error(ce.Detail())
			}
			return d.checkClusterRetry()
		}
		return nil, 0, 0, err
	}
	size, err := strconv.ParseUint(resp.Node.Value, 10, 0)
	if err != nil {
		return nil, 0, 0, ErrBadSizeKey
	}

	ctx, cancel = context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
	resp, err = d.c.Get(ctx, d.cluster, nil)
	cancel()
	if err != nil {
		if ce, ok := err.(*client.ClusterError); ok {
			if d.lg != nil {
				d.lg.Warn(
					"failed to get from discovery server",
					zap.String("discovery-url", d.url.String()),
					zap.String("path", d.cluster),
					zap.Error(err),
					zap.String("err-detail", ce.Detail()),
				)
			} else {
				plog.Error(ce.Detail())
			}
			return d.checkClusterRetry()
		}
		return nil, 0, 0, err
	}
	var nodes []*client.Node
	// append non-config keys to nodes
	for _, n := range resp.Node.Nodes {
		if path.Base(n.Key) != path.Base(configKey) {
			nodes = append(nodes, n)
		}
	}

	snodes := sortableNodes{nodes}
	sort.Sort(snodes)

	// find self position; if self is not among the first size registrants,
	// the cluster is already full
	for i := range nodes {
		if path.Base(nodes[i].Key) == path.Base(d.selfKey()) {
			break
		}
		if uint64(i) >= size-1 {
			return nodes[:size], size, resp.Index, ErrFullCluster
		}
	}
	return nodes, size, resp.Index, nil
}

func (d *discovery) logAndBackoffForRetry(step string) {
	d.retries++
	// Stop growing the backoff exponentially once the retry count exceeds
	// maxExponentialRetries; use a constant backoff afterward.
	retries := d.retries
	if retries > maxExponentialRetries {
		retries = maxExponentialRetries
	}
	retryTimeInSecond := time.Duration(0x1<<retries) * time.Second
	if d.lg != nil {
		d.lg.Info(
			"retry connecting to discovery service",
			zap.String("url", d.url.String()),
			zap.String("reason", step),
			zap.Duration("backoff", retryTimeInSecond),
		)
	} else {
		plog.Infof("%s: error connecting to %s, retrying in %s", step, d.url, retryTimeInSecond)
	}
	d.clock.Sleep(retryTimeInSecond)
}

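// checkClusterRetry backs off and retries checkCluster until the retry budget
// (nRetries) is exhausted, at which point it returns ErrTooManyRetries.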
func (d *discovery) checkClusterRetry() ([]*client.Node, uint64, uint64, error) {
	if d.retries < nRetries {
		d.logAndBackoffForRetry("cluster status check")
		return d.checkCluster()
	}
	return nil, 0, 0, ErrTooManyRetries
}

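// waitNodesRetry backs off, re-checks the cluster, and resumes waiting for the
// remaining members; it returns ErrTooManyRetries once the retry budget is
// exhausted.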
func (d *discovery) waitNodesRetry() ([]*client.Node, error) {
	if d.retries < nRetries {
		d.logAndBackoffForRetry("waiting for other nodes")
		nodes, n, index, err := d.checkCluster()
		if err != nil {
			return nil, err
		}
		return d.waitNodes(nodes, n, index)
	}
	return nil, ErrTooManyRetries
}

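// waitNodes watches the cluster token for new member registrations, starting
// after the given index, until size members are known, and returns them all.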
func (d *discovery) waitNodes(nodes []*client.Node, size uint64, index uint64) ([]*client.Node, error) {
	if uint64(len(nodes)) > size {
		nodes = nodes[:size]
	}
	// watch from the next index
	w := d.c.Watcher(d.cluster, &client.WatcherOptions{AfterIndex: index, Recursive: true})
	all := make([]*client.Node, len(nodes))
	copy(all, nodes)
	for _, n := range all {
		if path.Base(n.Key) == path.Base(d.selfKey()) {
			if d.lg != nil {
				d.lg.Info(
					"found self from discovery server",
					zap.String("discovery-url", d.url.String()),
					zap.String("self", path.Base(d.selfKey())),
				)
			} else {
				plog.Noticef("found self %s in the cluster", path.Base(d.selfKey()))
			}
		} else {
			if d.lg != nil {
				d.lg.Info(
					"found peer from discovery server",
					zap.String("discovery-url", d.url.String()),
					zap.String("peer", path.Base(n.Key)),
				)
			} else {
				plog.Noticef("found peer %s in the cluster", path.Base(n.Key))
			}
		}
	}

	// wait for others
	for uint64(len(all)) < size {
		if d.lg != nil {
			d.lg.Info(
				"found peers from discovery server; waiting for more",
				zap.String("discovery-url", d.url.String()),
				zap.Int("found-peers", len(all)),
				zap.Int("needed-peers", int(size-uint64(len(all)))),
			)
		} else {
			plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-uint64(len(all)))
		}
		resp, err := w.Next(context.Background())
		if err != nil {
			if ce, ok := err.(*client.ClusterError); ok {
				plog.Error(ce.Detail())
				return d.waitNodesRetry()
			}
			return nil, err
		}
		if d.lg != nil {
			d.lg.Info(
				"found peer from discovery server",
				zap.String("discovery-url", d.url.String()),
				zap.String("peer", path.Base(resp.Node.Key)),
			)
		} else {
			plog.Noticef("found peer %s in the cluster", path.Base(resp.Node.Key))
		}
		all = append(all, resp.Node)
	}
	if d.lg != nil {
		d.lg.Info(
			"found all needed peers from discovery server",
			zap.String("discovery-url", d.url.String()),
			zap.Int("found-peers", len(all)),
		)
	} else {
		plog.Noticef("found %d needed peer(s)", len(all))
	}
	return all, nil
}

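// selfKey returns the key under which this member registers itself:
// "/<cluster-token>/<member-id>".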
func (d *discovery) selfKey() string {
	return path.Join("/", d.cluster, d.id.String())
}

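// nodesToCluster joins the registered node values into a cluster string and
// validates it, returning ErrInvalidURL if the result does not parse as a
// URLs map and ErrDuplicateName if the number of unique names does not match
// the expected cluster size.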
func nodesToCluster(ns []*client.Node, size uint64) (string, error) {
	s := make([]string, len(ns))
	for i, n := range ns {
		s[i] = n.Value
	}
	us := strings.Join(s, ",")
	m, err := types.NewURLsMap(us)
	if err != nil {
		return us, ErrInvalidURL
	}
	if uint64(m.Len()) != size {
		return us, ErrDuplicateName
	}
	return us, nil
}

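// sortableNodes orders discovery nodes by their creation index, i.e. by
// registration order.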
type sortableNodes struct{ Nodes []*client.Node }

func (ns sortableNodes) Len() int { return len(ns.Nodes) }
func (ns sortableNodes) Less(i, j int) bool {
	return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex
}
func (ns sortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] }
