1package container 2 3import ( 4 "fmt" 5 "sort" 6 "strings" 7 "sync" 8 9 "github.com/docker/docker/api/types" 10 "github.com/docker/docker/api/types/filters" 11 "github.com/docker/docker/api/types/network" 12 swarmtypes "github.com/docker/docker/api/types/swarm" 13 "github.com/docker/docker/daemon/cluster/controllers/plugin" 14 "github.com/docker/docker/daemon/cluster/convert" 15 executorpkg "github.com/docker/docker/daemon/cluster/executor" 16 clustertypes "github.com/docker/docker/daemon/cluster/provider" 17 networktypes "github.com/docker/libnetwork/types" 18 "github.com/docker/swarmkit/agent" 19 "github.com/docker/swarmkit/agent/exec" 20 "github.com/docker/swarmkit/api" 21 "github.com/docker/swarmkit/api/naming" 22 "github.com/sirupsen/logrus" 23 "golang.org/x/net/context" 24) 25 26type executor struct { 27 backend executorpkg.Backend 28 pluginBackend plugin.Backend 29 dependencies exec.DependencyManager 30 mutex sync.Mutex // This mutex protects the following node field 31 node *api.NodeDescription 32} 33 34// NewExecutor returns an executor from the docker client. 35func NewExecutor(b executorpkg.Backend, p plugin.Backend) exec.Executor { 36 return &executor{ 37 backend: b, 38 pluginBackend: p, 39 dependencies: agent.NewDependencyManager(), 40 } 41} 42 43// Describe returns the underlying node description from the docker client. 44func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) { 45 info, err := e.backend.SystemInfo() 46 if err != nil { 47 return nil, err 48 } 49 50 plugins := map[api.PluginDescription]struct{}{} 51 addPlugins := func(typ string, names []string) { 52 for _, name := range names { 53 plugins[api.PluginDescription{ 54 Type: typ, 55 Name: name, 56 }] = struct{}{} 57 } 58 } 59 60 // add v1 plugins 61 addPlugins("Volume", info.Plugins.Volume) 62 // Add builtin driver "overlay" (the only builtin multi-host driver) to 63 // the plugin list by default. 64 addPlugins("Network", append([]string{"overlay"}, info.Plugins.Network...)) 65 addPlugins("Authorization", info.Plugins.Authorization) 66 addPlugins("Log", info.Plugins.Log) 67 68 // add v2 plugins 69 v2Plugins, err := e.backend.PluginManager().List(filters.NewArgs()) 70 if err == nil { 71 for _, plgn := range v2Plugins { 72 for _, typ := range plgn.Config.Interface.Types { 73 if typ.Prefix != "docker" || !plgn.Enabled { 74 continue 75 } 76 plgnTyp := typ.Capability 77 switch typ.Capability { 78 case "volumedriver": 79 plgnTyp = "Volume" 80 case "networkdriver": 81 plgnTyp = "Network" 82 case "logdriver": 83 plgnTyp = "Log" 84 } 85 86 plugins[api.PluginDescription{ 87 Type: plgnTyp, 88 Name: plgn.Name, 89 }] = struct{}{} 90 } 91 } 92 } 93 94 pluginFields := make([]api.PluginDescription, 0, len(plugins)) 95 for k := range plugins { 96 pluginFields = append(pluginFields, k) 97 } 98 99 sort.Sort(sortedPlugins(pluginFields)) 100 101 // parse []string labels into a map[string]string 102 labels := map[string]string{} 103 for _, l := range info.Labels { 104 stringSlice := strings.SplitN(l, "=", 2) 105 // this will take the last value in the list for a given key 106 // ideally, one shouldn't assign multiple values to the same key 107 if len(stringSlice) > 1 { 108 labels[stringSlice[0]] = stringSlice[1] 109 } 110 } 111 112 description := &api.NodeDescription{ 113 Hostname: info.Name, 114 Platform: &api.Platform{ 115 Architecture: info.Architecture, 116 OS: info.OSType, 117 }, 118 Engine: &api.EngineDescription{ 119 EngineVersion: info.ServerVersion, 120 Labels: labels, 121 Plugins: pluginFields, 122 }, 123 Resources: &api.Resources{ 124 NanoCPUs: int64(info.NCPU) * 1e9, 125 MemoryBytes: info.MemTotal, 126 Generic: convert.GenericResourcesToGRPC(info.GenericResources), 127 }, 128 } 129 130 // Save the node information in the executor field 131 e.mutex.Lock() 132 e.node = description 133 e.mutex.Unlock() 134 135 return description, nil 136} 137 138func (e *executor) Configure(ctx context.Context, node *api.Node) error { 139 var ingressNA *api.NetworkAttachment 140 attachments := make(map[string]string) 141 142 for _, na := range node.Attachments { 143 if na.Network.Spec.Ingress { 144 ingressNA = na 145 } 146 attachments[na.Network.ID] = na.Addresses[0] 147 } 148 149 if ingressNA == nil { 150 e.backend.ReleaseIngress() 151 return e.backend.GetAttachmentStore().ResetAttachments(attachments) 152 } 153 154 options := types.NetworkCreate{ 155 Driver: ingressNA.Network.DriverState.Name, 156 IPAM: &network.IPAM{ 157 Driver: ingressNA.Network.IPAM.Driver.Name, 158 }, 159 Options: ingressNA.Network.DriverState.Options, 160 Ingress: true, 161 CheckDuplicate: true, 162 } 163 164 for _, ic := range ingressNA.Network.IPAM.Configs { 165 c := network.IPAMConfig{ 166 Subnet: ic.Subnet, 167 IPRange: ic.Range, 168 Gateway: ic.Gateway, 169 } 170 options.IPAM.Config = append(options.IPAM.Config, c) 171 } 172 173 _, err := e.backend.SetupIngress(clustertypes.NetworkCreateRequest{ 174 ID: ingressNA.Network.ID, 175 NetworkCreateRequest: types.NetworkCreateRequest{ 176 Name: ingressNA.Network.Spec.Annotations.Name, 177 NetworkCreate: options, 178 }, 179 }, ingressNA.Addresses[0]) 180 if err != nil { 181 return err 182 } 183 184 return e.backend.GetAttachmentStore().ResetAttachments(attachments) 185} 186 187// Controller returns a docker container runner. 188func (e *executor) Controller(t *api.Task) (exec.Controller, error) { 189 dependencyGetter := agent.Restrict(e.dependencies, t) 190 191 // Get the node description from the executor field 192 e.mutex.Lock() 193 nodeDescription := e.node 194 e.mutex.Unlock() 195 196 if t.Spec.GetAttachment() != nil { 197 return newNetworkAttacherController(e.backend, t, nodeDescription, dependencyGetter) 198 } 199 200 var ctlr exec.Controller 201 switch r := t.Spec.GetRuntime().(type) { 202 case *api.TaskSpec_Generic: 203 logrus.WithFields(logrus.Fields{ 204 "kind": r.Generic.Kind, 205 "type_url": r.Generic.Payload.TypeUrl, 206 }).Debug("custom runtime requested") 207 runtimeKind, err := naming.Runtime(t.Spec) 208 if err != nil { 209 return ctlr, err 210 } 211 switch runtimeKind { 212 case string(swarmtypes.RuntimePlugin): 213 info, _ := e.backend.SystemInfo() 214 if !info.ExperimentalBuild { 215 return ctlr, fmt.Errorf("runtime type %q only supported in experimental", swarmtypes.RuntimePlugin) 216 } 217 c, err := plugin.NewController(e.pluginBackend, t) 218 if err != nil { 219 return ctlr, err 220 } 221 ctlr = c 222 default: 223 return ctlr, fmt.Errorf("unsupported runtime type: %q", runtimeKind) 224 } 225 case *api.TaskSpec_Container: 226 c, err := newController(e.backend, t, nodeDescription, dependencyGetter) 227 if err != nil { 228 return ctlr, err 229 } 230 ctlr = c 231 default: 232 return ctlr, fmt.Errorf("unsupported runtime: %q", r) 233 } 234 235 return ctlr, nil 236} 237 238func (e *executor) SetNetworkBootstrapKeys(keys []*api.EncryptionKey) error { 239 nwKeys := []*networktypes.EncryptionKey{} 240 for _, key := range keys { 241 nwKey := &networktypes.EncryptionKey{ 242 Subsystem: key.Subsystem, 243 Algorithm: int32(key.Algorithm), 244 Key: make([]byte, len(key.Key)), 245 LamportTime: key.LamportTime, 246 } 247 copy(nwKey.Key, key.Key) 248 nwKeys = append(nwKeys, nwKey) 249 } 250 e.backend.SetNetworkBootstrapKeys(nwKeys) 251 252 return nil 253} 254 255func (e *executor) Secrets() exec.SecretsManager { 256 return e.dependencies.Secrets() 257} 258 259func (e *executor) Configs() exec.ConfigsManager { 260 return e.dependencies.Configs() 261} 262 263type sortedPlugins []api.PluginDescription 264 265func (sp sortedPlugins) Len() int { return len(sp) } 266 267func (sp sortedPlugins) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] } 268 269func (sp sortedPlugins) Less(i, j int) bool { 270 if sp[i].Type != sp[j].Type { 271 return sp[i].Type < sp[j].Type 272 } 273 return sp[i].Name < sp[j].Name 274} 275