package cluster

import (
	"crypto/x509"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/docker/distribution/digest"
	distreference "github.com/docker/distribution/reference"
	apierrors "github.com/docker/docker/api/errors"
	apitypes "github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/backend"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/network"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/daemon/cluster/executor/container"
	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/opts"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/pkg/signal"
	"github.com/docker/docker/pkg/stdcopy"
	"github.com/docker/docker/reference"
	"github.com/docker/docker/runconfig"
	swarmapi "github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/encryption"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/docker/swarmkit/protobuf/ptypes"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const swarmRequestTimeout = 20 * time.Second
const stateFile = "docker-state.json"
const defaultAddr = "0.0.0.0:2377"

const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
	contextPrefix         = "com.docker.swarm"
)

// ErrNoSwarm is returned on leaving a cluster that was never initialized
var ErrNoSwarm = fmt.Errorf("This node is not part of a swarm")

// ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated
var ErrSwarmExists = fmt.Errorf("This node is already part of a swarm. Use \"docker swarm leave\" to leave this swarm and join another one.")

// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")

// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. The attempt to join the swarm will continue in the background. Use the \"docker info\" command to see the current swarm status of your node.")

// ErrSwarmLocked is returned if the swarm is encrypted and needs a key to unlock it.
var ErrSwarmLocked = fmt.Errorf("Swarm is encrypted and needs to be unlocked before it can be used. Please use \"docker swarm unlock\" to unlock it.")

// ErrSwarmCertificatesExpired is returned if the daemon was down for the certificates'
// entire validity period, so they had no chance to renew automatically.
var ErrSwarmCertificatesExpired = errors.New("Swarm certificates have expired. To replace them, leave the swarm and join again.")

// NetworkSubnetsProvider exposes functions for retrieving the subnets
// of networks managed by Docker, so they can be filtered.
type NetworkSubnetsProvider interface {
	V4Subnets() []net.IPNet
	V6Subnets() []net.IPNet
}

// Config provides values for Cluster.
type Config struct {
	Root                   string
	Name                   string
	Backend                executorpkg.Backend
	NetworkSubnetsProvider NetworkSubnetsProvider

	// DefaultAdvertiseAddr is the default host/IP or network interface to use
	// if no AdvertiseAddr value is specified.
	DefaultAdvertiseAddr string

	// path to store runtime state, such as the swarm control socket
	RuntimeRoot string
}

// Cluster provides capabilities to participate in a cluster as a worker or a
// manager.
type Cluster struct {
	sync.RWMutex
	*node
	root            string
	runtimeRoot     string
	config          Config
	configEvent     chan struct{} // todo: make this array and goroutine safe
	actualLocalAddr string        // after resolution, not persisted
	stop            bool
	err             error
	cancelDelay     func()
	attachers       map[string]*attacher
	locked          bool
	lastNodeConfig  *nodeStartConfig
}

// attacher manages the in-memory attachment state of a container
// attachment to a global scope network managed by swarm manager. It
// helps in identifying the attachment ID via the taskID and the
// corresponding attachment configuration obtained from the manager.
type attacher struct {
	taskID           string
	config           *network.NetworkingConfig
	attachWaitCh     chan *network.NetworkingConfig
	attachCompleteCh chan struct{}
	detachWaitCh     chan struct{}
}

// node wraps a swarmkit node together with the connection state and start
// configuration tracked by Cluster.
type node struct {
	*swarmnode.Node
	done           chan struct{}
	ready          bool
	conn           *grpc.ClientConn
	client         swarmapi.ControlClient
	logs           swarmapi.LogsClient
	reconnectDelay time.Duration
	config         nodeStartConfig
}

// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string

	joinAddr        string
	forceNewCluster bool
	joinToken       string
	lockKey         []byte
	autolock        bool
}

// New creates a new Cluster instance using provided config.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	if config.RuntimeRoot == "" {
		config.RuntimeRoot = root
	}
	if err := os.MkdirAll(config.RuntimeRoot, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:        root,
		config:      config,
		configEvent: make(chan struct{}, 10),
		runtimeRoot: config.RuntimeRoot,
		attachers:   make(map[string]*attacher),
	}

	nodeConfig, err := c.loadState()
	if err != nil {
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}

	n, err := c.startNewNode(*nodeConfig)
	if err != nil {
		return nil, err
	}

	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Error("swarm component could not be started before timeout was reached")
	case <-n.Ready():
	case <-n.done:
		if errors.Cause(c.err) == ErrSwarmLocked {
			return c, nil
		}
		if err, ok := errors.Cause(c.err).(x509.CertificateInvalidError); ok && err.Reason == x509.Expired {
			c.err = ErrSwarmCertificatesExpired
			return c, nil
		}
		return nil, fmt.Errorf("swarm component could not be started: %v", c.err)
	}
	go c.reconnectOnFailure(n)
	return c, nil
}

// loadState reads the persisted nodeStartConfig from the swarm state file.
func (c *Cluster) loadState() (*nodeStartConfig, error) {
	dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
	if err != nil {
		return nil, err
	}
	// missing certificate means no actual state to restore from
	if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
		if os.IsNotExist(err) {
			c.clearState()
		}
		return nil, err
	}
	var st nodeStartConfig
	if err := json.Unmarshal(dt, &st); err != nil {
		return nil, err
	}
	return &st, nil
}

// saveState atomically writes the given nodeStartConfig to the swarm state file.
func (c *Cluster) saveState(config nodeStartConfig) error {
	dt, err := json.Marshal(config)
	if err != nil {
		return err
	}
	return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
}

