package cluster

import (
	"fmt"
	"net"
	"strings"
	"time"

	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/filters"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/signal"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
)

// Init initializes a new cluster from a user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	if c.nr != nil {
		if req.ForceNewCluster {

			// Take c.mu temporarily to wait for presently running
			// API handlers to finish before shutting down the node.
			c.mu.Lock()
			if !c.nr.nodeState.IsManager() {
				c.mu.Unlock()
				return "", errSwarmNotManager
			}
			c.mu.Unlock()

			if err := c.nr.Stop(); err != nil {
				return "", err
			}
		} else {
			return "", errSwarmExists
		}
	}

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		return "", validationError{err}
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		return "", err
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as local address. If this fails,
	// we give up and ask the user to pass the listen address.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}
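	// Start the swarmkit node with the resolved listen, advertise, and
	// data-path addresses.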
	nr, err := c.newNodeRunner(nodeStartConfig{
		forceNewCluster: req.ForceNewCluster,
		autolock:        req.AutoLockManagers,
		LocalAddr:       localAddr,
		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
		DataPathAddr:    dataPathAddr,
		availability:    req.Availability,
	})
	if err != nil {
		return "", err
	}
	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		c.mu.Lock()
		c.nr = nil
		c.mu.Unlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := clearPersistentState(c.root); err != nil {
				return "", err
			}
		}
		return "", err
	}
	state := nr.State()
	if state.swarmNode == nil { // should never happen but protect from panic
		return "", errors.New("invalid cluster state for spec initialization")
	}
	if err := initClusterSpec(state.swarmNode, req.Spec); err != nil {
		return "", err
	}
	return state.NodeID(), nil
}

// Join makes current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()
	c.mu.Lock()
	if c.nr != nil {
		c.mu.Unlock()
		return errors.WithStack(errSwarmExists)
	}
	c.mu.Unlock()

	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		return validationError{err}
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	dataPathAddr, err := resolveDataPathAddr(req.DataPathAddr)
	if err != nil {
		return err
	}

	nr, err := c.newNodeRunner(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		DataPathAddr:  dataPathAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
		availability:  req.Availability,
	})
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

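	// Wait for the node to come up or give up after swarmConnectTimeout.
	// If the node reports an error, the partially written swarm state is
	// cleared so a later join can start cleanly.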
	select {
	case <-time.After(swarmConnectTimeout):
		return errSwarmJoinTimeoutReached
	case err := <-nr.Ready():
		if err != nil {
			c.mu.Lock()
			c.nr = nil
			c.mu.Unlock()
			if err := clearPersistentState(c.root); err != nil {
				return err
			}
		}
		return err
	}
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	var swarm types.Swarm
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		s, err := c.inspect(ctx, state)
		if err != nil {
			return err
		}
		swarm = s
		return nil
	}); err != nil {
		return types.Swarm{}, err
	}
	return swarm, nil
}

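// inspect fetches the cluster object from the swarmkit control API and
// converts it to the engine API type.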
func (c *Cluster) inspect(ctx context.Context, state nodeState) (types.Swarm, error) {
	s, err := getSwarm(ctx, state.controlClient)
	if err != nil {
		return types.Swarm{}, err
	}
	return convert.SwarmFromGRPC(*s), nil
}

// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	return c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		swarm, err := getSwarm(ctx, state.controlClient)
		if err != nil {
			return err
		}

		// Validate spec name.
		if spec.Annotations.Name == "" {
			spec.Annotations.Name = "default"
		} else if spec.Annotations.Name != "default" {
			return validationError{errors.New(`swarm spec must be named "default"`)}
		}

		// In Update, the client should provide the complete spec of the swarm,
		// including Name and Labels. If a field is specified as 0 or nil, then
		// the default value will be sent to swarmkit.
		clusterSpec, err := convert.SwarmSpecToGRPC(spec)
		if err != nil {
			return convertError{err}
		}

		_, err = state.controlClient.UpdateCluster(
			ctx,
			&swarmapi.UpdateClusterRequest{
				ClusterID: swarm.ID,
				Spec:      &clusterSpec,
				ClusterVersion: &swarmapi.Version{
					Index: version,
				},
				Rotation: swarmapi.KeyRotation{
					WorkerJoinToken:  flags.RotateWorkerToken,
					ManagerJoinToken: flags.RotateManagerToken,
					ManagerUnlockKey: flags.RotateManagerUnlockKey,
				},
			},
		)
		return err
	})
}

// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
	var resp *swarmapi.GetUnlockKeyResponse
	if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error {
		client := swarmapi.NewCAClient(state.grpcConn)

		r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
		if err != nil {
			return err
		}
		resp = r
		return nil
	}); err != nil {
		return "", err
	}
	if len(resp.UnlockKey) == 0 {
		// no key
		return "", nil
	}
	return encryption.HumanReadableKey(resp.UnlockKey), nil
}

// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.RLock()
	state := c.currentNodeState()

	if !state.IsActiveManager() {
		// The manager is not active: return an error unless the reason
		// is that the swarm is locked.
		if err := c.errNoManager(state); err != errSwarmLocked {
			c.mu.RUnlock()
			return err
		}
	} else {
		// The manager is active, so the swarm cannot be locked.
		c.mu.RUnlock()
		return notLockedError{}
	}

	// We only reach this point when the swarm is locked.
	nr := c.nr
	c.mu.RUnlock()

	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return validationError{err}
	}

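	// Restart the node runner with the parsed key so the swarm state that
	// is encrypted at rest can be decrypted on startup.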
	config := nr.config
	config.lockKey = key
	if err := nr.Stop(); err != nil {
		return err
	}
	nr, err = c.newNodeRunner(config)
	if err != nil {
		return err
	}

	c.mu.Lock()
	c.nr = nr
	c.mu.Unlock()

	if err := <-nr.Ready(); err != nil {
		if errors.Cause(err) == errSwarmLocked {
			return invalidUnlockKey{}
		}
		return errors.Errorf("swarm component could not be started: %v", err)
	}
	return nil
}

// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(force bool) error {
	c.controlMutex.Lock()
	defer c.controlMutex.Unlock()

	c.mu.Lock()
	nr := c.nr
	if nr == nil {
		c.mu.Unlock()
		return errors.WithStack(errNoSwarm)
	}

	state := c.currentNodeState()

	c.mu.Unlock()

	if errors.Cause(state.err) == errSwarmLocked && !force {
		// leaving a locked swarm without --force is not allowed
		return errors.WithStack(notAvailableError("Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message."))
	}

	if state.IsManager() && !force {
		msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
		if state.IsActiveManager() {
			active, reachable, unreachable, err := managerStats(state.controlClient, state.NodeID())
			if err == nil {
				if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
					if isLastManager(reachable, unreachable) {
						msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
						return errors.WithStack(notAvailableError(msg))
					}
					msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}

		msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
		return errors.WithStack(notAvailableError(msg))
	}
	// Stopping the node also releases any readers blocked on it.
	if err := nr.Stop(); err != nil {
		logrus.Errorf("failed to shut down cluster node: %v", err)
		signal.DumpStacks("")
		return err
	}

	c.mu.Lock()
	c.nr = nil
	c.mu.Unlock()

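	// Remove the task containers that swarm scheduled on this node.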
	if nodeID := state.NodeID(); nodeID != "" {
		nodeContainers, err := c.listContainerForNode(nodeID)
		if err != nil {
			return err
		}
		for _, id := range nodeContainers {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}

	// todo: cleanup optional?
	if err := clearPersistentState(c.root); err != nil {
		return err
	}
	c.config.Backend.DaemonLeavesCluster()
	return nil
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}
	c.mu.RLock()
	defer c.mu.RUnlock()

	state := c.currentNodeState()
	info.LocalNodeState = state.status
	if state.err != nil {
		info.Error = state.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

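	// Cluster-wide details (spec, node and manager counts) are only
	// available when this node is an active manager.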
	if state.IsActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.inspect(ctx, state)
		if err != nil {
			info.Error = err.Error()
		}

		info.Cluster = &swarm.ClusterInfo

		if r, err := state.controlClient.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err != nil {
			info.Error = err.Error()
		} else {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers++
				}
			}
		}
	}

	if state.swarmNode != nil {
		for _, r := range state.swarmNode.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = state.swarmNode.NodeID()
	}

	return info
}

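// validateAndSanitizeInitRequest validates the listen address and ensures the
// swarm spec is named "default", filling in the name when it is empty.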
func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

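// validateAndSanitizeJoinRequest validates the listen address and every
// remote address in the join request, requiring at least one RemoteAddr.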
func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return errors.New("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}

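// validateAddr rejects empty addresses. Addresses that parse as TCP addresses
// are returned in host:port form without the tcp:// prefix; anything that
// does not parse is returned unchanged for later resolution.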
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, errors.New("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}

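// initClusterSpec waits for the control socket of a freshly initialized node
// and merges the user-provided spec on top of swarmkit's default cluster spec.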
func initClusterSpec(node *swarmnode.Node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return errors.New("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In Init, we take the initial default values from swarmkit and merge
			// any non-nil, non-zero values from the spec into the GRPC spec, leaving
			// the remaining defaults untouched.
			// Note that this is different from Update(), where we expect the user to
			// specify the complete spec of the cluster (since they already know the
			// existing one and which fields to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}

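// listContainerForNode returns the IDs of containers labeled with the given
// swarm node ID.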
func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filters := filters.NewArgs()
	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filters,
	})
	if err != nil {
		return []string{}, err
	}
	for _, ctr := range containers {
		ids = append(ids, ctr.ID)
	}
	return ids, nil
}