1package cluster
2
3import (
4	"crypto/x509"
5	"encoding/base64"
6	"encoding/json"
7	"fmt"
8	"io"
9	"io/ioutil"
10	"net"
11	"os"
12	"path/filepath"
13	"runtime"
14	"strings"
15	"sync"
16	"time"
17
18	"github.com/sirupsen/logrus"
19	"github.com/docker/distribution/digest"
20	distreference "github.com/docker/distribution/reference"
21	apierrors "github.com/docker/docker/api/errors"
22	apitypes "github.com/docker/docker/api/types"
23	"github.com/docker/docker/api/types/backend"
24	"github.com/docker/docker/api/types/filters"
25	"github.com/docker/docker/api/types/network"
26	types "github.com/docker/docker/api/types/swarm"
27	"github.com/docker/docker/daemon/cluster/convert"
28	executorpkg "github.com/docker/docker/daemon/cluster/executor"
29	"github.com/docker/docker/daemon/cluster/executor/container"
30	"github.com/docker/docker/daemon/logger"
31	"github.com/docker/docker/opts"
32	"github.com/docker/docker/pkg/ioutils"
33	"github.com/docker/docker/pkg/signal"
34	"github.com/docker/docker/pkg/stdcopy"
35	"github.com/docker/docker/reference"
36	"github.com/docker/docker/runconfig"
37	swarmapi "github.com/docker/swarmkit/api"
38	"github.com/docker/swarmkit/manager/encryption"
39	swarmnode "github.com/docker/swarmkit/node"
40	"github.com/docker/swarmkit/protobuf/ptypes"
41	"github.com/pkg/errors"
42	"golang.org/x/net/context"
43	"google.golang.org/grpc"
44)
45
// Filesystem layout and timing constants for the local swarm component.
const swarmDirName = "swarm"                 // subdirectory of Config.Root holding swarm state
const controlSocket = "control.sock"         // name of the local control API socket (or pipe on Windows)
const swarmConnectTimeout = 20 * time.Second // max wait for the node to become ready on start/join
const swarmRequestTimeout = 20 * time.Second // per-request timeout against the control API
const stateFile = "docker-state.json"        // persisted nodeStartConfig file name
const defaultAddr = "0.0.0.0:2377"           // default swarm address — not referenced in this chunk; presumably used by address resolution elsewhere

const (
	initialReconnectDelay = 100 * time.Millisecond // first backoff step after a node failure
	maxReconnectDelay     = 30 * time.Second       // cap for the reconnect backoff
	contextPrefix         = "com.docker.swarm"     // label prefix — not referenced in this chunk; verify usage at call sites
)
58
// ErrNoSwarm is returned on leaving a cluster that was never initialized.
var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")

// ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated.
var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")

// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")

// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")

// ErrSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
var ErrSwarmLocked = fmt.Errorf("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")