// reconnectOnFailure restarts the swarm node with exponential backoff after it
// stops unexpectedly.
func (c *Cluster) reconnectOnFailure(n *node) {
	for {
		<-n.done
		c.Lock()
		if c.stop || c.node != nil {
			c.Unlock()
			return
		}
		n.reconnectDelay *= 2
		if n.reconnectDelay > maxReconnectDelay {
			n.reconnectDelay = maxReconnectDelay
		}
		logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
		delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
		c.cancelDelay = cancel
		c.Unlock()
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			return
		}
		c.Lock()
		if c.node != nil {
			c.Unlock()
			return
		}
		var err error
		config := n.config
		config.RemoteAddr = c.getRemoteAddress()
		config.joinAddr = config.RemoteAddr
		n, err = c.startNewNode(config)
		if err != nil {
			c.err = err
			close(n.done)
		}
		c.Unlock()
	}
}

// startNewNode creates and starts a new swarmkit node with the given
// configuration and wires it up as the cluster's active node.
func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, err
	}

	actualLocalAddr := conf.LocalAddr
	if actualLocalAddr == "" {
		// If localAddr was not specified, resolve it automatically
		// based on the route to joinAddr. localAddr can only be left
		// empty on "join".
		listenHost, _, err := net.SplitHostPort(conf.ListenAddr)
		if err != nil {
			return nil, fmt.Errorf("could not parse listen address: %v", err)
		}

		listenAddrIP := net.ParseIP(listenHost)
		if listenAddrIP == nil || !listenAddrIP.IsUnspecified() {
			actualLocalAddr = listenHost
		} else {
			if conf.RemoteAddr == "" {
				// Should never happen except using swarms created by
				// old versions that didn't save remoteAddr.
				conf.RemoteAddr = "8.8.8.8:53"
			}
			conn, err := net.Dial("udp", conf.RemoteAddr)
			if err != nil {
				return nil, fmt.Errorf("could not find local IP address: %v", err)
			}
			localHostPort := conn.LocalAddr().String()
			actualLocalAddr, _, _ = net.SplitHostPort(localHostPort)
			conn.Close()
		}
	}

	var control string
	if runtime.GOOS == "windows" {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(c.runtimeRoot, controlSocket)
	}

	c.node = nil
	c.cancelDelay = nil
	c.stop = false
	n, err := swarmnode.New(&swarmnode.Config{
		Hostname:           c.config.Name,
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		JoinAddr:           conf.joinAddr,
		StateDir:           c.root,
		JoinToken:          conf.joinToken,
		Executor:           container.NewExecutor(c.config.Backend),
		HeartbeatTick:      1,
		ElectionTick:       3,
		UnlockKey:          conf.lockKey,
		AutoLockManagers:   conf.autolock,
	})

	if err != nil {
		return nil, err
	}
	ctx := context.Background()
	if err := n.Start(ctx); err != nil {
		return nil, err
	}
	node := &node{
		Node:           n,
		done:           make(chan struct{}),
		reconnectDelay: initialReconnectDelay,
		config:         conf,
	}
	c.node = node
	c.actualLocalAddr = actualLocalAddr // not saved
	c.saveState(conf)

	c.config.Backend.SetClusterProvider(c)
	go func() {
		err := detectLockedError(n.Err(ctx))
		if err != nil {
			logrus.Errorf("cluster exited with error: %v", err)
		}
		c.Lock()
		c.node = nil
		c.err = err
		if errors.Cause(err) == ErrSwarmLocked {
			c.locked = true
			confClone := conf
			c.lastNodeConfig = &confClone
		}
		c.Unlock()
		close(node.done)
	}()

	go func() {
		select {
		case <-n.Ready():
			c.Lock()
			node.ready = true
			c.err = nil
			c.Unlock()
		case <-ctx.Done():
		}
		c.configEvent <- struct{}{}
	}()

	go func() {
		for conn := range n.ListenControlSocket(ctx) {
			c.Lock()
			if node.conn != conn {
				if conn == nil {
					node.client = nil
					node.logs = nil
				} else {
					node.client = swarmapi.NewControlClient(conn)
					node.logs = swarmapi.NewLogsClient(conn)
				}
			}
			node.conn = conn
			c.Unlock()
			c.configEvent <- struct{}{}
		}
	}()

	return node, nil
}

// Init initializes new cluster from user provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.Lock()
	if c.swarmExists() {
		if !req.ForceNewCluster {
			c.Unlock()
			return "", ErrSwarmExists
		}
		if err := c.stopNode(); err != nil {
			c.Unlock()
			return "", err
		}
	}

	if err := validateAndSanitizeInitRequest(&req); err != nil {
		c.Unlock()
		return "", err
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		c.Unlock()
		return "", err
	}

	advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
	if err != nil {
		c.Unlock()
		return "", err
	}

	localAddr := listenHost

	// If the local address is undetermined, the advertise address
	// will be used as local address, if it belongs to this system.
	// If the advertise address is not local, then we try to find
	// a system address to use as local address. If this fails,
	// we give up and ask the user to pass the listen address.
	if net.ParseIP(localAddr).IsUnspecified() {
		advertiseIP := net.ParseIP(advertiseHost)

		found := false
		for _, systemIP := range listSystemIPs() {
			if systemIP.Equal(advertiseIP) {
				localAddr = advertiseIP.String()
				found = true
				break
			}
		}

		if !found {
			ip, err := c.resolveSystemAddr()
			if err != nil {
				c.Unlock()
				logrus.Warnf("Could not find a local address: %v", err)
				return "", errMustSpecifyListenAddr
			}
			localAddr = ip.String()
		}
	}

	// todo: check whether a current state already exists
	n, err := c.startNewNode(nodeStartConfig{
		forceNewCluster: req.ForceNewCluster,
		autolock:        req.AutoLockManagers,
		LocalAddr:       localAddr,
		ListenAddr:      net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr:   net.JoinHostPort(advertiseHost, advertisePort),
	})
	if err != nil {
		c.Unlock()
		return "", err
	}
	c.Unlock()

	select {
	case <-n.Ready():
		if err := initClusterSpec(n, req.Spec); err != nil {
			return "", err
		}
		go c.reconnectOnFailure(n)
		return n.NodeID(), nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		if !req.ForceNewCluster { // if the first attempt failed, don't keep state
			if err := c.clearState(); err != nil {
				return "", err
			}
		}
		return "", c.err
	}
}

