1// Copyright 2016 go-dockerclient authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package testing 6 7import ( 8 "bytes" 9 "encoding/json" 10 "errors" 11 "fmt" 12 "io" 13 "io/ioutil" 14 "math/rand" 15 "net" 16 "net/http" 17 "strings" 18 "time" 19 20 "github.com/docker/docker/api/types/swarm" 21 "github.com/fsouza/go-dockerclient" 22 "github.com/gorilla/mux" 23) 24 25type swarmServer struct { 26 srv *DockerServer 27 mux *mux.Router 28 listener net.Listener 29} 30 31func newSwarmServer(srv *DockerServer, bind string) (*swarmServer, error) { 32 listener, err := net.Listen("tcp", bind) 33 if err != nil { 34 return nil, err 35 } 36 router := mux.NewRouter() 37 router.Path("/internal/updatenodes").Methods("POST").HandlerFunc(srv.handlerWrapper(srv.internalUpdateNodes)) 38 server := &swarmServer{ 39 listener: listener, 40 mux: router, 41 srv: srv, 42 } 43 go http.Serve(listener, router) 44 return server, nil 45} 46 47func (s *swarmServer) URL() string { 48 if s.listener == nil { 49 return "" 50 } 51 return "http://" + s.listener.Addr().String() + "/" 52} 53 54// MutateTask changes a task, returning an error if the given id does not match 55// to any task in the server. 56func (s *DockerServer) MutateTask(id string, newTask swarm.Task) error { 57 s.swarmMut.Lock() 58 defer s.swarmMut.Unlock() 59 for i, task := range s.tasks { 60 if task.ID == id { 61 s.tasks[i] = &newTask 62 return nil 63 } 64 } 65 return errors.New("task not found") 66} 67 68func (s *DockerServer) swarmInit(w http.ResponseWriter, r *http.Request) { 69 s.swarmMut.Lock() 70 defer s.swarmMut.Unlock() 71 if s.swarm != nil { 72 w.WriteHeader(http.StatusNotAcceptable) 73 return 74 } 75 var req swarm.InitRequest 76 err := json.NewDecoder(r.Body).Decode(&req) 77 if err != nil && err != io.EOF { 78 http.Error(w, err.Error(), http.StatusInternalServerError) 79 return 80 } 81 node, err := s.initSwarmNode(req.ListenAddr, req.AdvertiseAddr) 82 if err != nil { 83 http.Error(w, err.Error(), http.StatusInternalServerError) 84 return 85 } 86 node.ManagerStatus.Leader = true 87 err = s.runNodeOperation(s.swarmServer.URL(), nodeOperation{ 88 Op: "add", 89 Node: node, 90 }) 91 if err != nil { 92 http.Error(w, err.Error(), http.StatusInternalServerError) 93 return 94 } 95 s.swarm = &swarm.Swarm{ 96 JoinTokens: swarm.JoinTokens{ 97 Manager: s.generateID(), 98 Worker: s.generateID(), 99 }, 100 } 101 w.Header().Set("Content-Type", "application/json") 102 err = json.NewEncoder(w).Encode(s.nodeID) 103 if err != nil { 104 http.Error(w, err.Error(), http.StatusInternalServerError) 105 } 106} 107 108func (s *DockerServer) swarmInspect(w http.ResponseWriter, r *http.Request) { 109 s.swarmMut.Lock() 110 defer s.swarmMut.Unlock() 111 if s.swarm == nil { 112 w.WriteHeader(http.StatusNotAcceptable) 113 } else { 114 w.WriteHeader(http.StatusOK) 115 w.Header().Set("Content-Type", "application/json") 116 json.NewEncoder(w).Encode(s.swarm) 117 } 118} 119 120func (s *DockerServer) swarmJoin(w http.ResponseWriter, r *http.Request) { 121 s.swarmMut.Lock() 122 defer s.swarmMut.Unlock() 123 if s.swarm != nil { 124 w.WriteHeader(http.StatusNotAcceptable) 125 return 126 } 127 var req swarm.JoinRequest 128 err := json.NewDecoder(r.Body).Decode(&req) 129 if err != nil { 130 http.Error(w, err.Error(), http.StatusInternalServerError) 131 return 132 } 133 if len(req.RemoteAddrs) == 0 { 134 w.WriteHeader(http.StatusBadRequest) 135 return 136 } 137 node, err := s.initSwarmNode(req.ListenAddr, req.AdvertiseAddr) 138 if err != nil { 139 http.Error(w, err.Error(), http.StatusInternalServerError) 140 return 141 } 142 s.swarm = &swarm.Swarm{ 143 JoinTokens: swarm.JoinTokens{ 144 Manager: s.generateID(), 145 Worker: s.generateID(), 146 }, 147 } 148 s.swarmMut.Unlock() 149 err = s.runNodeOperation(fmt.Sprintf("http://%s", req.RemoteAddrs[0]), nodeOperation{ 150 Op: "add", 151 Node: node, 152 forceLock: true, 153 }) 154 s.swarmMut.Lock() 155 if err != nil { 156 http.Error(w, err.Error(), http.StatusInternalServerError) 157 return 158 } 159 w.WriteHeader(http.StatusOK) 160} 161 162func (s *DockerServer) swarmLeave(w http.ResponseWriter, r *http.Request) { 163 s.swarmMut.Lock() 164 defer s.swarmMut.Unlock() 165 if s.swarm == nil { 166 w.WriteHeader(http.StatusNotAcceptable) 167 } else { 168 s.swarmServer.listener.Close() 169 s.swarm = nil 170 s.nodes = nil 171 s.swarmServer = nil 172 s.nodeID = "" 173 w.WriteHeader(http.StatusOK) 174 } 175} 176 177func (s *DockerServer) containerForService(srv *swarm.Service, name string) *docker.Container { 178 hostConfig := docker.HostConfig{} 179 dockerConfig := docker.Config{ 180 Entrypoint: srv.Spec.TaskTemplate.ContainerSpec.Command, 181 Cmd: srv.Spec.TaskTemplate.ContainerSpec.Args, 182 Env: srv.Spec.TaskTemplate.ContainerSpec.Env, 183 } 184 return &docker.Container{ 185 ID: s.generateID(), 186 Name: name, 187 Image: srv.Spec.TaskTemplate.ContainerSpec.Image, 188 Created: time.Now(), 189 Config: &dockerConfig, 190 HostConfig: &hostConfig, 191 State: docker.State{ 192 Running: true, 193 StartedAt: time.Now(), 194 Pid: rand.Int() % 50000, 195 ExitCode: 0, 196 }, 197 } 198} 199 200func (s *DockerServer) serviceCreate(w http.ResponseWriter, r *http.Request) { 201 var config swarm.ServiceSpec 202 defer r.Body.Close() 203 err := json.NewDecoder(r.Body).Decode(&config) 204 if err != nil { 205 http.Error(w, err.Error(), http.StatusInternalServerError) 206 return 207 } 208 s.cMut.Lock() 209 defer s.cMut.Unlock() 210 s.swarmMut.Lock() 211 defer s.swarmMut.Unlock() 212 if len(s.nodes) == 0 || s.swarm == nil { 213 http.Error(w, "no swarm nodes available", http.StatusNotAcceptable) 214 return 215 } 216 if config.Name == "" { 217 config.Name = s.generateID() 218 } 219 for _, s := range s.services { 220 if s.Spec.Name == config.Name { 221 http.Error(w, "there's already a service with this name", http.StatusConflict) 222 return 223 } 224 } 225 service := swarm.Service{ 226 ID: s.generateID(), 227 Spec: config, 228 } 229 s.setServiceEndpoint(&service) 230 s.addTasks(&service, false) 231 s.services = append(s.services, &service) 232 err = s.runNodeOperation(s.swarmServer.URL(), nodeOperation{}) 233 if err != nil { 234 http.Error(w, err.Error(), http.StatusInternalServerError) 235 return 236 } 237 w.WriteHeader(http.StatusOK) 238 json.NewEncoder(w).Encode(service) 239} 240 241func (s *DockerServer) setServiceEndpoint(service *swarm.Service) { 242 if service.Spec.EndpointSpec == nil { 243 return 244 } 245 service.Endpoint = swarm.Endpoint{ 246 Spec: *service.Spec.EndpointSpec, 247 } 248 for _, port := range service.Spec.EndpointSpec.Ports { 249 if port.PublishedPort == 0 { 250 port.PublishedPort = uint32(30000 + s.servicePorts) 251 s.servicePorts++ 252 } 253 service.Endpoint.Ports = append(service.Endpoint.Ports, port) 254 } 255} 256 257func (s *DockerServer) addTasks(service *swarm.Service, update bool) { 258 if service.Spec.TaskTemplate.ContainerSpec == nil { 259 return 260 } 261 containerCount := 1 262 if service.Spec.Mode.Global != nil { 263 containerCount = len(s.nodes) 264 } else if repl := service.Spec.Mode.Replicated; repl != nil { 265 if repl.Replicas != nil { 266 containerCount = int(*repl.Replicas) 267 } 268 } 269 for i := 0; i < containerCount; i++ { 270 name := fmt.Sprintf("%s-%d", service.Spec.Name, i) 271 if update { 272 name = fmt.Sprintf("%s-%d-updated", service.Spec.Name, i) 273 } 274 container := s.containerForService(service, name) 275 chosenNode := s.nodes[s.nodeRR] 276 s.nodeRR = (s.nodeRR + 1) % len(s.nodes) 277 task := swarm.Task{ 278 ID: s.generateID(), 279 ServiceID: service.ID, 280 NodeID: chosenNode.ID, 281 Status: swarm.TaskStatus{ 282 State: swarm.TaskStateReady, 283 ContainerStatus: &swarm.ContainerStatus{ 284 ContainerID: container.ID, 285 }, 286 }, 287 DesiredState: swarm.TaskStateReady, 288 Spec: service.Spec.TaskTemplate, 289 } 290 s.tasks = append(s.tasks, &task) 291 s.addContainer(container) 292 s.notify(container) 293 } 294} 295 296func (s *DockerServer) serviceInspect(w http.ResponseWriter, r *http.Request) { 297 s.swarmMut.Lock() 298 defer s.swarmMut.Unlock() 299 if s.swarm == nil { 300 w.WriteHeader(http.StatusNotAcceptable) 301 return 302 } 303 id := mux.Vars(r)["id"] 304 for _, srv := range s.services { 305 if srv.ID == id || srv.Spec.Name == id { 306 json.NewEncoder(w).Encode(srv) 307 return 308 } 309 } 310 http.Error(w, "service not found", http.StatusNotFound) 311} 312 313func (s *DockerServer) taskInspect(w http.ResponseWriter, r *http.Request) { 314 s.swarmMut.Lock() 315 defer s.swarmMut.Unlock() 316 if s.swarm == nil { 317 w.WriteHeader(http.StatusNotAcceptable) 318 return 319 } 320 id := mux.Vars(r)["id"] 321 for _, task := range s.tasks { 322 if task.ID == id { 323 json.NewEncoder(w).Encode(task) 324 return 325 } 326 } 327 http.Error(w, "task not found", http.StatusNotFound) 328} 329 330func (s *DockerServer) serviceList(w http.ResponseWriter, r *http.Request) { 331 s.swarmMut.Lock() 332 defer s.swarmMut.Unlock() 333 if s.swarm == nil { 334 w.WriteHeader(http.StatusNotAcceptable) 335 return 336 } 337 filtersRaw := r.FormValue("filters") 338 var filters map[string][]string 339 json.Unmarshal([]byte(filtersRaw), &filters) 340 if filters == nil { 341 json.NewEncoder(w).Encode(s.services) 342 return 343 } 344 var ret []*swarm.Service 345 for i, srv := range s.services { 346 if inFilter(filters["id"], srv.ID) && 347 inFilter(filters["name"], srv.Spec.Name) { 348 ret = append(ret, s.services[i]) 349 } 350 } 351 json.NewEncoder(w).Encode(ret) 352} 353 354func (s *DockerServer) taskList(w http.ResponseWriter, r *http.Request) { 355 s.swarmMut.Lock() 356 defer s.swarmMut.Unlock() 357 if s.swarm == nil { 358 w.WriteHeader(http.StatusNotAcceptable) 359 return 360 } 361 filtersRaw := r.FormValue("filters") 362 var filters map[string][]string 363 json.Unmarshal([]byte(filtersRaw), &filters) 364 if filters == nil { 365 json.NewEncoder(w).Encode(s.tasks) 366 return 367 } 368 var ret []*swarm.Task 369 for i, task := range s.tasks { 370 var srv *swarm.Service 371 for _, srv = range s.services { 372 if task.ServiceID == srv.ID { 373 break 374 } 375 } 376 if srv == nil { 377 http.Error(w, "service not found", http.StatusNotFound) 378 return 379 } 380 if inFilter(filters["id"], task.ID) && 381 (inFilter(filters["service"], task.ServiceID) || 382 inFilter(filters["service"], srv.Spec.Annotations.Name)) && 383 inFilter(filters["node"], task.NodeID) && 384 inFilter(filters["desired-state"], string(task.DesiredState)) && 385 inLabelFilter(filters["label"], srv.Spec.Annotations.Labels) { 386 ret = append(ret, s.tasks[i]) 387 } 388 } 389 json.NewEncoder(w).Encode(ret) 390} 391 392func inLabelFilter(list []string, labels map[string]string) bool { 393 if len(list) == 0 { 394 return true 395 } 396 for _, item := range list { 397 parts := strings.Split(item, "=") 398 key := parts[0] 399 if val, ok := labels[key]; ok { 400 if len(parts) > 1 && val != parts[1] { 401 continue 402 } 403 return true 404 } 405 } 406 return false 407} 408 409func inFilter(list []string, wanted string) bool { 410 if len(list) == 0 { 411 return true 412 } 413 for _, item := range list { 414 if item == wanted { 415 return true 416 } 417 } 418 return false 419} 420 421func (s *DockerServer) serviceDelete(w http.ResponseWriter, r *http.Request) { 422 s.swarmMut.Lock() 423 defer s.swarmMut.Unlock() 424 s.cMut.Lock() 425 defer s.cMut.Unlock() 426 if s.swarm == nil { 427 w.WriteHeader(http.StatusNotAcceptable) 428 return 429 } 430 id := mux.Vars(r)["id"] 431 var i int 432 var toDelete *swarm.Service 433 for i = range s.services { 434 if s.services[i].ID == id || s.services[i].Spec.Name == id { 435 toDelete = s.services[i] 436 break 437 } 438 } 439 if toDelete == nil { 440 http.Error(w, "service not found", http.StatusNotFound) 441 return 442 } 443 s.services[i] = s.services[len(s.services)-1] 444 s.services = s.services[:len(s.services)-1] 445 for i := 0; i < len(s.tasks); i++ { 446 if s.tasks[i].ServiceID == toDelete.ID { 447 cont, _ := s.findContainerWithLock(s.tasks[i].Status.ContainerStatus.ContainerID, false) 448 if cont != nil { 449 delete(s.containers, cont.ID) 450 delete(s.contNameToID, cont.Name) 451 } 452 s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) 453 i-- 454 } 455 } 456 err := s.runNodeOperation(s.swarmServer.URL(), nodeOperation{}) 457 if err != nil { 458 http.Error(w, err.Error(), http.StatusInternalServerError) 459 return 460 } 461} 462 463func (s *DockerServer) serviceUpdate(w http.ResponseWriter, r *http.Request) { 464 start := time.Now() 465 s.swarmMut.Lock() 466 defer s.swarmMut.Unlock() 467 s.cMut.Lock() 468 defer s.cMut.Unlock() 469 if s.swarm == nil { 470 w.WriteHeader(http.StatusNotAcceptable) 471 return 472 } 473 id := mux.Vars(r)["id"] 474 var toUpdate *swarm.Service 475 for i := range s.services { 476 if s.services[i].ID == id || s.services[i].Spec.Name == id { 477 toUpdate = s.services[i] 478 break 479 } 480 } 481 if toUpdate == nil { 482 http.Error(w, "service not found", http.StatusNotFound) 483 return 484 } 485 var newSpec swarm.ServiceSpec 486 err := json.NewDecoder(r.Body).Decode(&newSpec) 487 if err != nil { 488 http.Error(w, err.Error(), http.StatusInternalServerError) 489 return 490 } 491 toUpdate.Spec = newSpec 492 end := time.Now() 493 toUpdate.UpdateStatus = &swarm.UpdateStatus{ 494 State: swarm.UpdateStateCompleted, 495 CompletedAt: &end, 496 StartedAt: &start, 497 } 498 s.setServiceEndpoint(toUpdate) 499 for i := 0; i < len(s.tasks); i++ { 500 if s.tasks[i].ServiceID != toUpdate.ID { 501 continue 502 } 503 cont, _ := s.findContainerWithLock(s.tasks[i].Status.ContainerStatus.ContainerID, false) 504 if cont != nil { 505 delete(s.containers, cont.ID) 506 delete(s.contNameToID, cont.Name) 507 } 508 s.tasks = append(s.tasks[:i], s.tasks[i+1:]...) 509 i-- 510 } 511 s.addTasks(toUpdate, true) 512 err = s.runNodeOperation(s.swarmServer.URL(), nodeOperation{}) 513 if err != nil { 514 http.Error(w, err.Error(), http.StatusInternalServerError) 515 return 516 } 517} 518 519func (s *DockerServer) nodeUpdate(w http.ResponseWriter, r *http.Request) { 520 s.swarmMut.Lock() 521 defer s.swarmMut.Unlock() 522 if s.swarm == nil { 523 w.WriteHeader(http.StatusNotAcceptable) 524 return 525 } 526 id := mux.Vars(r)["id"] 527 var n *swarm.Node 528 for i := range s.nodes { 529 if s.nodes[i].ID == id { 530 n = &s.nodes[i] 531 break 532 } 533 } 534 if n == nil { 535 w.WriteHeader(http.StatusNotFound) 536 return 537 } 538 var spec swarm.NodeSpec 539 err := json.NewDecoder(r.Body).Decode(&spec) 540 if err != nil { 541 http.Error(w, err.Error(), http.StatusInternalServerError) 542 return 543 } 544 n.Spec = spec 545 err = s.runNodeOperation(s.swarmServer.URL(), nodeOperation{ 546 Op: "update", 547 Node: *n, 548 }) 549 if err != nil { 550 http.Error(w, err.Error(), http.StatusInternalServerError) 551 return 552 } 553} 554 555func (s *DockerServer) nodeDelete(w http.ResponseWriter, r *http.Request) { 556 s.swarmMut.Lock() 557 defer s.swarmMut.Unlock() 558 if s.swarm == nil { 559 w.WriteHeader(http.StatusNotAcceptable) 560 return 561 } 562 id := mux.Vars(r)["id"] 563 err := s.runNodeOperation(s.swarmServer.URL(), nodeOperation{ 564 Op: "delete", 565 Node: swarm.Node{ 566 ID: id, 567 }, 568 }) 569 if err != nil { 570 http.Error(w, err.Error(), http.StatusInternalServerError) 571 return 572 } 573} 574 575func (s *DockerServer) nodeInspect(w http.ResponseWriter, r *http.Request) { 576 s.swarmMut.Lock() 577 defer s.swarmMut.Unlock() 578 if s.swarm == nil { 579 w.WriteHeader(http.StatusNotAcceptable) 580 return 581 } 582 id := mux.Vars(r)["id"] 583 for _, n := range s.nodes { 584 if n.ID == id { 585 err := json.NewEncoder(w).Encode(n) 586 if err != nil { 587 http.Error(w, err.Error(), http.StatusInternalServerError) 588 } 589 return 590 } 591 } 592 w.WriteHeader(http.StatusNotFound) 593} 594 595func (s *DockerServer) nodeList(w http.ResponseWriter, r *http.Request) { 596 s.swarmMut.Lock() 597 defer s.swarmMut.Unlock() 598 if s.swarm == nil { 599 w.WriteHeader(http.StatusNotAcceptable) 600 return 601 } 602 err := json.NewEncoder(w).Encode(s.nodes) 603 if err != nil { 604 http.Error(w, err.Error(), http.StatusInternalServerError) 605 } 606} 607 608type nodeOperation struct { 609 Op string 610 Node swarm.Node 611 Tasks []*swarm.Task 612 Services []*swarm.Service 613 forceLock bool 614} 615 616func (s *DockerServer) runNodeOperation(dst string, nodeOp nodeOperation) error { 617 data, err := json.Marshal(nodeOp) 618 if err != nil { 619 return err 620 } 621 url := fmt.Sprintf("%s/internal/updatenodes", strings.TrimRight(dst, "/")) 622 if nodeOp.forceLock { 623 url += "?forcelock=1" 624 } 625 rsp, err := http.Post(url, "application/json", bytes.NewReader(data)) 626 if err != nil { 627 return err 628 } 629 if rsp.StatusCode != http.StatusOK { 630 return fmt.Errorf("unexpected status code in updatenodes: %d", rsp.StatusCode) 631 } 632 return json.NewDecoder(rsp.Body).Decode(&s.nodes) 633} 634 635func (s *DockerServer) internalUpdateNodes(w http.ResponseWriter, r *http.Request) { 636 propagate := r.URL.Query().Get("propagate") != "0" 637 if !propagate || r.URL.Query().Get("forcelock") != "" { 638 s.swarmMut.Lock() 639 defer s.swarmMut.Unlock() 640 } 641 data, err := ioutil.ReadAll(r.Body) 642 if err != nil { 643 http.Error(w, err.Error(), http.StatusInternalServerError) 644 return 645 } 646 var nodeOp nodeOperation 647 err = json.Unmarshal(data, &nodeOp) 648 if err != nil { 649 http.Error(w, err.Error(), http.StatusInternalServerError) 650 return 651 } 652 switch nodeOp.Op { 653 case "add": 654 s.nodes = append(s.nodes, nodeOp.Node) 655 case "update": 656 for i, n := range s.nodes { 657 if n.ID == nodeOp.Node.ID { 658 s.nodes[i] = nodeOp.Node 659 break 660 } 661 } 662 case "delete": 663 for i, n := range s.nodes { 664 if n.ID == nodeOp.Node.ID { 665 s.nodes = append(s.nodes[:i], s.nodes[i+1:]...) 666 break 667 } 668 } 669 } 670 if propagate { 671 nodeOp.Services = s.services 672 nodeOp.Tasks = s.tasks 673 data, _ = json.Marshal(nodeOp) 674 for _, node := range s.nodes { 675 if s.nodeID == node.ID { 676 continue 677 } 678 url := fmt.Sprintf("http://%s/internal/updatenodes?propagate=0", node.ManagerStatus.Addr) 679 _, err = http.Post(url, "application/json", bytes.NewReader(data)) 680 if err != nil { 681 http.Error(w, err.Error(), http.StatusInternalServerError) 682 return 683 } 684 } 685 } 686 if nodeOp.Services != nil { 687 s.services = nodeOp.Services 688 } 689 if nodeOp.Tasks != nil { 690 s.tasks = nodeOp.Tasks 691 } 692 w.Header().Set("Content-Type", "application/json") 693 err = json.NewEncoder(w).Encode(s.nodes) 694 if err != nil { 695 http.Error(w, err.Error(), http.StatusInternalServerError) 696 } 697} 698