// ErrSwarmCertificatesExpired is returned if docker was not started for the
// whole validity period of the certificates, so they had no chance to be
// renewed automatically.
var ErrSwarmCertificatesExpired = errors.New("Swarm certificates have expired. To replace them, leave the swarm and join again.")
76
// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	// V4Subnets returns the IPv4 subnets of managed networks.
	V4Subnets() []net.IPNet
	// V6Subnets returns the IPv6 subnets of managed networks.
	V6Subnets() []net.IPNet
}
83
// Config provides values for Cluster.
type Config struct {
	// Root is the daemon root directory; swarm state is stored in a
	// "swarm" subdirectory beneath it (see New).
	Root string
	// Name is the hostname passed to the swarmkit node.
	Name string
	// Backend is the daemon backend that executes cluster tasks and
	// answers compatibility/repository queries.
	Backend executorpkg.Backend
	// NetworkSubnetsProvider reports subnets of Docker-managed networks.
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// path to store runtime state, such as the swarm control socket
	RuntimeRoot string
}
98
// Cluster provides capabilities to participate in a cluster as a worker or a
// manager. All mutable fields are guarded by the embedded RWMutex.
type Cluster struct {
	sync.RWMutex
	*node
	root            string        // on-disk swarm state directory (Config.Root/swarm)
	runtimeRoot     string        // directory for runtime state such as the control socket
	config          Config
	configEvent     chan struct{} // todo: make this array and goroutine safe
	actualLocalAddr string        // after resolution, not persisted
	stop            bool          // set by stopNode when shutting down deliberately
	err             error         // last error the node exited with
	cancelDelay     func()        // cancels a pending reconnect backoff, if any
	attachers       map[string]*attacher
	locked          bool          // set when the node exited with ErrSwarmLocked
	lastNodeConfig  *nodeStartConfig // config to reuse when unlocking the swarm
}
116
// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
// (The attach/detach handlers live outside this chunk.)
type attacher struct {
	taskID           string                         // ID of the swarm task backing this attachment
	config           *network.NetworkingConfig      // attachment config obtained from the manager
	attachWaitCh     chan *network.NetworkingConfig // presumably delivers config to the waiting attach call — see handlers elsewhere
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}
128
// node wraps a running swarmkit node together with the bookkeeping the
// daemon keeps for it (control clients, readiness, reconnect state).
type node struct {
	*swarmnode.Node
	done           chan struct{}          // closed after the node has fully exited (see startNewNode)
	ready          bool                   // set once the node reported Ready
	conn           *grpc.ClientConn       // current control-socket connection; may be nil
	client         swarmapi.ControlClient // derived from conn
	logs           swarmapi.LogsClient    // derived from conn
	reconnectDelay time.Duration          // next backoff delay used by reconnectOnFailure
	config         nodeStartConfig        // configuration this node was started with
}
139
// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr   string
	joinAddr        string // manager address to join through (not persisted)
	forceNewCluster bool   // re-initialize the cluster from existing state
	joinToken       string // secret required to join (not persisted)
	lockKey         []byte // key to decrypt at-rest state (not persisted)
	autolock        bool   // enable manager auto-locking on init
}
160
// New creates a new Cluster instance using provided config.
// It creates the swarm state directories, restores a previously saved node
// configuration if one exists, and restarts the local swarm node from it.
// A locked swarm or expired certificates still yield a usable *Cluster so
// the daemon can start; other node-start failures are returned as errors.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan struct{}, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
	}

	nodeConfig, err := c.loadState()
	if err != nil {
		if os.IsNotExist(err) {
			// No saved state: this daemon was never part of a swarm.
			return c, nil
		}
		return nil, err
	}

	n, err := c.startNewNode(*nodeConfig)
	if err != nil {
		return nil, err
	}

	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Error("swarm component could not be started before timeout was reached")
	case <-n.Ready():
	case <-n.done:
		// Node exited before becoming ready. A locked swarm or expired
		// certificates are not fatal for daemon startup.
		if errors.Cause(c.err) == ErrSwarmLocked {
			return c, nil
		}
		if err, ok := errors.Cause(c.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired {
			c.err = ErrSwarmCertificatesExpired
			return c, nil
		}
		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
	}
	go c.reconnectOnFailure(n)
	return c, nil
}
211
212func (c *Cluster) loadState() (*nodeStartConfig, error) {
213	dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
214	if err != nil {
215		return nil, err
216	}
217	// missing certificate means no actual state to restore from
218	if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
219		if os.IsNotExist(err) {
220			c.clearState()
221		}
222		return nil, err
223	}
224	var st nodeStartConfig
225	if err := json.Unmarshal(dt, &st); err != nil {
226		return nil, err
227	}
228	return &st, nil
229}
230
231func (c *Cluster) saveState(config nodeStartConfig) error {
232	dt, err := json.Marshal(config)
233	if err != nil {
234		return err
235	}
236	return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
237}
238
239func (c *Cluster) reconnectOnFailure(n *node) {
240	for {
241		<-n.done
242		c.Lock()
243		if c.stop || c.node != nil {
244			c.Unlock()
245			return
246		}
247		n.reconnectDelay *= 2
248		if n.reconnectDelay > maxReconnectDelay {
249			n.reconnectDelay = maxReconnectDelay
250		}
251		logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
252		delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
253		c.cancelDelay = cancel
254		c.Unlock()
255		<-delayCtx.Done()
256		if delayCtx.Err() != context.DeadlineExceeded {
257			return
258		}
259		c.Lock()
260		if c.node != nil {
261			c.Unlock()
262			return
263		}
264		var err error
265		config := n.config
266		config.RemoteAddr = c.getRemoteAddress()
267		config.joinAddr = config.RemoteAddr
268		n, err = c.startNewNode(config)
269		if err != nil {
270			c.err = err
271			close(n.done)
272		}
273		c.Unlock()
274	}
275}
276
// startNewNode creates and starts a swarmkit node from conf, installs it as
// c.node, and persists conf to disk. It resolves the effective local address
// when conf.LocalAddr is empty (only possible on "join") and launches
// goroutines that track node termination, readiness, and control-socket
// connections. Callers must hold the cluster write lock.
func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			// Dialing UDP sends no packets; it only selects the local
			// interface that routes toward the remote address.
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	// Control API listens on a named pipe on Windows, a unix socket elsewhere.
	var control string
	if runtime.GOOS == "windows" {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(c.runtimeRoot, controlSocket)
	}

	// Reset per-node bookkeeping before (re)starting.
	c.node = nil
	c.cancelDelay = nil
	c.stop = false
	n, err := swarmnode.New(&swarmnode.Config{
		Hostname:           c.config.Name,
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		JoinAddr:           conf.joinAddr,
		StateDir:           c.root,
		JoinToken:          conf.joinToken,
		Executor:           container.NewExecutor(c.config.Backend),
		HeartbeatTick:      1,
		ElectionTick:       3,
		UnlockKey:          conf.lockKey,
		AutoLockManagers:   conf.autolock,
	})

	if err != nil {
		return nil, err
	}
	ctx := context.Background()
	if err := n.Start(ctx); err != nil {
		return nil, err
	}
	node := &node{
		Node:           n,
		done:           make(chan struct{}),
		reconnectDelay: initialReconnectDelay,
		config:         conf,
	}
	c.node = node
	c.actualLocalAddr = actualLocalAddr // not saved
	c.saveState(conf)

	c.config.Backend.SetClusterProvider(c)
	// Track node termination: record the exit error, remember the config if
	// the swarm turned out to be locked, then signal node.done.
	go func() {
		err := detectLockedError(n.Err(ctx))
		if err != nil {
			logrus.Errorf("cluster exited with error: %v", err)
		}
		c.Lock()
		c.node = nil
		c.err = err
		if errors.Cause(err) == ErrSwarmLocked {
			c.locked = true
			confClone := conf
			c.lastNodeConfig = &confClone
		}
		c.Unlock()
		close(node.done)
	}()

	// Mark the node ready and notify config listeners once it comes up.
	go func() {
		select {
		case <-n.Ready():
			c.Lock()
			node.ready = true
			c.err = nil
			c.Unlock()
		case <-ctx.Done():
		}
		c.configEvent <- struct{}{}
	}()

	// Maintain the control and logs clients as the control-socket
	// connection comes and goes.
	go func() {
		for conn := range n.ListenControlSocket(ctx) {
			c.Lock()
			if node.conn != conn {
				if conn == nil {
					node.client = nil
					node.logs = nil
				} else {
					node.client = swarmapi.NewControlClient(conn)
					node.logs = swarmapi.NewLogsClient(conn)
				}
			}
			node.conn = conn
			c.Unlock()
			c.configEvent <- struct{}{}
		}
	}()

	return node, nil
}
404
// Init initializes new cluster from user provided request.
// It resolves the listen/advertise/local addresses, starts a fresh swarm
// node, and once the node is ready applies the initial cluster spec. If the
// first attempt fails (without ForceNewCluster), on-disk state is cleared.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.Lock()
	if c.swarmExists() {
		if !req.ForceNewCluster {
			c.Unlock()
			return "", ErrSwarmExists
		}
		// Force-new-cluster re-initializes on top of existing state, so
		// the running node must be stopped first.
		if err := c.stopNode(); err != nil {
			c.Unlock()
			return "", err
		}
	}

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		c.Unlock()
		return "", err
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		c.Unlock()
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		c.Unlock()
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as local address. If this fails,
	// we give up and ask user to pass the listen address.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				c.Unlock()
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}

	// todo: check current state existing
	n, err := c.startNewNode(nodeStartConfig{
		forceNewCluster: req.ForceNewCluster,
		autolock:        req.AutoLockManagers,
		LocalAddr:       localAddr,
		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
	})
	if err != nil {
		c.Unlock()
		return "", err
	}
	c.Unlock()

	select {
	case <-n.Ready():
		// Node is up; push the user-provided spec as the initial cluster
		// configuration.
		if err := initClusterSpec(n, req.Spec); err != nil {
			return "", err
		}
		go c.reconnectOnFailure(n)
		return n.NodeID(), nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		if !req.ForceNewCluster { // if failure on first attempt don't keep state
			if err := c.clearState(); err != nil {
				return "", err
			}
		}
		return "", c.err
	}
}
498
// Join makes current Cluster part of an existing swarm cluster.
// It resolves the listen (and optional advertise) address, starts a node
// pointed at the first remote address, and waits up to swarmConnectTimeout
// for the join to complete; after the timeout the attempt continues in the
// background and ErrSwarmJoinTimeoutReached is returned.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.Lock()
	if c.swarmExists() {
		c.Unlock()
		return ErrSwarmExists
	}
	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		c.Unlock()
		return err
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		c.Unlock()
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	// todo: check current state existing
	n, err := c.startNewNode(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
	})
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()

	select {
	case <-time.After(swarmConnectTimeout):
		// attempt to connect will continue in background, but reconnect only if it didn't fail
		go func() {
			select {
			case <-n.Ready():
				c.reconnectOnFailure(n)
			case <-n.done:
				logrus.Errorf("failed to join the cluster: %+v", c.err)
			}
		}()
		return ErrSwarmJoinTimeoutReached
	case <-n.Ready():
		go c.reconnectOnFailure(n)
		return nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		return c.err
	}
}
562
563// GetUnlockKey returns the unlock key for the swarm.
564func (c *Cluster) GetUnlockKey() (string, error) {
565	c.RLock()
566	defer c.RUnlock()
567
568	if !c.isActiveManager() {
569		return "", c.errNoManager()
570	}
571
572	ctx, cancel := c.getRequestContext()
573	defer cancel()
574
575	client := swarmapi.NewCAClient(c.conn)
576
577	r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
578	if err != nil {
579		return "", err
580	}
581
582	if len(r.UnlockKey) == 0 {
583		// no key
584		return "", nil
585	}
586
587	return encryption.HumanReadableKey(r.UnlockKey), nil
588}
589
590// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
591func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
592	c.RLock()
593	if !c.isActiveManager() {
594		if err := c.errNoManager(); err != ErrSwarmLocked {
595			c.RUnlock()
596			return err
597		}
598	}
599
600	if c.node != nil || c.locked != true {
601		c.RUnlock()
602		return errors.New("swarm is not locked")
603	}
604	c.RUnlock()
605
606	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
607	if err != nil {
608		return err
609	}
610
611	c.Lock()
612	config := *c.lastNodeConfig
613	config.lockKey = key
614	n, err := c.startNewNode(config)
615	if err != nil {
616		c.Unlock()
617		return err
618	}
619	c.Unlock()
620	select {
621	case <-n.Ready():
622	case <-n.done:
623		if errors.Cause(c.err) == ErrSwarmLocked {
624			return errors.New("swarm could not be unlocked: invalid key provided")
625		}
626		return fmt.Errorf("swarm component could not be started: %v", c.err)
627	}
628	go c.reconnectOnFailure(n)
629	return nil
630}
631
// stopNode is a helper that stops the active c.node and waits until it has
// shut down. Call while keeping the cluster lock.
func (c *Cluster) stopNode() error {
	if c.node == nil {
		return nil
	}
	c.stop = true
	if c.cancelDelay != nil {
		// Abort any pending reconnect backoff so reconnectOnFailure exits.
		c.cancelDelay()
		c.cancelDelay = nil
	}
	node := c.node
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	// TODO: can't hold lock on stop because it calls back to network
	c.Unlock()
	defer c.Lock()
	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	// Wait for the node's termination goroutine to close done.
	<-node.done
	return nil
}
655
// removingManagerCausesLossOfQuorum reports whether removing one reachable
// manager would leave fewer reachable managers than unreachable ones allow,
// i.e. whether the Raft quorum would be lost.
func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
	remaining := reachable - 1
	return remaining-1 <= unreachable
}
659
// isLastManager reports whether exactly one reachable manager remains and
// none are unreachable — i.e. this node is the last manager in the swarm.
func isLastManager(reachable, unreachable int) bool {
	if unreachable != 0 {
		return false
	}
	return reachable == 1
}
663
664// Leave shuts down Cluster and removes current state.
665func (c *Cluster) Leave(force bool) error {
666	c.Lock()
667	node := c.node
668	if node == nil {
669		if c.locked {
670			c.locked = false
671			c.lastNodeConfig = nil
672			c.Unlock()
673		} else if c.err == ErrSwarmCertificatesExpired {
674			c.err = nil
675			c.Unlock()
676		} else {
677			c.Unlock()
678			return ErrNoSwarm
679		}
680	} else {
681		if node.Manager() != nil && !force {
682			msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
683			if c.isActiveManager() {
684				active, reachable, unreachable, err := c.managerStats()
685				if err == nil {
686					if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
687						if isLastManager(reachable, unreachable) {
688							msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
689							c.Unlock()
690							return fmt.Errorf(msg)
691						}
692						msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
693					}
694				}
695			} else {
696				msg += "Doing so may lose the consensus of your cluster. "
697			}
698
699			msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
700			c.Unlock()
701			return fmt.Errorf(msg)
702		}
703		if err := c.stopNode(); err != nil {
704			logrus.Errorf("failed to shut down cluster node: %v", err)
705			signal.DumpStacks("")
706			c.Unlock()
707			return err
708		}
709		c.Unlock()
710		if nodeID := node.NodeID(); nodeID != "" {
711			nodeContainers, err := c.listContainerForNode(nodeID)
712			if err != nil {
713				return err
714			}
715			for _, id := range nodeContainers {
716				if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
717					logrus.Errorf("error removing %v: %v", id, err)
718				}
719			}
720		}
721	}
722	c.configEvent <- struct{}{}
723	// todo: cleanup optional?
724	if err := c.clearState(); err != nil {
725		return err
726	}
727	return nil
728}
729
730func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
731	var ids []string
732	filters := filters.NewArgs()
733	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
734	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
735		Filters: filters,
736	})
737	if err != nil {
738		return []string{}, err
739	}
740	for _, c := range containers {
741		ids = append(ids, c.ID)
742	}
743	return ids, nil
744}
745
// clearState removes all on-disk swarm state, recreates an empty state
// directory, and detaches the cluster provider from the backend.
func (c *Cluster) clearState() error {
	// todo: backup this data instead of removing?
	if err := os.RemoveAll(c.root); err != nil {
		return err
	}
	if err := os.MkdirAll(c.root, 0700); err != nil {
		return err
	}
	c.config.Backend.SetClusterProvider(nil)
	return nil
}
757
// getRequestContext returns a context carrying the standard swarm request
// timeout, plus its cancel function.
func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}
761
762// Inspect retrieves the configuration properties of a managed swarm cluster.
763func (c *Cluster) Inspect() (types.Swarm, error) {
764	c.RLock()
765	defer c.RUnlock()
766
767	if !c.isActiveManager() {
768		return types.Swarm{}, c.errNoManager()
769	}
770
771	ctx, cancel := c.getRequestContext()
772	defer cancel()
773
774	swarm, err := getSwarm(ctx, c.client)
775	if err != nil {
776		return types.Swarm{}, err
777	}
778
779	return convert.SwarmFromGRPC(*swarm), nil
780}
781
// Update updates configuration of a managed swarm cluster.
// version must match the cluster's current version (optimistic concurrency
// control); flags select rotation of join tokens and the manager unlock key.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	// The current cluster object is needed for its ID.
	swarm, err := getSwarm(ctx, c.client)
	if err != nil {
		return err
	}

	// In update, client should provide the complete spec of the swarm, including
	// Name and Labels. If a field is specified with 0 or nil, then the default value
	// will be used to swarmkit.
	clusterSpec, err := convert.SwarmSpecToGRPC(spec)
	if err != nil {
		return err
	}

	_, err = c.client.UpdateCluster(
		ctx,
		&swarmapi.UpdateClusterRequest{
			ClusterID: swarm.ID,
			Spec:      &clusterSpec,
			ClusterVersion: &swarmapi.Version{
				Index: version,
			},
			Rotation: swarmapi.KeyRotation{
				WorkerJoinToken:  flags.RotateWorkerToken,
				ManagerJoinToken: flags.RotateManagerToken,
				ManagerUnlockKey: flags.RotateManagerUnlockKey,
			},
		},
	)
	return err
}
824
825// IsManager returns true if Cluster is participating as a manager.
826func (c *Cluster) IsManager() bool {
827	c.RLock()
828	defer c.RUnlock()
829	return c.isActiveManager()
830}
831
832// IsAgent returns true if Cluster is participating as a worker/agent.
833func (c *Cluster) IsAgent() bool {
834	c.RLock()
835	defer c.RUnlock()
836	return c.node != nil && c.ready
837}
838
839// GetLocalAddress returns the local address.
840func (c *Cluster) GetLocalAddress() string {
841	c.RLock()
842	defer c.RUnlock()
843	return c.actualLocalAddr
844}
845
846// GetListenAddress returns the listen address.
847func (c *Cluster) GetListenAddress() string {
848	c.RLock()
849	defer c.RUnlock()
850	if c.node != nil {
851		return c.node.config.ListenAddr
852	}
853	return ""
854}
855
856// GetAdvertiseAddress returns the remotely reachable address of this node.
857func (c *Cluster) GetAdvertiseAddress() string {
858	c.RLock()
859	defer c.RUnlock()
860	if c.node != nil && c.node.config.AdvertiseAddr != "" {
861		advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr)
862		return advertiseHost
863	}
864	return c.actualLocalAddr
865}
866
867// GetRemoteAddress returns a known advertise address of a remote manager if
868// available.
869// todo: change to array/connect with info
870func (c *Cluster) GetRemoteAddress() string {
871	c.RLock()
872	defer c.RUnlock()
873	return c.getRemoteAddress()
874}
875
876func (c *Cluster) getRemoteAddress() string {
877	if c.node == nil {
878		return ""
879	}
880	nodeID := c.node.NodeID()
881	for _, r := range c.node.Remotes() {
882		if r.NodeID != nodeID {
883			return r.Addr
884		}
885	}
886	return ""
887}
888
// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	// configEvent is signaled on node readiness and control-socket changes
	// (see startNewNode) and on Leave.
	return c.configEvent
}
895
896// Info returns information about the current cluster state.
897func (c *Cluster) Info() types.Info {
898	info := types.Info{
899		NodeAddr: c.GetAdvertiseAddress(),
900	}
901
902	c.RLock()
903	defer c.RUnlock()
904
905	if c.node == nil {
906		info.LocalNodeState = types.LocalNodeStateInactive
907		if c.cancelDelay != nil {
908			info.LocalNodeState = types.LocalNodeStateError
909		}
910		if c.locked {
911			info.LocalNodeState = types.LocalNodeStateLocked
912		} else if c.err == ErrSwarmCertificatesExpired {
913			info.LocalNodeState = types.LocalNodeStateError
914		}
915	} else {
916		info.LocalNodeState = types.LocalNodeStatePending
917		if c.ready == true {
918			info.LocalNodeState = types.LocalNodeStateActive
919		} else if c.locked {
920			info.LocalNodeState = types.LocalNodeStateLocked
921		}
922	}
923	if c.err != nil {
924		info.Error = c.err.Error()
925	}
926
927	ctx, cancel := c.getRequestContext()
928	defer cancel()
929
930	if c.isActiveManager() {
931		info.ControlAvailable = true
932		swarm, err := c.Inspect()
933		if err != nil {
934			info.Error = err.Error()
935		}
936
937		// Strip JoinTokens
938		info.Cluster = swarm.ClusterInfo
939
940		if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
941			info.Nodes = len(r.Nodes)
942			for _, n := range r.Nodes {
943				if n.ManagerStatus != nil {
944					info.Managers = info.Managers + 1
945				}
946			}
947		}
948	}
949
950	if c.node != nil {
951		for _, r := range c.node.Remotes() {
952			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
953		}
954		info.NodeID = c.node.NodeID()
955	}
956
957	return info
958}
959
// isActiveManager should not be called without a read lock.
// It reports whether the local node is running and has a live control
// connection, i.e. acts as a functioning manager.
func (c *Cluster) isActiveManager() bool {
	return c.node != nil && c.conn != nil
}
964
// swarmExists should not be called without a read lock.
// It reports whether any swarm membership exists locally: a running node,
// a locked swarm, or a swarm whose certificates have expired.
func (c *Cluster) swarmExists() bool {
	return c.node != nil || c.locked || c.err == ErrSwarmCertificatesExpired
}
969
970// errNoManager returns error describing why manager commands can't be used.
971// Call with read lock.
972func (c *Cluster) errNoManager() error {
973	if c.node == nil {
974		if c.locked {
975			return ErrSwarmLocked
976		}
977		if c.err == ErrSwarmCertificatesExpired {
978			return ErrSwarmCertificatesExpired
979		}
980		return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
981	}
982	if c.node.Manager() != nil {
983		return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
984	}
985	return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
986}
987
// GetServices returns all services of a managed swarm cluster.
// options.Filters are translated to swarmkit list filters before querying
// the control API; results are converted to the engine API representation.
func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	filters, err := newListServicesFilters(options.Filters)
	if err != nil {
		return nil, err
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := c.client.ListServices(
		ctx,
		&swarmapi.ListServicesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	// Non-nil empty slice so an empty result encodes as [] rather than null.
	services := []types.Service{}

	for _, service := range r.Services {
		services = append(services, convert.ServiceFromGRPC(*service))
	}

	return services, nil
}
1019
// imageWithDigestString takes an image such as name or name:tag
// and returns the image pinned to a digest, such as name@sha256:34234...
// It errors if image is an image ID; a reference that already carries a
// digest is returned unchanged, otherwise the registry is queried (using
// authConfig) for the tag's current digest.
// Due to the difference between the docker/docker/reference, and the
// docker/distribution/reference packages, we're parsing the image twice.
// As the two packages converge, this function should be simplified.
// TODO(nishanttotla): After the packages converge, the function must
// convert distreference.Named -> distreference.Canonical, and the logic simplified.
func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
	if _, err := digest.ParseDigest(image); err == nil {
		return "", errors.New("image reference is an image ID")
	}
	ref, err := distreference.ParseNamed(image)
	if err != nil {
		return "", err
	}
	// only query registry if not a canonical reference (i.e. with digest)
	if _, ok := ref.(distreference.Canonical); !ok {
		// create a docker/docker/reference Named object because GetRepository needs it
		dockerRef, err := reference.ParseNamed(image)
		if err != nil {
			return "", err
		}
		dockerRef = reference.WithDefaultTag(dockerRef)
		namedTaggedRef, ok := dockerRef.(reference.NamedTagged)
		if !ok {
			return "", fmt.Errorf("unable to cast image to NamedTagged reference object")
		}

		repo, _, err := c.config.Backend.GetRepository(ctx, namedTaggedRef, authConfig)
		if err != nil {
			return "", err
		}
		// Resolve the tag to its current digest via the registry.
		dscrptr, err := repo.Tags(ctx).Get(ctx, namedTaggedRef.Tag())
		if err != nil {
			return "", err
		}

		namedDigestedRef, err := distreference.WithDigest(distreference.EnsureTagged(ref), dscrptr.Digest)
		if err != nil {
			return "", err
		}
		return namedDigestedRef.String(), nil
	}
	// reference already contains a digest, so just return it
	return ref.String(), nil
}
1066
1067// CreateService creates a new service in a managed swarm cluster.
1068func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
1069	c.RLock()
1070	defer c.RUnlock()
1071
1072	if !c.isActiveManager() {
1073		return nil, c.errNoManager()
1074	}
1075
1076	ctx, cancel := c.getRequestContext()
1077	defer cancel()
1078
1079	err := c.populateNetworkID(ctx, c.client, &s)
1080	if err != nil {
1081		return nil, err
1082	}
1083
1084	serviceSpec, err := convert.ServiceSpecToGRPC(s)
1085	if err != nil {
1086		return nil, err
1087	}
1088
1089	ctnr := serviceSpec.Task.GetContainer()
1090	if ctnr == nil {
1091		return nil, fmt.Errorf("service does not use container tasks")
1092	}
1093
1094	if encodedAuth != "" {
1095		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
1096	}
1097
1098	// retrieve auth config from encoded auth
1099	authConfig := &apitypes.AuthConfig{}
1100	if encodedAuth != "" {
1101		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
1102			logrus.Warnf("invalid authconfig: %v", err)
1103		}
1104	}
1105
1106	resp := &apitypes.ServiceCreateResponse{}
1107
1108	// pin image by digest
1109	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
1110		digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
1111		if err != nil {
1112			logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
1113			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
1114		} else if ctnr.Image != digestImage {
1115			logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
1116			ctnr.Image = digestImage
1117		} else {
1118			logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
1119		}
1120	}
1121
1122	r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
1123	if err != nil {
1124		return nil, err
1125	}
1126
1127	resp.ID = r.Service.ID
1128	return resp, nil
1129}
1130
1131// GetService returns a service based on an ID or name.
1132func (c *Cluster) GetService(input string) (types.Service, error) {
1133	c.RLock()
1134	defer c.RUnlock()
1135
1136	if !c.isActiveManager() {
1137		return types.Service{}, c.errNoManager()
1138	}
1139
1140	ctx, cancel := c.getRequestContext()
1141	defer cancel()
1142
1143	service, err := getService(ctx, c.client, input)
1144	if err != nil {
1145		return types.Service{}, err
1146	}
1147	return convert.ServiceFromGRPC(*service), nil
1148}
1149
// UpdateService updates existing service to match new properties.
// version is the expected current version of the service (optimistic
// concurrency); registryAuthFrom selects where to take registry auth from
// when encodedAuth is empty (current spec, previous spec, or default).
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	// Resolve network names in the new spec to swarm network IDs.
	err := c.populateNetworkID(ctx, c.client, &spec)
	if err != nil {
		return nil, err
	}

	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
	if err != nil {
		return nil, err
	}

	// Fetch the existing service; its ID is needed for the update call and
	// its specs may supply the registry auth fallback below.
	currentService, err := getService(ctx, c.client, serviceIDOrName)
	if err != nil {
		return nil, err
	}

	newCtnr := serviceSpec.Task.GetContainer()
	if newCtnr == nil {
		return nil, fmt.Errorf("service does not use container tasks")
	}

	if encodedAuth != "" {
		newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	} else {
		// this is needed because if the encodedAuth isn't being updated then we
		// shouldn't lose it, and continue to use the one that was already present
		var ctnr *swarmapi.ContainerSpec
		switch registryAuthFrom {
		case apitypes.RegistryAuthFromSpec, "":
			// Default: carry auth over from the service's current spec.
			ctnr = currentService.Spec.Task.GetContainer()
		case apitypes.RegistryAuthFromPreviousSpec:
			// Used on rollback: reuse auth from the spec being rolled back to.
			if currentService.PreviousSpec == nil {
				return nil, fmt.Errorf("service does not have a previous spec")
			}
			ctnr = currentService.PreviousSpec.Task.GetContainer()
		default:
			return nil, fmt.Errorf("unsupported registryAuthFromValue")
		}
		if ctnr == nil {
			return nil, fmt.Errorf("service does not use container tasks")
		}
		newCtnr.PullOptions = ctnr.PullOptions
		// update encodedAuth so it can be used to pin image by digest
		if ctnr.PullOptions != nil {
			encodedAuth = ctnr.PullOptions.RegistryAuth
		}
	}

	// retrieve auth config from encoded auth
	// (a malformed auth config is logged and ignored rather than fatal)
	authConfig := &apitypes.AuthConfig{}
	if encodedAuth != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}

	resp := &apitypes.ServiceUpdateResponse{}

	// pin image by digest, unless the user opted out via env var;
	// failure to pin becomes a response warning, not an error
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
		if err != nil {
			logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
		} else if newCtnr.Image != digestImage {
			logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
			newCtnr.Image = digestImage
		} else {
			logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
		}
	}

	_, err = c.client.UpdateService(
		ctx,
		&swarmapi.UpdateServiceRequest{
			ServiceID: currentService.ID,
			Spec:      &serviceSpec,
			ServiceVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)

	return resp, err
}
1246
1247// RemoveService removes a service from a managed swarm cluster.
1248func (c *Cluster) RemoveService(input string) error {
1249	c.RLock()
1250	defer c.RUnlock()
1251
1252	if !c.isActiveManager() {
1253		return c.errNoManager()
1254	}
1255
1256	ctx, cancel := c.getRequestContext()
1257	defer cancel()
1258
1259	service, err := getService(ctx, c.client, input)
1260	if err != nil {
1261		return err
1262	}
1263
1264	if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
1265		return err
1266	}
1267	return nil
1268}
1269
// ServiceLogs collects service logs and writes them back to `config.OutStream`.
// The started channel is closed once the subscription is established and the
// response stream is about to begin; on earlier failures it is never closed.
// Note the manual RLock/RUnlock: the lock is held only through subscription
// setup and released before the (potentially long-lived) receive loop.
func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
	c.RLock()
	if !c.isActiveManager() {
		c.RUnlock()
		return c.errNoManager()
	}

	service, err := getService(ctx, c.client, input)
	if err != nil {
		c.RUnlock()
		return err
	}

	// Subscribe to log messages for just this service.
	stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
		Selector: &swarmapi.LogSelector{
			ServiceIDs: []string{service.ID},
		},
		Options: &swarmapi.LogSubscriptionOptions{
			Follow: config.Follow,
		},
	})
	if err != nil {
		c.RUnlock()
		return err
	}

	wf := ioutils.NewWriteFlusher(config.OutStream)
	defer wf.Close()
	close(started)
	wf.Flush()

	// Multiplex stdout/stderr onto the single output stream using the
	// stdcopy framing protocol.
	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)

	// Release the lock before starting the stream.
	c.RUnlock()
	for {
		// Check the context before doing anything.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		subscribeMsg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		for _, msg := range subscribeMsg.Messages {
			data := []byte{}

			// Optional timestamp prefix, in the daemon's log time format.
			if config.Timestamps {
				ts, err := ptypes.Timestamp(msg.Timestamp)
				if err != nil {
					return err
				}
				data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
			}

			// Context prefix identifying which node/service/task emitted
			// the message.
			data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
				contextPrefix, msg.Context.NodeID,
				contextPrefix, msg.Context.ServiceID,
				contextPrefix, msg.Context.TaskID,
			))...)

			data = append(data, msg.Data...)

			// Route to stdout or stderr framing; other stream values
			// (if any) are silently dropped.
			switch msg.Stream {
			case swarmapi.LogStreamStdout:
				outStream.Write(data)
			case swarmapi.LogStreamStderr:
				errStream.Write(data)
			}
		}
	}
}
1351
1352// GetNodes returns a list of all nodes known to a cluster.
1353func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
1354	c.RLock()
1355	defer c.RUnlock()
1356
1357	if !c.isActiveManager() {
1358		return nil, c.errNoManager()
1359	}
1360
1361	filters, err := newListNodesFilters(options.Filters)
1362	if err != nil {
1363		return nil, err
1364	}
1365
1366	ctx, cancel := c.getRequestContext()
1367	defer cancel()
1368
1369	r, err := c.client.ListNodes(
1370		ctx,
1371		&swarmapi.ListNodesRequest{Filters: filters})
1372	if err != nil {
1373		return nil, err
1374	}
1375
1376	nodes := []types.Node{}
1377
1378	for _, node := range r.Nodes {
1379		nodes = append(nodes, convert.NodeFromGRPC(*node))
1380	}
1381	return nodes, nil
1382}
1383
1384// GetNode returns a node based on an ID or name.
1385func (c *Cluster) GetNode(input string) (types.Node, error) {
1386	c.RLock()
1387	defer c.RUnlock()
1388
1389	if !c.isActiveManager() {
1390		return types.Node{}, c.errNoManager()
1391	}
1392
1393	ctx, cancel := c.getRequestContext()
1394	defer cancel()
1395
1396	node, err := getNode(ctx, c.client, input)
1397	if err != nil {
1398		return types.Node{}, err
1399	}
1400	return convert.NodeFromGRPC(*node), nil
1401}
1402
1403// UpdateNode updates existing nodes properties.
1404func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
1405	c.RLock()
1406	defer c.RUnlock()
1407
1408	if !c.isActiveManager() {
1409		return c.errNoManager()
1410	}
1411
1412	nodeSpec, err := convert.NodeSpecToGRPC(spec)
1413	if err != nil {
1414		return err
1415	}
1416
1417	ctx, cancel := c.getRequestContext()
1418	defer cancel()
1419
1420	currentNode, err := getNode(ctx, c.client, input)
1421	if err != nil {
1422		return err
1423	}
1424
1425	_, err = c.client.UpdateNode(
1426		ctx,
1427		&swarmapi.UpdateNodeRequest{
1428			NodeID: currentNode.ID,
1429			Spec:   &nodeSpec,
1430			NodeVersion: &swarmapi.Version{
1431				Index: version,
1432			},
1433		},
1434	)
1435	return err
1436}
1437
1438// RemoveNode removes a node from a cluster
1439func (c *Cluster) RemoveNode(input string, force bool) error {
1440	c.RLock()
1441	defer c.RUnlock()
1442
1443	if !c.isActiveManager() {
1444		return c.errNoManager()
1445	}
1446
1447	ctx, cancel := c.getRequestContext()
1448	defer cancel()
1449
1450	node, err := getNode(ctx, c.client, input)
1451	if err != nil {
1452		return err
1453	}
1454
1455	if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
1456		return err
1457	}
1458	return nil
1459}
1460
1461// GetTasks returns a list of tasks matching the filter options.
1462func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
1463	c.RLock()
1464	defer c.RUnlock()
1465
1466	if !c.isActiveManager() {
1467		return nil, c.errNoManager()
1468	}
1469
1470	byName := func(filter filters.Args) error {
1471		if filter.Include("service") {
1472			serviceFilters := filter.Get("service")
1473			for _, serviceFilter := range serviceFilters {
1474				service, err := c.GetService(serviceFilter)
1475				if err != nil {
1476					return err
1477				}
1478				filter.Del("service", serviceFilter)
1479				filter.Add("service", service.ID)
1480			}
1481		}
1482		if filter.Include("node") {
1483			nodeFilters := filter.Get("node")
1484			for _, nodeFilter := range nodeFilters {
1485				node, err := c.GetNode(nodeFilter)
1486				if err != nil {
1487					return err
1488				}
1489				filter.Del("node", nodeFilter)
1490				filter.Add("node", node.ID)
1491			}
1492		}
1493		return nil
1494	}
1495
1496	filters, err := newListTasksFilters(options.Filters, byName)
1497	if err != nil {
1498		return nil, err
1499	}
1500
1501	ctx, cancel := c.getRequestContext()
1502	defer cancel()
1503
1504	r, err := c.client.ListTasks(
1505		ctx,
1506		&swarmapi.ListTasksRequest{Filters: filters})
1507	if err != nil {
1508		return nil, err
1509	}
1510
1511	tasks := []types.Task{}
1512
1513	for _, task := range r.Tasks {
1514		if task.Spec.GetContainer() != nil {
1515			tasks = append(tasks, convert.TaskFromGRPC(*task))
1516		}
1517	}
1518	return tasks, nil
1519}
1520
1521// GetTask returns a task by an ID.
1522func (c *Cluster) GetTask(input string) (types.Task, error) {
1523	c.RLock()
1524	defer c.RUnlock()
1525
1526	if !c.isActiveManager() {
1527		return types.Task{}, c.errNoManager()
1528	}
1529
1530	ctx, cancel := c.getRequestContext()
1531	defer cancel()
1532
1533	task, err := getTask(ctx, c.client, input)
1534	if err != nil {
1535		return types.Task{}, err
1536	}
1537	return convert.TaskFromGRPC(*task), nil
1538}
1539
1540// GetNetwork returns a cluster network by an ID.
1541func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
1542	c.RLock()
1543	defer c.RUnlock()
1544
1545	if !c.isActiveManager() {
1546		return apitypes.NetworkResource{}, c.errNoManager()
1547	}
1548
1549	ctx, cancel := c.getRequestContext()
1550	defer cancel()
1551
1552	network, err := getNetwork(ctx, c.client, input)
1553	if err != nil {
1554		return apitypes.NetworkResource{}, err
1555	}
1556	return convert.BasicNetworkFromGRPC(*network), nil
1557}
1558
1559func (c *Cluster) getNetworks(filters *swarmapi.ListNetworksRequest_Filters) ([]apitypes.NetworkResource, error) {
1560	c.RLock()
1561	defer c.RUnlock()
1562
1563	if !c.isActiveManager() {
1564		return nil, c.errNoManager()
1565	}
1566
1567	ctx, cancel := c.getRequestContext()
1568	defer cancel()
1569
1570	r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: filters})
1571	if err != nil {
1572		return nil, err
1573	}
1574
1575	var networks []apitypes.NetworkResource
1576
1577	for _, network := range r.Networks {
1578		networks = append(networks, convert.BasicNetworkFromGRPC(*network))
1579	}
1580
1581	return networks, nil
1582}
1583
// GetNetworks returns all current cluster managed networks.
// A nil filter set asks the manager for every network.
func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
	return c.getNetworks(nil)
}
1588
1589// GetNetworksByName returns cluster managed networks by name.
1590// It is ok to have multiple networks here. #18864
1591func (c *Cluster) GetNetworksByName(name string) ([]apitypes.NetworkResource, error) {
1592	// Note that swarmapi.GetNetworkRequest.Name is not functional.
1593	// So we cannot just use that with c.GetNetwork.
1594	return c.getNetworks(&swarmapi.ListNetworksRequest_Filters{
1595		Names: []string{name},
1596	})
1597}
1598
// attacherKey builds the lookup key for the attachers map, in the form
// "<containerID>:<target>".
func attacherKey(target, containerID string) string {
	return fmt.Sprintf("%s:%s", containerID, target)
}
1602
1603// UpdateAttachment signals the attachment config to the attachment
1604// waiter who is trying to start or attach the container to the
1605// network.
1606func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
1607	c.RLock()
1608	attacher, ok := c.attachers[attacherKey(target, containerID)]
1609	c.RUnlock()
1610	if !ok || attacher == nil {
1611		return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
1612	}
1613
1614	attacher.attachWaitCh <- config
1615	close(attacher.attachWaitCh)
1616	return nil
1617}
1618
// WaitForDetachment waits for the container to stop or detach from
// the network.
// The attacher is looked up first by network name, then by network ID,
// since callers may know the network under either form.
func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
	c.RLock()
	attacher, ok := c.attachers[attacherKey(networkName, containerID)]
	if !ok {
		attacher, ok = c.attachers[attacherKey(networkID, containerID)]
	}
	if c.node == nil || c.node.Agent() == nil {
		c.RUnlock()
		return fmt.Errorf("invalid cluster node while waiting for detachment")
	}

	// Capture the agent under the lock, then release it before blocking
	// on any channels below.
	agent := c.node.Agent()
	c.RUnlock()

	if ok && attacher != nil &&
		attacher.detachWaitCh != nil &&
		attacher.attachCompleteCh != nil {
		// Attachment may be in progress still so wait for
		// attachment to complete.
		select {
		case <-attacher.attachCompleteCh:
		case <-ctx.Done():
			return ctx.Err()
		}

		// Only wait on detachWaitCh when this attacher belongs to the
		// same task; otherwise fall through to the detach request.
		if attacher.taskID == taskID {
			select {
			case <-attacher.detachWaitCh:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}

	// Issue the actual detach towards the manager via the agent.
	return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
}
1657
// AttachNetwork generates an attachment request towards the manager.
// It registers an attacher entry for the (target, container) pair, asks the
// agent to attach, then blocks until UpdateAttachment delivers the resulting
// network config (or ctx expires). If an attacher already exists for the
// pair, its config is returned immediately.
func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
	aKey := attacherKey(target, containerID)
	c.Lock()
	if c.node == nil || c.node.Agent() == nil {
		c.Unlock()
		return nil, fmt.Errorf("invalid cluster node while attaching to network")
	}
	if attacher, ok := c.attachers[aKey]; ok {
		c.Unlock()
		return attacher.config, nil
	}

	// Register the attacher before releasing the lock so concurrent calls
	// for the same key take the early-return path above.
	agent := c.node.Agent()
	attachWaitCh := make(chan *network.NetworkingConfig)
	detachWaitCh := make(chan struct{})
	attachCompleteCh := make(chan struct{})
	c.attachers[aKey] = &attacher{
		attachWaitCh:     attachWaitCh,
		attachCompleteCh: attachCompleteCh,
		detachWaitCh:     detachWaitCh,
	}
	c.Unlock()

	ctx, cancel := c.getRequestContext()
	defer cancel()

	taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
	if err != nil {
		// Roll back the registration on failure.
		c.Lock()
		delete(c.attachers, aKey)
		c.Unlock()
		return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
	}

	// Record the task ID and signal WaitForDetachment waiters that the
	// attachment phase is complete.
	c.Lock()
	c.attachers[aKey].taskID = taskID
	close(attachCompleteCh)
	c.Unlock()

	logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)

	// Block until UpdateAttachment hands us the config, or the request
	// context expires.
	var config *network.NetworkingConfig
	select {
	case config = <-attachWaitCh:
	case <-ctx.Done():
		return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
	}

	c.Lock()
	c.attachers[aKey].config = config
	c.Unlock()
	return config, nil
}
1712
1713// DetachNetwork unblocks the waiters waiting on WaitForDetachment so
1714// that a request to detach can be generated towards the manager.
1715func (c *Cluster) DetachNetwork(target string, containerID string) error {
1716	aKey := attacherKey(target, containerID)
1717
1718	c.Lock()
1719	attacher, ok := c.attachers[aKey]
1720	delete(c.attachers, aKey)
1721	c.Unlock()
1722
1723	if !ok {
1724		return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
1725	}
1726
1727	close(attacher.detachWaitCh)
1728	return nil
1729}
1730
1731// CreateNetwork creates a new cluster managed network.
1732func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
1733	c.RLock()
1734	defer c.RUnlock()
1735
1736	if !c.isActiveManager() {
1737		return "", c.errNoManager()
1738	}
1739
1740	if runconfig.IsPreDefinedNetwork(s.Name) {
1741		err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
1742		return "", apierrors.NewRequestForbiddenError(err)
1743	}
1744
1745	ctx, cancel := c.getRequestContext()
1746	defer cancel()
1747
1748	networkSpec := convert.BasicNetworkCreateToGRPC(s)
1749	r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
1750	if err != nil {
1751		return "", err
1752	}
1753
1754	return r.Network.ID, nil
1755}
1756
1757// RemoveNetwork removes a cluster network.
1758func (c *Cluster) RemoveNetwork(input string) error {
1759	c.RLock()
1760	defer c.RUnlock()
1761
1762	if !c.isActiveManager() {
1763		return c.errNoManager()
1764	}
1765
1766	ctx, cancel := c.getRequestContext()
1767	defer cancel()
1768
1769	network, err := getNetwork(ctx, c.client, input)
1770	if err != nil {
1771		return err
1772	}
1773
1774	if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
1775		return err
1776	}
1777	return nil
1778}
1779
1780func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
1781	// Always prefer NetworkAttachmentConfigs from TaskTemplate
1782	// but fallback to service spec for backward compatibility
1783	networks := s.TaskTemplate.Networks
1784	if len(networks) == 0 {
1785		networks = s.Networks
1786	}
1787
1788	for i, n := range networks {
1789		apiNetwork, err := getNetwork(ctx, client, n.Target)
1790		if err != nil {
1791			if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
1792				err = fmt.Errorf("The network %s cannot be used with services. Only networks scoped to the swarm can be used, such as those created with the overlay driver.", ln.Name())
1793				return apierrors.NewRequestForbiddenError(err)
1794			}
1795			return err
1796		}
1797		networks[i].Target = apiNetwork.ID
1798	}
1799	return nil
1800}
1801
1802func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
1803	// GetNetwork to match via full ID.
1804	rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
1805	if err != nil {
1806		// If any error (including NotFound), ListNetworks to match via ID prefix and full name.
1807		rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
1808		if err != nil || len(rl.Networks) == 0 {
1809			rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
1810		}
1811
1812		if err != nil {
1813			return nil, err
1814		}
1815
1816		if len(rl.Networks) == 0 {
1817			return nil, fmt.Errorf("network %s not found", input)
1818		}
1819
1820		if l := len(rl.Networks); l > 1 {
1821			return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
1822		}
1823
1824		return rl.Networks[0], nil
1825	}
1826	return rg.Network, nil
1827}
1828
1829// Cleanup stops active swarm node. This is run before daemon shutdown.
1830func (c *Cluster) Cleanup() {
1831	c.Lock()
1832	node := c.node
1833	if node == nil {
1834		c.Unlock()
1835		return
1836	}
1837	defer c.Unlock()
1838	if c.isActiveManager() {
1839		active, reachable, unreachable, err := c.managerStats()
1840		if err == nil {
1841			singlenode := active && isLastManager(reachable, unreachable)
1842			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
1843				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
1844			}
1845		}
1846	}
1847	c.stopNode()
1848}
1849
1850func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
1851	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1852	defer cancel()
1853	nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
1854	if err != nil {
1855		return false, 0, 0, err
1856	}
1857	for _, n := range nodes.Nodes {
1858		if n.ManagerStatus != nil {
1859			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
1860				reachable++
1861				if n.ID == c.node.NodeID() {
1862					current = true
1863				}
1864			}
1865			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
1866				unreachable++
1867			}
1868		}
1869	}
1870	return
1871}
1872
1873func validateAndSanitizeInitRequest(req *types.InitRequest) error {
1874	var err error
1875	req.ListenAddr, err = validateAddr(req.ListenAddr)
1876	if err != nil {
1877		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
1878	}
1879
1880	if req.Spec.Annotations.Name == "" {
1881		req.Spec.Annotations.Name = "default"
1882	} else if req.Spec.Annotations.Name != "default" {
1883		return errors.New(`swarm spec must be named "default"`)
1884	}
1885
1886	return nil
1887}
1888
1889func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
1890	var err error
1891	req.ListenAddr, err = validateAddr(req.ListenAddr)
1892	if err != nil {
1893		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
1894	}
1895	if len(req.RemoteAddrs) == 0 {
1896		return fmt.Errorf("at least 1 RemoteAddr is required to join")
1897	}
1898	for i := range req.RemoteAddrs {
1899		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
1900		if err != nil {
1901			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
1902		}
1903	}
1904	return nil
1905}
1906
// validateAddr normalizes addr into a bare host:port form, applying
// defaultAddr's defaults via opts.ParseTCPAddr. An empty address is an
// error.
// NOTE(review): when ParseTCPAddr fails, the error is swallowed and the
// original address is returned with a nil error — presumably an intentional
// pass-through for non-TCP addresses; confirm before changing.
func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, fmt.Errorf("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}
1917
1918func initClusterSpec(node *node, spec types.Spec) error {
1919	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
1920	for conn := range node.ListenControlSocket(ctx) {
1921		if ctx.Err() != nil {
1922			return ctx.Err()
1923		}
1924		if conn != nil {
1925			client := swarmapi.NewControlClient(conn)
1926			var cluster *swarmapi.Cluster
1927			for i := 0; ; i++ {
1928				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
1929				if err != nil {
1930					return fmt.Errorf("error on listing clusters: %v", err)
1931				}
1932				if len(lcr.Clusters) == 0 {
1933					if i < 10 {
1934						time.Sleep(200 * time.Millisecond)
1935						continue
1936					}
1937					return fmt.Errorf("empty list of clusters was returned")
1938				}
1939				cluster = lcr.Clusters[0]
1940				break
1941			}
1942			// In init, we take the initial default values from swarmkit, and merge
1943			// any non nil or 0 value from spec to GRPC spec. This will leave the
1944			// default value alone.
1945			// Note that this is different from Update(), as in Update() we expect
1946			// user to specify the complete spec of the cluster (as they already know
1947			// the existing one and knows which field to update)
1948			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
1949			if err != nil {
1950				return fmt.Errorf("error updating cluster settings: %v", err)
1951			}
1952			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
1953				ClusterID:      cluster.ID,
1954				ClusterVersion: &cluster.Meta.Version,
1955				Spec:           &clusterSpec,
1956			})
1957			if err != nil {
1958				return fmt.Errorf("error updating cluster settings: %v", err)
1959			}
1960			return nil
1961		}
1962	}
1963	return ctx.Err()
1964}
1965
1966func detectLockedError(err error) error {
1967	if err == swarmnode.ErrInvalidUnlockKey {
1968		return errors.WithStack(ErrSwarmLocked)
1969	}
1970	return err
1971}
1972