// Join makes current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.Lock()
	if c.swarmExists() {
		c.Unlock()
		return ErrSwarmExists
	}
	if err := validateAndSanitizeJoinRequest(&req); err != nil {
		c.Unlock()
		return err
	}

	listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
	if err != nil {
		c.Unlock()
		return err
	}

	var advertiseAddr string
	if req.AdvertiseAddr != "" {
		advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
		// For joining, we don't need to provide an advertise address,
		// since the remote side can detect it.
		if err == nil {
			advertiseAddr = net.JoinHostPort(advertiseHost, advertisePort)
		}
	}

	// todo: check whether a current state already exists
	n, err := c.startNewNode(nodeStartConfig{
		RemoteAddr:    req.RemoteAddrs[0],
		ListenAddr:    net.JoinHostPort(listenHost, listenPort),
		AdvertiseAddr: advertiseAddr,
		joinAddr:      req.RemoteAddrs[0],
		joinToken:     req.JoinToken,
	})
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()

	select {
	case <-time.After(swarmConnectTimeout):
		// The attempt to connect continues in the background, but set up
		// reconnection only if the join didn't fail.
		go func() {
			select {
			case <-n.Ready():
				c.reconnectOnFailure(n)
			case <-n.done:
				logrus.Errorf("failed to join the cluster: %+v", c.err)
			}
		}()
		return ErrSwarmJoinTimeoutReached
	case <-n.Ready():
		go c.reconnectOnFailure(n)
		return nil
	case <-n.done:
		c.RLock()
		defer c.RUnlock()
		return c.err
	}
}

// GetUnlockKey returns the unlock key for the swarm.
func (c *Cluster) GetUnlockKey() (string, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return "", c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	client := swarmapi.NewCAClient(c.conn)

	r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{})
	if err != nil {
		return "", err
	}

	if len(r.UnlockKey) == 0 {
		// no key
		return "", nil
	}

	return encryption.HumanReadableKey(r.UnlockKey), nil
}

// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func (c *Cluster) UnlockSwarm(req types.UnlockRequest) error {
	c.RLock()
	if !c.isActiveManager() {
		if err := c.errNoManager(); err != ErrSwarmLocked {
			c.RUnlock()
			return err
		}
	}

	if c.node != nil || !c.locked {
		c.RUnlock()
		return errors.New("swarm is not locked")
	}
	c.RUnlock()

	key, err := encryption.ParseHumanReadableKey(req.UnlockKey)
	if err != nil {
		return err
	}

	c.Lock()
	config := *c.lastNodeConfig
	config.lockKey = key
	n, err := c.startNewNode(config)
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()
	select {
	case <-n.Ready():
	case <-n.done:
		if errors.Cause(c.err) == ErrSwarmLocked {
			return errors.New("swarm could not be unlocked: invalid key provided")
		}
		return fmt.Errorf("swarm component could not be started: %v", c.err)
	}
	go c.reconnectOnFailure(n)
	return nil
}

// stopNode is a helper that stops the active c.node and waits until it has
// shut down. Call while keeping the cluster lock.
func (c *Cluster) stopNode() error {
	if c.node == nil {
		return nil
	}
	c.stop = true
	if c.cancelDelay != nil {
		c.cancelDelay()
		c.cancelDelay = nil
	}
	node := c.node
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	// TODO: can't hold lock on stop because it calls back to network
	c.Unlock()
	defer c.Lock()
	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	<-node.done
	return nil
}

func removingManagerCausesLossOfQuorum(reachable, unreachable int) bool {
	return reachable-2 <= unreachable
}

func isLastManager(reachable, unreachable int) bool {
	return reachable == 1 && unreachable == 0
}

// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(force bool) error {
	c.Lock()
	node := c.node
	if node == nil {
		if c.locked {
			c.locked = false
			c.lastNodeConfig = nil
			c.Unlock()
		} else if c.err == ErrSwarmCertificatesExpired {
			c.err = nil
			c.Unlock()
		} else {
			c.Unlock()
			return ErrNoSwarm
		}
	} else {
		if node.Manager() != nil && !force {
			msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
			if c.isActiveManager() {
				active, reachable, unreachable, err := c.managerStats()
				if err == nil {
					if active && removingManagerCausesLossOfQuorum(reachable, unreachable) {
						if isLastManager(reachable, unreachable) {
							msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
							c.Unlock()
							return fmt.Errorf(msg)
						}
						msg += fmt.Sprintf("Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. ", reachable-1, reachable+unreachable)
					}
				}
			} else {
				msg += "Doing so may lose the consensus of your cluster. "
			}

			msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
			c.Unlock()
			return fmt.Errorf(msg)
		}
		if err := c.stopNode(); err != nil {
			logrus.Errorf("failed to shut down cluster node: %v", err)
			signal.DumpStacks("")
			c.Unlock()
			return err
		}
		c.Unlock()
		if nodeID := node.NodeID(); nodeID != "" {
			nodeContainers, err := c.listContainerForNode(nodeID)
			if err != nil {
				return err
			}
			for _, id := range nodeContainers {
				if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
					logrus.Errorf("error removing %v: %v", id, err)
				}
			}
		}
	}
	c.configEvent <- struct{}{}
	// todo: cleanup optional?
	if err := c.clearState(); err != nil {
		return err
	}
	return nil
}

