1// Copyright 2016 go-dockerclient authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package testing
6
7import (
8	"bytes"
9	"encoding/json"
10	"errors"
11	"fmt"
12	"io"
13	"io/ioutil"
14	"math/rand"
15	"net"
16	"net/http"
17	"strings"
18	"time"
19
20	"github.com/docker/docker/api/types/swarm"
21	"github.com/fsouza/go-dockerclient"
22	"github.com/gorilla/mux"
23)
24
25type swarmServer struct {
26	srv      *DockerServer
27	mux      *mux.Router
28	listener net.Listener
29}
30
31func newSwarmServer(srv *DockerServer, bind string) (*swarmServer, error) {
32	listener, err := net.Listen("tcp", bind)
33	if err != nil {
34		return nil, err
35	}
36	router := mux.NewRouter()
37	router.Path("/internal/updatenodes").Methods("POST").HandlerFunc(srv.handlerWrapper(srv.internalUpdateNodes))
38	server := &swarmServer{
39		listener: listener,
40		mux:      router,
41		srv:      srv,
42	}
43	go http.Serve(listener, router)
44	return server, nil
45}
46
47func (s *swarmServer) URL() string {
48	if s.listener == nil {
49		return ""
50	}
51	return "http://" + s.listener.Addr().String() + "/"
52}
53
54// MutateTask changes a task, returning an error if the given id does not match
55// to any task in the server.
56func (s *DockerServer) MutateTask(id string, newTask swarm.Task) error {
57	s.swarmMut.Lock()
58	defer s.swarmMut.Unlock()
59	for i, task := range s.tasks {
60		if task.ID == id {
61			s.tasks[i] = &newTask
62			return nil
63		}
64	}
65	return errors.New("task not found")
66}
67
68func (s *DockerServer) swarmInit(w http.ResponseWriter, r *http.Request) {
69	s.swarmMut.Lock()
70	defer s.swarmMut.Unlock()
71	if s.swarm != nil {
72		w.WriteHeader(http.StatusNotAcceptable)
73		return
74	}
75	var req swarm.InitRequest
76	err := json.NewDecoder(r.Body).Decode(&req)
77	if err != nil && err != io.EOF {
78		http.Error(w, err.Error(), http.StatusInternalServerError)
79		return
80	}
81	node, err := s.initSwarmNode(req.ListenAddr, req.AdvertiseAddr)
82	if err != nil {
83		http.Error(w, err.Error(), http.StatusInternalServerError)
84		return
85	}
86	node.ManagerStatus.Leader = true
87	err = s.runNodeOperation(s.swarmServer.URL(), nodeOperation{
88		Op:   "add",
89		Node: node,
90	})
91	if err != nil {
92		http.Error(w, err.Error(), http.StatusInternalServerError)
93		return
94	}
95	s.swarm = &swarm.Swarm{
96		JoinTokens: swarm.JoinTokens{
97			Manager: s.generateID(),
98			Worker:  s.generateID(),
99		},
100	}
101	w.Header().Set("Content-Type", "application/json")
102	err = json.NewEncoder(w).Encode(s.nodeID)
103	if err != nil {
104		http.Error(w, err.Error(), http.StatusInternalServerError)
105	}
106}
107
108func (s *DockerServer) swarmInspect(w http.ResponseWriter, r *http.Request) {
109	s.swarmMut.Lock()
110	defer s.swarmMut.Unlock()
111	if s.swarm == nil {
112		w.WriteHeader(http.StatusNotAcceptable)
113	} else {
114		w.WriteHeader(http.StatusOK)
115		w.Header().Set("Content-Type", "application/json")
116		json.NewEncoder(w).Encode(s.swarm)
117	}
118}
119
120func (s *DockerServer) swarmJoin(w http.ResponseWriter, r *http.Request) {
121	s.swarmMut.Lock()
122	defer s.swarmMut.Unlock()
123	if s.swarm != nil {
124		w.WriteHeader(http.StatusNotAcceptable)
125		return
126	}
127	var req swarm.JoinRequest
128	err := json.NewDecoder(r.Body).Decode(&req)
129	if err != nil {
130		http.Error(w, err.Error(), http.StatusInternalServerError)
131		return
132	}
133	if len(req.RemoteAddrs) == 0 {
134		w.WriteHeader(http.StatusBadRequest)
135		return
136	}
137	node, err := s.initSwarmNode(req.ListenAddr, req.AdvertiseAddr)
138	if err != nil {
139		http.Error(w, err.Error(), http.StatusInternalServerError)
140		return
141	}
142	s.swarm = &swarm.Swarm{
143		JoinTokens: swarm.JoinTokens{
144			Manager: s.generateID(),
145			Worker:  s.generateID(),
146		},
147	}
148	s.swarmMut.Unlock()
149	err = s.runNodeOperation(fmt.Sprintf("http://%s", req.RemoteAddrs[0]), nodeOperation{
150		Op:        "add",
151		Node:      node,
152		forceLock: true,
153	})
154	s.swarmMut.Lock()
155	if err != nil {
156		http.Error(w, err.Error(), http.StatusInternalServerError)
157		return
158	}
159	w.WriteHeader(http.StatusOK)
160}
161
162func (s *DockerServer) swarmLeave(w http.ResponseWriter, r *http.Request) {
163	s.swarmMut.Lock()
164	defer s.swarmMut.Unlock()
165	if s.swarm == nil {
166		w.WriteHeader(http.StatusNotAcceptable)
167	} else {
168		s.swarmServer.listener.Close()
169		s.swarm = nil
170		s.nodes = nil
171		s.swarmServer = nil
172		s.nodeID = ""
173		w.WriteHeader(http.StatusOK)
174	}
175}
176
177func (s *DockerServer) containerForService(srv *swarm.Service, name string) *docker.Container {
178	hostConfig := docker.HostConfig{}
179	dockerConfig := docker.Config{
180		Entrypoint: srv.Spec.TaskTemplate.ContainerSpec.Command,
181		Cmd:        srv.Spec.TaskTemplate.ContainerSpec.Args,
182		Env:        srv.Spec.TaskTemplate.ContainerSpec.Env,
183	}
184	return &docker.Container{
185		ID:         s.generateID(),
186		Name:       name,
187		Image:      srv.Spec.TaskTemplate.ContainerSpec.Image,
188		Created:    time.Now(),
189		Config:     &dockerConfig,
190		HostConfig: &hostConfig,
191		State: docker.State{
192			Running:   true,
193			StartedAt: time.Now(),
194			Pid:       rand.Int() % 50000,
195			ExitCode:  0,
196		},
197	}
198}
199
200func (s *DockerServer) serviceCreate(w http.ResponseWriter, r *http.Request) {
201	var config swarm.ServiceSpec
202	defer r.Body.Close()
203	err := json.NewDecoder(r.Body).Decode(&config)
204	if err != nil {
205		http.Error(w, err.Error(), http.StatusInternalServerError)
206		return
207	}
208	s.cMut.Lock()
209	defer s.cMut.Unlock()
210	s.swarmMut.Lock()
211	defer s.swarmMut.Unlock()
212	if len(s.nodes) == 0 || s.swarm == nil {
213		http.Error(w, "no swarm nodes available", http.StatusNotAcceptable)
214		return
215	}
216	if config.Name == "" {
217		config.Name = s.generateID()
218	}
219	for _, s := range s.services {
220		if s.Spec.Name == config.Name {
221			http.Error(w, "there's already a service with this name", http.StatusConflict)
222			return
223		}
224	}
225	service := swarm.Service{
226		ID:   s.generateID(),
227		Spec: config,
228	}
229	s.setServiceEndpoint(&service)
230	s.addTasks(&service, false)
231	s.services = append(s.services, &service)
232	err = s.runNodeOperation(s.swarmServer.URL(), nodeOperation{})
233	if err != nil {
234		http.Error(w, err.Error(), http.StatusInternalServerError)
235		return
236	}
237	w.WriteHeader(http.StatusOK)
238	json.NewEncoder(w).Encode(service)
239}
240
241func (s *DockerServer) setServiceEndpoint(service *swarm.Service) {
242	if service.Spec.EndpointSpec == nil {
243		return
244	}
245	service.Endpoint = swarm.Endpoint{
246		Spec: *service.Spec.EndpointSpec,
247	}
248	for _, port := range service.Spec.EndpointSpec.Ports {
249		if port.PublishedPort == 0 {
250			port.PublishedPort = uint32(30000 + s.servicePorts)
251			s.servicePorts++
252		}
253		service.Endpoint.Ports = append(service.Endpoint.Ports, port)
254	}
255}
256
257func (s *DockerServer) addTasks(service *swarm.Service, update bool) {
258	if service.Spec.TaskTemplate.ContainerSpec == nil {
259		return
260	}
261	containerCount := 1
262	if service.Spec.Mode.Global != nil {
263		containerCount = len(s.nodes)
264	} else if repl := service.Spec.Mode.Replicated; repl != nil {
265		if repl.Replicas != nil {
266			containerCount = int(*repl.Replicas)
267		}
268	}
269	for i := 0; i < containerCount; i++ {
270		name := fmt.Sprintf("%s-%d", service.Spec.Name, i)
271		if update {
272			name = fmt.Sprintf("%s-%d-updated", service.Spec.Name, i)
273		}
274		container := s.containerForService(service, name)
275		chosenNode := s.nodes[s.nodeRR]
276		s.nodeRR = (s.nodeRR + 1) % len(s.nodes)
277		task := swarm.Task{
278			ID:        s.generateID(),
279			ServiceID: service.ID,
280			NodeID:    chosenNode.ID,
281			Status: swarm.TaskStatus{
282				State: swarm.TaskStateReady,
283				ContainerStatus: &swarm.ContainerStatus{
284					ContainerID: container.ID,
285				},
286			},
287			DesiredState: swarm.TaskStateReady,
288			Spec:         service.Spec.TaskTemplate,
289		}
290		s.tasks = append(s.tasks, &task)
291		s.addContainer(container)
292		s.notify(container)
293	}
294}
295
296func (s *DockerServer) serviceInspect(w http.ResponseWriter, r *http.Request) {
297	s.swarmMut.Lock()
298	defer s.swarmMut.Unlock()
299	if s.swarm == nil {
300		w.WriteHeader(http.StatusNotAcceptable)
301		return
302	}
303	id := mux.Vars(r)["id"]
304	for _, srv := range s.services {
305		if srv.ID == id || srv.Spec.Name == id {
306			json.NewEncoder(w).Encode(srv)
307			return
308		}
309	}
310	http.Error(w, "service not found", http.StatusNotFound)
311}
312
313func (s *DockerServer) taskInspect(w http.ResponseWriter, r *http.Request) {
314	s.swarmMut.Lock()
315	defer s.swarmMut.Unlock()
316	if s.swarm == nil {
317		w.WriteHeader(http.StatusNotAcceptable)
318		return
319	}
320	id := mux.Vars(r)["id"]
321	for _, task := range s.tasks {
322		if task.ID == id {
323			json.NewEncoder(w).Encode(task)
324			return
325		}
326	}
327	http.Error(w, "task not found", http.StatusNotFound)
328}
329
330func (s *DockerServer) serviceList(w http.ResponseWriter, r *http.Request) {
331	s.swarmMut.Lock()
332	defer s.swarmMut.Unlock()
333	if s.swarm == nil {
334		w.WriteHeader(http.StatusNotAcceptable)
335		return
336	}
337	filtersRaw := r.FormValue("filters")
338	var filters map[string][]string
339	json.Unmarshal([]byte(filtersRaw), &filters)
340	if filters == nil {
341		json.NewEncoder(w).Encode(s.services)
342		return
343	}
344	var ret []*swarm.Service
345	for i, srv := range s.services {
346		if inFilter(filters["id"], srv.ID) &&
347			inFilter(filters["name"], srv.Spec.Name) {
348			ret = append(ret, s.services[i])
349		}
350	}
351	json.NewEncoder(w).Encode(ret)
352}
353
354func (s *DockerServer) taskList(w http.ResponseWriter, r *http.Request) {
355	s.swarmMut.Lock()
356	defer s.swarmMut.Unlock()
357	if s.swarm == nil {
358		w.WriteHeader(http.StatusNotAcceptable)
359		return
360	}
361	filtersRaw := r.FormValue("filters")
362	var filters map[string][]string
363	json.Unmarshal([]byte(filtersRaw), &filters)
364	if filters == nil {
365		json.NewEncoder(w).Encode(s.tasks)
366		return
367	}
368	var ret []*swarm.Task
369	for i, task := range s.tasks {
370		var srv *swarm.Service
371		for _, srv = range s.services {
372			if task.ServiceID == srv.ID {
373				break
374			}
375		}
376		if srv == nil {
377			http.Error(w, "service not found", http.StatusNotFound)
378			return
379		}
380		if inFilter(filters["id"], task.ID) &&
381			(inFilter(filters["service"], task.ServiceID) ||
382				inFilter(filters["service"], srv.Spec.Annotations.Name)) &&
383			inFilter(filters["node"], task.NodeID) &&
384			inFilter(filters["desired-state"], string(task.DesiredState)) &&
385			inLabelFilter(filters["label"], srv.Spec.Annotations.Labels) {
386			ret = append(ret, s.tasks[i])
387		}
388	}
389	json.NewEncoder(w).Encode(ret)
390}
391
// inLabelFilter reports whether labels satisfies at least one entry of list.
//
// Each entry is either "key", which matches when the label exists, or
// "key=value", which matches when the label exists with exactly that value.
// Only the first "=" separates key and value, so values may themselves
// contain "=". An empty list matches everything.
func inLabelFilter(list []string, labels map[string]string) bool {
	if len(list) == 0 {
		return true
	}
	for _, item := range list {
		// Split once only: everything after the first "=" is the value.
		parts := strings.SplitN(item, "=", 2)
		val, ok := labels[parts[0]]
		if !ok {
			continue
		}
		if len(parts) > 1 && val != parts[1] {
			continue
		}
		return true
	}
	return false
}
408
// inFilter reports whether wanted is accepted by list: an empty list accepts
// every value, otherwise wanted has to equal one of the entries.
func inFilter(list []string, wanted string) bool {
	found := len(list) == 0
	for i := 0; !found && i < len(list); i++ {
		found = list[i] == wanted
	}
	return found
}
420
421func (s *DockerServer) serviceDelete(w http.ResponseWriter, r *http.Request) {
422	s.swarmMut.Lock()
423	defer s.swarmMut.Unlock()
424	s.cMut.Lock()
425	defer s.cMut.Unlock()
426	if s.swarm == nil {
427		w.WriteHeader(http.StatusNotAcceptable)
428		return
429	}
430	id := mux.Vars(r)["id"]
431	var i int
432	var toDelete *swarm.Service
433	for i = range s.services {
434		if s.services[i].ID == id || s.services[i].Spec.Name == id {
435			toDelete = s.services[i]
436			break
437		}
438	}
439	if toDelete == nil {
440		http.Error(w, "service not found", http.StatusNotFound)
441		return
442	}
443	s.services[i] = s.services[len(s.services)-1]
444	s.services = s.services[:len(s.services)-1]
445	for i := 0; i < len(s.tasks); i++ {
446		if s.tasks[i].ServiceID == toDelete.ID {
447			cont, _ := s.findContainerWithLock(s.tasks[i].Status.ContainerStatus.ContainerID, false)
448			if cont != nil {
449				delete(s.containers, cont.ID)
450				delete(s.contNameToID, cont.Name)
451			}
452			s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
453			i--
454		}
455	}
456	err := s.runNodeOperation(s.swarmServer.URL(), nodeOperation{})
457	if err != nil {
458		http.Error(w, err.Error(), http.StatusInternalServerError)
459		return
460	}
461}
462
463func (s *DockerServer) serviceUpdate(w http.ResponseWriter, r *http.Request) {
464	start := time.Now()
465	s.swarmMut.Lock()
466	defer s.swarmMut.Unlock()
467	s.cMut.Lock()
468	defer s.cMut.Unlock()
469	if s.swarm == nil {
470		w.WriteHeader(http.StatusNotAcceptable)
471		return
472	}
473	id := mux.Vars(r)["id"]
474	var toUpdate *swarm.Service
475	for i := range s.services {
476		if s.services[i].ID == id || s.services[i].Spec.Name == id {
477			toUpdate = s.services[i]
478			break
479		}
480	}
481	if toUpdate == nil {
482		http.Error(w, "service not found", http.StatusNotFound)
483		return
484	}
485	var newSpec swarm.ServiceSpec
486	err := json.NewDecoder(r.Body).Decode(&newSpec)
487	if err != nil {
488		http.Error(w, err.Error(), http.StatusInternalServerError)
489		return
490	}
491	toUpdate.Spec = newSpec
492	end := time.Now()
493	toUpdate.UpdateStatus = &swarm.UpdateStatus{
494		State:       swarm.UpdateStateCompleted,
495		CompletedAt: &end,
496		StartedAt:   &start,
497	}
498	s.setServiceEndpoint(toUpdate)
499	for i := 0; i < len(s.tasks); i++ {
500		if s.tasks[i].ServiceID != toUpdate.ID {
501			continue
502		}
503		cont, _ := s.findContainerWithLock(s.tasks[i].Status.ContainerStatus.ContainerID, false)
504		if cont != nil {
505			delete(s.containers, cont.ID)
506			delete(s.contNameToID, cont.Name)
507		}
508		s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
509		i--
510	}
511	s.addTasks(toUpdate, true)
512	err = s.runNodeOperation(s.swarmServer.URL(), nodeOperation{})
513	if err != nil {
514		http.Error(w, err.Error(), http.StatusInternalServerError)
515		return
516	}
517}
518
519func (s *DockerServer) nodeUpdate(w http.ResponseWriter, r *http.Request) {
520	s.swarmMut.Lock()
521	defer s.swarmMut.Unlock()
522	if s.swarm == nil {
523		w.WriteHeader(http.StatusNotAcceptable)
524		return
525	}
526	id := mux.Vars(r)["id"]
527	var n *swarm.Node
528	for i := range s.nodes {
529		if s.nodes[i].ID == id {
530			n = &s.nodes[i]
531			break
532		}
533	}
534	if n == nil {
535		w.WriteHeader(http.StatusNotFound)
536		return
537	}
538	var spec swarm.NodeSpec
539	err := json.NewDecoder(r.Body).Decode(&spec)
540	if err != nil {
541		http.Error(w, err.Error(), http.StatusInternalServerError)
542		return
543	}
544	n.Spec = spec
545	err = s.runNodeOperation(s.swarmServer.URL(), nodeOperation{
546		Op:   "update",
547		Node: *n,
548	})
549	if err != nil {
550		http.Error(w, err.Error(), http.StatusInternalServerError)
551		return
552	}
553}
554
555func (s *DockerServer) nodeDelete(w http.ResponseWriter, r *http.Request) {
556	s.swarmMut.Lock()
557	defer s.swarmMut.Unlock()
558	if s.swarm == nil {
559		w.WriteHeader(http.StatusNotAcceptable)
560		return
561	}
562	id := mux.Vars(r)["id"]
563	err := s.runNodeOperation(s.swarmServer.URL(), nodeOperation{
564		Op: "delete",
565		Node: swarm.Node{
566			ID: id,
567		},
568	})
569	if err != nil {
570		http.Error(w, err.Error(), http.StatusInternalServerError)
571		return
572	}
573}
574
575func (s *DockerServer) nodeInspect(w http.ResponseWriter, r *http.Request) {
576	s.swarmMut.Lock()
577	defer s.swarmMut.Unlock()
578	if s.swarm == nil {
579		w.WriteHeader(http.StatusNotAcceptable)
580		return
581	}
582	id := mux.Vars(r)["id"]
583	for _, n := range s.nodes {
584		if n.ID == id {
585			err := json.NewEncoder(w).Encode(n)
586			if err != nil {
587				http.Error(w, err.Error(), http.StatusInternalServerError)
588			}
589			return
590		}
591	}
592	w.WriteHeader(http.StatusNotFound)
593}
594
595func (s *DockerServer) nodeList(w http.ResponseWriter, r *http.Request) {
596	s.swarmMut.Lock()
597	defer s.swarmMut.Unlock()
598	if s.swarm == nil {
599		w.WriteHeader(http.StatusNotAcceptable)
600		return
601	}
602	err := json.NewEncoder(w).Encode(s.nodes)
603	if err != nil {
604		http.Error(w, err.Error(), http.StatusInternalServerError)
605	}
606}
607
608type nodeOperation struct {
609	Op        string
610	Node      swarm.Node
611	Tasks     []*swarm.Task
612	Services  []*swarm.Service
613	forceLock bool
614}
615
616func (s *DockerServer) runNodeOperation(dst string, nodeOp nodeOperation) error {
617	data, err := json.Marshal(nodeOp)
618	if err != nil {
619		return err
620	}
621	url := fmt.Sprintf("%s/internal/updatenodes", strings.TrimRight(dst, "/"))
622	if nodeOp.forceLock {
623		url += "?forcelock=1"
624	}
625	rsp, err := http.Post(url, "application/json", bytes.NewReader(data))
626	if err != nil {
627		return err
628	}
629	if rsp.StatusCode != http.StatusOK {
630		return fmt.Errorf("unexpected status code in updatenodes: %d", rsp.StatusCode)
631	}
632	return json.NewDecoder(rsp.Body).Decode(&s.nodes)
633}
634
635func (s *DockerServer) internalUpdateNodes(w http.ResponseWriter, r *http.Request) {
636	propagate := r.URL.Query().Get("propagate") != "0"
637	if !propagate || r.URL.Query().Get("forcelock") != "" {
638		s.swarmMut.Lock()
639		defer s.swarmMut.Unlock()
640	}
641	data, err := ioutil.ReadAll(r.Body)
642	if err != nil {
643		http.Error(w, err.Error(), http.StatusInternalServerError)
644		return
645	}
646	var nodeOp nodeOperation
647	err = json.Unmarshal(data, &nodeOp)
648	if err != nil {
649		http.Error(w, err.Error(), http.StatusInternalServerError)
650		return
651	}
652	switch nodeOp.Op {
653	case "add":
654		s.nodes = append(s.nodes, nodeOp.Node)
655	case "update":
656		for i, n := range s.nodes {
657			if n.ID == nodeOp.Node.ID {
658				s.nodes[i] = nodeOp.Node
659				break
660			}
661		}
662	case "delete":
663		for i, n := range s.nodes {
664			if n.ID == nodeOp.Node.ID {
665				s.nodes = append(s.nodes[:i], s.nodes[i+1:]...)
666				break
667			}
668		}
669	}
670	if propagate {
671		nodeOp.Services = s.services
672		nodeOp.Tasks = s.tasks
673		data, _ = json.Marshal(nodeOp)
674		for _, node := range s.nodes {
675			if s.nodeID == node.ID {
676				continue
677			}
678			url := fmt.Sprintf("http://%s/internal/updatenodes?propagate=0", node.ManagerStatus.Addr)
679			_, err = http.Post(url, "application/json", bytes.NewReader(data))
680			if err != nil {
681				http.Error(w, err.Error(), http.StatusInternalServerError)
682				return
683			}
684		}
685	}
686	if nodeOp.Services != nil {
687		s.services = nodeOp.Services
688	}
689	if nodeOp.Tasks != nil {
690		s.tasks = nodeOp.Tasks
691	}
692	w.Header().Set("Content-Type", "application/json")
693	err = json.NewEncoder(w).Encode(s.nodes)
694	if err != nil {
695		http.Error(w, err.Error(), http.StatusInternalServerError)
696	}
697}
698