// listContainerForNode returns the IDs of containers labeled with the given
// swarm node ID.
func (c *Cluster) listContainerForNode(nodeID string) ([]string, error) {
	var ids []string
	filters := filters.NewArgs()
	filters.Add("label", fmt.Sprintf("com.docker.swarm.node.id=%s", nodeID))
	containers, err := c.config.Backend.Containers(&apitypes.ContainerListOptions{
		Filters: filters,
	})
	if err != nil {
		return []string{}, err
	}
	for _, c := range containers {
		ids = append(ids, c.ID)
	}
	return ids, nil
}

// clearState removes the on-disk swarm state and detaches the cluster
// provider from the backend.
func (c *Cluster) clearState() error {
	// todo: backup this data instead of removing?
	if err := os.RemoveAll(c.root); err != nil {
		return err
	}
	if err := os.MkdirAll(c.root, 0700); err != nil {
		return err
	}
	c.config.Backend.SetClusterProvider(nil)
	return nil
}

func (c *Cluster) getRequestContext() (context.Context, func()) { // TODO: not needed when requests don't block on quorum lost
	return context.WithTimeout(context.Background(), swarmRequestTimeout)
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return types.Swarm{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	swarm, err := getSwarm(ctx, c.client)
	if err != nil {
		return types.Swarm{}, err
	}

	return convert.SwarmFromGRPC(*swarm), nil
}

// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	swarm, err := getSwarm(ctx, c.client)
	if err != nil {
		return err
	}

	// In update, the client should provide the complete spec of the swarm,
	// including Name and Labels. If a field is specified as 0 or nil, then
	// swarmkit will use the default value.
	clusterSpec, err := convert.SwarmSpecToGRPC(spec)
	if err != nil {
		return err
	}

	_, err = c.client.UpdateCluster(
		ctx,
		&swarmapi.UpdateClusterRequest{
			ClusterID: swarm.ID,
			Spec:      &clusterSpec,
			ClusterVersion: &swarmapi.Version{
				Index: version,
			},
			Rotation: swarmapi.KeyRotation{
				WorkerJoinToken:  flags.RotateWorkerToken,
				ManagerJoinToken: flags.RotateManagerToken,
				ManagerUnlockKey: flags.RotateManagerUnlockKey,
			},
		},
	)
	return err
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.RLock()
	defer c.RUnlock()
	return c.isActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.RLock()
	defer c.RUnlock()
	return c.node != nil && c.ready
}

// GetLocalAddress returns the local address.
func (c *Cluster) GetLocalAddress() string {
	c.RLock()
	defer c.RUnlock()
	return c.actualLocalAddr
}

// GetListenAddress returns the listen address.
func (c *Cluster) GetListenAddress() string {
	c.RLock()
	defer c.RUnlock()
	if c.node != nil {
		return c.node.config.ListenAddr
	}
	return ""
}

// GetAdvertiseAddress returns the remotely reachable address of this node.
func (c *Cluster) GetAdvertiseAddress() string {
	c.RLock()
	defer c.RUnlock()
	if c.node != nil && c.node.config.AdvertiseAddr != "" {
		advertiseHost, _, _ := net.SplitHostPort(c.node.config.AdvertiseAddr)
		return advertiseHost
	}
	return c.actualLocalAddr
}

// GetRemoteAddress returns a known advertise address of a remote manager if
// available.
// todo: change to array/connect with info
func (c *Cluster) GetRemoteAddress() string {
	c.RLock()
	defer c.RUnlock()
	return c.getRemoteAddress()
}

func (c *Cluster) getRemoteAddress() string {
	if c.node == nil {
		return ""
	}
	nodeID := c.node.NodeID()
	for _, r := range c.node.Remotes() {
		if r.NodeID != nodeID {
			return r.Addr
		}
	}
	return ""
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	info := types.Info{
		NodeAddr: c.GetAdvertiseAddress(),
	}

	c.RLock()
	defer c.RUnlock()

	if c.node == nil {
		info.LocalNodeState = types.LocalNodeStateInactive
		if c.cancelDelay != nil {
			info.LocalNodeState = types.LocalNodeStateError
		}
		if c.locked {
			info.LocalNodeState = types.LocalNodeStateLocked
		} else if c.err == ErrSwarmCertificatesExpired {
			info.LocalNodeState = types.LocalNodeStateError
		}
	} else {
		info.LocalNodeState = types.LocalNodeStatePending
		if c.ready {
			info.LocalNodeState = types.LocalNodeStateActive
		} else if c.locked {
			info.LocalNodeState = types.LocalNodeStateLocked
		}
	}
	if c.err != nil {
		info.Error = c.err.Error()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	if c.isActiveManager() {
		info.ControlAvailable = true
		swarm, err := c.Inspect()
		if err != nil {
			info.Error = err.Error()
		}

		// Strip JoinTokens
		info.Cluster = swarm.ClusterInfo

		if r, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}); err == nil {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers = info.Managers + 1
				}
			}
		}
	}

	if c.node != nil {
		for _, r := range c.node.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = c.node.NodeID()
	}

	return info
}

// isActiveManager should not be called without a read lock
func (c *Cluster) isActiveManager() bool {
	return c.node != nil && c.conn != nil
}

// swarmExists should not be called without a read lock
func (c *Cluster) swarmExists() bool {
	return c.node != nil || c.locked || c.err == ErrSwarmCertificatesExpired
}

// errNoManager returns error describing why manager commands can't be used.
// Call with read lock.
func (c *Cluster) errNoManager() error {
	if c.node == nil {
		if c.locked {
			return ErrSwarmLocked
		}
		if c.err == ErrSwarmCertificatesExpired {
			return ErrSwarmCertificatesExpired
		}
		return fmt.Errorf("This node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again.")
	}
	if c.node.Manager() != nil {
		return fmt.Errorf("This node is not a swarm manager. Manager is being prepared or has trouble connecting to the cluster.")
	}
	return fmt.Errorf("This node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager.")
}

// GetServices returns all services of a managed swarm cluster.
func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	filters, err := newListServicesFilters(options.Filters)
	if err != nil {
		return nil, err
	}
	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := c.client.ListServices(
		ctx,
		&swarmapi.ListServicesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	services := []types.Service{}

	for _, service := range r.Services {
		services = append(services, convert.ServiceFromGRPC(*service))
	}

	return services, nil
}

// imageWithDigestString takes an image such as name or name:tag
// and returns the image pinned to a digest, such as name@sha256:34234...
// Due to the difference between the docker/docker/reference and the
// docker/distribution/reference packages, we're parsing the image twice.
// As the two packages converge, this function should be simplified.
// TODO(nishanttotla): After the packages converge, the function must
// convert distreference.Named -> distreference.Canonical, and the logic simplified.
func (c *Cluster) imageWithDigestString(ctx context.Context, image string, authConfig *apitypes.AuthConfig) (string, error) {
	if _, err := digest.ParseDigest(image); err == nil {
		return "", errors.New("image reference is an image ID")
	}
	ref, err := distreference.ParseNamed(image)
	if err != nil {
		return "", err
	}
	// only query registry if not a canonical reference (i.e. with digest)
	if _, ok := ref.(distreference.Canonical); !ok {
		// create a docker/docker/reference Named object because GetRepository needs it
		dockerRef, err := reference.ParseNamed(image)
		if err != nil {
			return "", err
		}
		dockerRef = reference.WithDefaultTag(dockerRef)
		namedTaggedRef, ok := dockerRef.(reference.NamedTagged)
		if !ok {
			return "", fmt.Errorf("unable to cast image to NamedTagged reference object")
		}

		repo, _, err := c.config.Backend.GetRepository(ctx, namedTaggedRef, authConfig)
		if err != nil {
			return "", err
		}
		dscrptr, err := repo.Tags(ctx).Get(ctx, namedTaggedRef.Tag())
		if err != nil {
			return "", err
		}

		namedDigestedRef, err := distreference.WithDigest(distreference.EnsureTagged(ref), dscrptr.Digest)
		if err != nil {
			return "", err
		}
		return namedDigestedRef.String(), nil
	}
	// reference already contains a digest, so just return it
	return ref.String(), nil
}

// CreateService creates a new service in a managed swarm cluster.
func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	err := c.populateNetworkID(ctx, c.client, &s)
	if err != nil {
		return nil, err
	}

	serviceSpec, err := convert.ServiceSpecToGRPC(s)
	if err != nil {
		return nil, err
	}

	ctnr := serviceSpec.Task.GetContainer()
	if ctnr == nil {
		return nil, fmt.Errorf("service does not use container tasks")
	}

	if encodedAuth != "" {
		ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	}

	// retrieve auth config from encoded auth
	authConfig := &apitypes.AuthConfig{}
	if encodedAuth != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}

	resp := &apitypes.ServiceCreateResponse{}

	// pin image by digest
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig)
		if err != nil {
			logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())
			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()))
		} else if ctnr.Image != digestImage {
			logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage)
			ctnr.Image = digestImage
		} else {
			logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image)
		}
	}

	r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
	if err != nil {
		return nil, err
	}

	resp.ID = r.Service.ID
	return resp, nil
}

// GetService returns a service based on an ID or name.
func (c *Cluster) GetService(input string) (types.Service, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return types.Service{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	service, err := getService(ctx, c.client, input)
	if err != nil {
		return types.Service{}, err
	}
	return convert.ServiceFromGRPC(*service), nil
}

// UpdateService updates existing service to match new properties.
func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	err := c.populateNetworkID(ctx, c.client, &spec)
	if err != nil {
		return nil, err
	}

	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
	if err != nil {
		return nil, err
	}

	currentService, err := getService(ctx, c.client, serviceIDOrName)
	if err != nil {
		return nil, err
	}

	newCtnr := serviceSpec.Task.GetContainer()
	if newCtnr == nil {
		return nil, fmt.Errorf("service does not use container tasks")
	}

	if encodedAuth != "" {
		newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
	} else {
		// this is needed because if the encodedAuth isn't being updated then we
		// shouldn't lose it, and continue to use the one that was already present
		var ctnr *swarmapi.ContainerSpec
		switch registryAuthFrom {
		case apitypes.RegistryAuthFromSpec, "":
			ctnr = currentService.Spec.Task.GetContainer()
		case apitypes.RegistryAuthFromPreviousSpec:
			if currentService.PreviousSpec == nil {
				return nil, fmt.Errorf("service does not have a previous spec")
			}
			ctnr = currentService.PreviousSpec.Task.GetContainer()
		default:
			return nil, fmt.Errorf("unsupported registryAuthFrom value")
		}
		if ctnr == nil {
			return nil, fmt.Errorf("service does not use container tasks")
		}
		newCtnr.PullOptions = ctnr.PullOptions
		// update encodedAuth so it can be used to pin image by digest
		if ctnr.PullOptions != nil {
			encodedAuth = ctnr.PullOptions.RegistryAuth
		}
	}

	// retrieve auth config from encoded auth
	authConfig := &apitypes.AuthConfig{}
	if encodedAuth != "" {
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}

	resp := &apitypes.ServiceUpdateResponse{}

	// pin image by digest
	if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" {
		digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
		if err != nil {
			logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
			resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()))
		} else if newCtnr.Image != digestImage {
			logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
			newCtnr.Image = digestImage
		} else {
			logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
		}
	}

	_, err = c.client.UpdateService(
		ctx,
		&swarmapi.UpdateServiceRequest{
			ServiceID: currentService.ID,
			Spec:      &serviceSpec,
			ServiceVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)

	return resp, err
}

// RemoveService removes a service from a managed swarm cluster.
func (c *Cluster) RemoveService(input string) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	service, err := getService(ctx, c.client, input)
	if err != nil {
		return err
	}

	if _, err := c.client.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
		return err
	}
	return nil
}

// ServiceLogs collects service logs and writes them back to `config.OutStream`
func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error {
	c.RLock()
	if !c.isActiveManager() {
		c.RUnlock()
		return c.errNoManager()
	}

	service, err := getService(ctx, c.client, input)
	if err != nil {
		c.RUnlock()
		return err
	}

	stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{
		Selector: &swarmapi.LogSelector{
			ServiceIDs: []string{service.ID},
		},
		Options: &swarmapi.LogSubscriptionOptions{
			Follow: config.Follow,
		},
	})
	if err != nil {
		c.RUnlock()
		return err
	}

	wf := ioutils.NewWriteFlusher(config.OutStream)
	defer wf.Close()
	close(started)
	wf.Flush()

	outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout)
	errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr)

	// Release the lock before starting the stream.
	c.RUnlock()
	for {
		// Check the context before doing anything.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		subscribeMsg, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		for _, msg := range subscribeMsg.Messages {
			data := []byte{}

			if config.Timestamps {
				ts, err := ptypes.Timestamp(msg.Timestamp)
				if err != nil {
					return err
				}
				data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...)
			}

			data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ",
				contextPrefix, msg.Context.NodeID,
				contextPrefix, msg.Context.ServiceID,
				contextPrefix, msg.Context.TaskID,
			))...)

			data = append(data, msg.Data...)

			switch msg.Stream {
			case swarmapi.LogStreamStdout:
				outStream.Write(data)
			case swarmapi.LogStreamStderr:
				errStream.Write(data)
			}
		}
	}
}

// GetNodes returns a list of all nodes known to a cluster.
func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	filters, err := newListNodesFilters(options.Filters)
	if err != nil {
		return nil, err
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := c.client.ListNodes(
		ctx,
		&swarmapi.ListNodesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	nodes := []types.Node{}

	for _, node := range r.Nodes {
		nodes = append(nodes, convert.NodeFromGRPC(*node))
	}
	return nodes, nil
}

// GetNode returns a node based on an ID or name.
func (c *Cluster) GetNode(input string) (types.Node, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return types.Node{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	node, err := getNode(ctx, c.client, input)
	if err != nil {
		return types.Node{}, err
	}
	return convert.NodeFromGRPC(*node), nil
}

// UpdateNode updates an existing node's properties.
func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	nodeSpec, err := convert.NodeSpecToGRPC(spec)
	if err != nil {
		return err
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	currentNode, err := getNode(ctx, c.client, input)
	if err != nil {
		return err
	}

	_, err = c.client.UpdateNode(
		ctx,
		&swarmapi.UpdateNodeRequest{
			NodeID: currentNode.ID,
			Spec:   &nodeSpec,
			NodeVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}

// RemoveNode removes a node from a cluster
func (c *Cluster) RemoveNode(input string, force bool) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	node, err := getNode(ctx, c.client, input)
	if err != nil {
		return err
	}

	if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}); err != nil {
		return err
	}
	return nil
}

// GetTasks returns a list of tasks matching the filter options.
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	byName := func(filter filters.Args) error {
		if filter.Include("service") {
			serviceFilters := filter.Get("service")
			for _, serviceFilter := range serviceFilters {
				service, err := c.GetService(serviceFilter)
				if err != nil {
					return err
				}
				filter.Del("service", serviceFilter)
				filter.Add("service", service.ID)
			}
		}
		if filter.Include("node") {
			nodeFilters := filter.Get("node")
			for _, nodeFilter := range nodeFilters {
				node, err := c.GetNode(nodeFilter)
				if err != nil {
					return err
				}
				filter.Del("node", nodeFilter)
				filter.Add("node", node.ID)
			}
		}
		return nil
	}

	filters, err := newListTasksFilters(options.Filters, byName)
	if err != nil {
		return nil, err
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := c.client.ListTasks(
		ctx,
		&swarmapi.ListTasksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	tasks := []types.Task{}

	for _, task := range r.Tasks {
		if task.Spec.GetContainer() != nil {
			tasks = append(tasks, convert.TaskFromGRPC(*task))
		}
	}
	return tasks, nil
}

// GetTask returns a task by an ID.
func (c *Cluster) GetTask(input string) (types.Task, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return types.Task{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	task, err := getTask(ctx, c.client, input)
	if err != nil {
		return types.Task{}, err
	}
	return convert.TaskFromGRPC(*task), nil
}

// GetNetwork returns a cluster network by an ID.
func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return apitypes.NetworkResource{}, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	network, err := getNetwork(ctx, c.client, input)
	if err != nil {
		return apitypes.NetworkResource{}, err
	}
	return convert.BasicNetworkFromGRPC(*network), nil
}

// getNetworks lists cluster networks matching the given filters.
func (c *Cluster) getNetworks(filters *swarmapi.ListNetworksRequest_Filters) ([]apitypes.NetworkResource, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return nil, c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	r, err := c.client.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}

	var networks []apitypes.NetworkResource

	for _, network := range r.Networks {
		networks = append(networks, convert.BasicNetworkFromGRPC(*network))
	}

	return networks, nil
}

// GetNetworks returns all current cluster managed networks.
func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
	return c.getNetworks(nil)
}

// GetNetworksByName returns cluster managed networks by name.
// It is ok to have multiple networks here. #18864
func (c *Cluster) GetNetworksByName(name string) ([]apitypes.NetworkResource, error) {
	// Note that swarmapi.GetNetworkRequest.Name is not functional.
	// So we cannot just use that with c.GetNetwork.
	return c.getNetworks(&swarmapi.ListNetworksRequest_Filters{
		Names: []string{name},
	})
}

func attacherKey(target, containerID string) string {
	return containerID + ":" + target
}

// UpdateAttachment signals the attachment config to the attachment
// waiter who is trying to start or attach the container to the
// network.
func (c *Cluster) UpdateAttachment(target, containerID string, config *network.NetworkingConfig) error {
	c.RLock()
	attacher, ok := c.attachers[attacherKey(target, containerID)]
	c.RUnlock()
	if !ok || attacher == nil {
		return fmt.Errorf("could not find attacher for container %s to network %s", containerID, target)
	}

	attacher.attachWaitCh <- config
	close(attacher.attachWaitCh)
	return nil
}

// WaitForDetachment waits for the container to stop or detach from
// the network.
func (c *Cluster) WaitForDetachment(ctx context.Context, networkName, networkID, taskID, containerID string) error {
	c.RLock()
	attacher, ok := c.attachers[attacherKey(networkName, containerID)]
	if !ok {
		attacher, ok = c.attachers[attacherKey(networkID, containerID)]
	}
	if c.node == nil || c.node.Agent() == nil {
		c.RUnlock()
		return fmt.Errorf("invalid cluster node while waiting for detachment")
	}

	agent := c.node.Agent()
	c.RUnlock()

	if ok && attacher != nil &&
		attacher.detachWaitCh != nil &&
		attacher.attachCompleteCh != nil {
		// Attachment may be in progress still so wait for
		// attachment to complete.
		select {
		case <-attacher.attachCompleteCh:
		case <-ctx.Done():
			return ctx.Err()
		}

		if attacher.taskID == taskID {
			select {
			case <-attacher.detachWaitCh:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}

	return agent.ResourceAllocator().DetachNetwork(ctx, taskID)
}

// AttachNetwork generates an attachment request towards the manager.
func (c *Cluster) AttachNetwork(target string, containerID string, addresses []string) (*network.NetworkingConfig, error) {
	aKey := attacherKey(target, containerID)
	c.Lock()
	if c.node == nil || c.node.Agent() == nil {
		c.Unlock()
		return nil, fmt.Errorf("invalid cluster node while attaching to network")
	}
	if attacher, ok := c.attachers[aKey]; ok {
		c.Unlock()
		return attacher.config, nil
	}

	agent := c.node.Agent()
	attachWaitCh := make(chan *network.NetworkingConfig)
	detachWaitCh := make(chan struct{})
	attachCompleteCh := make(chan struct{})
	c.attachers[aKey] = &attacher{
		attachWaitCh:     attachWaitCh,
		attachCompleteCh: attachCompleteCh,
		detachWaitCh:     detachWaitCh,
	}
	c.Unlock()

	ctx, cancel := c.getRequestContext()
	defer cancel()

	taskID, err := agent.ResourceAllocator().AttachNetwork(ctx, containerID, target, addresses)
	if err != nil {
		c.Lock()
		delete(c.attachers, aKey)
		c.Unlock()
		return nil, fmt.Errorf("Could not attach to network %s: %v", target, err)
	}

	c.Lock()
	c.attachers[aKey].taskID = taskID
	close(attachCompleteCh)
	c.Unlock()

	logrus.Debugf("Successfully attached to network %s with tid %s", target, taskID)

	var config *network.NetworkingConfig
	select {
	case config = <-attachWaitCh:
	case <-ctx.Done():
		return nil, fmt.Errorf("attaching to network failed, make sure your network options are correct and check manager logs: %v", ctx.Err())
	}

	c.Lock()
	c.attachers[aKey].config = config
	c.Unlock()
	return config, nil
}

// DetachNetwork unblocks the waiters waiting on WaitForDetachment so
// that a request to detach can be generated towards the manager.
func (c *Cluster) DetachNetwork(target string, containerID string) error {
	aKey := attacherKey(target, containerID)

	c.Lock()
	attacher, ok := c.attachers[aKey]
	delete(c.attachers, aKey)
	c.Unlock()

	if !ok {
		return fmt.Errorf("could not find network attachment for container %s to network %s", containerID, target)
	}

	close(attacher.detachWaitCh)
	return nil
}

// CreateNetwork creates a new cluster managed network.
func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return "", c.errNoManager()
	}

	if runconfig.IsPreDefinedNetwork(s.Name) {
		err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
		return "", apierrors.NewRequestForbiddenError(err)
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	networkSpec := convert.BasicNetworkCreateToGRPC(s)
	r, err := c.client.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
	if err != nil {
		return "", err
	}

	return r.Network.ID, nil
}

// RemoveNetwork removes a cluster network.
func (c *Cluster) RemoveNetwork(input string) error {
	c.RLock()
	defer c.RUnlock()

	if !c.isActiveManager() {
		return c.errNoManager()
	}

	ctx, cancel := c.getRequestContext()
	defer cancel()

	network, err := getNetwork(ctx, c.client, input)
	if err != nil {
		return err
	}

	if _, err := c.client.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
		return err
	}
	return nil
}

// populateNetworkID resolves the network names or ID prefixes referenced in
// the service spec to full swarm network IDs.
func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error {
	// Always prefer NetworkAttachmentConfigs from TaskTemplate
	// but fallback to service spec for backward compatibility
	networks := s.TaskTemplate.Networks
	if len(networks) == 0 {
		networks = s.Networks
	}

	for i, n := range networks {
		apiNetwork, err := getNetwork(ctx, client, n.Target)
		if err != nil {
			if ln, _ := c.config.Backend.FindNetwork(n.Target); ln != nil && !ln.Info().Dynamic() {
				err = fmt.Errorf("The network %s cannot be used with services. Only networks scoped to the swarm can be used, such as those created with the overlay driver.", ln.Name())
				return apierrors.NewRequestForbiddenError(err)
			}
			return err
		}
		networks[i].Target = apiNetwork.ID
	}
	return nil
}

// getNetwork looks up a cluster network by full ID, ID prefix, or name.
func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
	// GetNetwork to match via full ID.
	rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
	if err != nil {
		// If any error (including NotFound), ListNetworks to match via ID prefix and full name.
		rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
		if err != nil || len(rl.Networks) == 0 {
			rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
		}

		if err != nil {
			return nil, err
		}

		if len(rl.Networks) == 0 {
			return nil, fmt.Errorf("network %s not found", input)
		}

		if l := len(rl.Networks); l > 1 {
			return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
		}

		return rl.Networks[0], nil
	}
	return rg.Network, nil
}

// Cleanup stops the active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.Lock()
	node := c.node
	if node == nil {
		c.Unlock()
		return
	}
	defer c.Unlock()
	if c.isActiveManager() {
		active, reachable, unreachable, err := c.managerStats()
		if err == nil {
			singlenode := active && isLastManager(reachable, unreachable)
			if active && !singlenode && removingManagerCausesLossOfQuorum(reachable, unreachable) {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	c.stopNode()
}

// managerStats reports whether this node is an active manager, along with the
// counts of reachable and unreachable managers in the cluster.
func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == c.node.NodeID() {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}

func validateAndSanitizeInitRequest(req *types.InitRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}

	if req.Spec.Annotations.Name == "" {
		req.Spec.Annotations.Name = "default"
	} else if req.Spec.Annotations.Name != "default" {
		return errors.New(`swarm spec must be named "default"`)
	}

	return nil
}

func validateAndSanitizeJoinRequest(req *types.JoinRequest) error {
	var err error
	req.ListenAddr, err = validateAddr(req.ListenAddr)
	if err != nil {
		return fmt.Errorf("invalid ListenAddr %q: %v", req.ListenAddr, err)
	}
	if len(req.RemoteAddrs) == 0 {
		return fmt.Errorf("at least 1 RemoteAddr is required to join")
	}
	for i := range req.RemoteAddrs {
		req.RemoteAddrs[i], err = validateAddr(req.RemoteAddrs[i])
		if err != nil {
			return fmt.Errorf("invalid remoteAddr %q: %v", req.RemoteAddrs[i], err)
		}
	}
	return nil
}

func validateAddr(addr string) (string, error) {
	if addr == "" {
		return addr, fmt.Errorf("invalid empty address")
	}
	newaddr, err := opts.ParseTCPAddr(addr, defaultAddr)
	if err != nil {
		return addr, nil
	}
	return strings.TrimPrefix(newaddr, "tcp://"), nil
}

// initClusterSpec merges the user-provided spec into the default spec of the
// freshly created cluster.
func initClusterSpec(node *node, spec types.Spec) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return fmt.Errorf("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			// In init, we take the initial default values from swarmkit, and merge
			// any non-nil or non-zero value from spec into the GRPC spec. This will
			// leave the default values alone.
			// Note that this is different from Update(), as in Update() we expect the
			// user to specify the complete spec of the cluster (as they already know
			// the existing one and know which fields to update).
			clusterSpec, err := convert.MergeSwarmSpecToGRPC(spec, cluster.Spec)
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err = client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           &clusterSpec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}

// detectLockedError maps swarmkit's invalid unlock key error to ErrSwarmLocked.
func detectLockedError(err error) error {
	if err == swarmnode.ErrInvalidUnlockKey {
		return errors.WithStack(ErrSwarmLocked)
	}
	return err
}