1/*
2Copyright 2014 The go-marathon Authors All rights reserved.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package marathon
18
19import (
20	"bytes"
21	"encoding/json"
22	"errors"
23	"fmt"
24	"io"
25	"io/ioutil"
26	"log"
27	"net"
28	"net/http"
29	"net/url"
30	"regexp"
31	"strings"
32	"sync"
33	"time"
34)
35
36// Marathon is the interface to the marathon API
37type Marathon interface {
38	// -- APPLICATIONS ---
39
40	// get a listing of the application ids
41	ListApplications(url.Values) ([]string, error)
42	// a list of application versions
43	ApplicationVersions(name string) (*ApplicationVersions, error)
44	// check a application version exists
45	HasApplicationVersion(name, version string) (bool, error)
46	// change an application to a different version
47	SetApplicationVersion(name string, version *ApplicationVersion) (*DeploymentID, error)
48	// check if an application is ok
49	ApplicationOK(name string) (bool, error)
50	// create an application in marathon
51	CreateApplication(application *Application) (*Application, error)
52	// delete an application
53	DeleteApplication(name string, force bool) (*DeploymentID, error)
54	// update an application in marathon
55	UpdateApplication(application *Application, force bool) (*DeploymentID, error)
56	// a list of deployments on a application
57	ApplicationDeployments(name string) ([]*DeploymentID, error)
58	// scale a application
59	ScaleApplicationInstances(name string, instances int, force bool) (*DeploymentID, error)
60	// restart an application
61	RestartApplication(name string, force bool) (*DeploymentID, error)
62	// get a list of applications from marathon
63	Applications(url.Values) (*Applications, error)
64	// get an application by name
65	Application(name string) (*Application, error)
66	// get an application by options
67	ApplicationBy(name string, opts *GetAppOpts) (*Application, error)
68	// get an application by name and version
69	ApplicationByVersion(name, version string) (*Application, error)
70	// wait of application
71	WaitOnApplication(name string, timeout time.Duration) error
72
73	// -- PODS ---
74	// whether this version of Marathon supports pods
75	SupportsPods() (bool, error)
76
77	// get pod status
78	PodStatus(name string) (*PodStatus, error)
79	// get all pod statuses
80	PodStatuses() ([]*PodStatus, error)
81
82	// get pod
83	Pod(name string) (*Pod, error)
84	// get all pods
85	Pods() ([]Pod, error)
86	// create pod
87	CreatePod(pod *Pod) (*Pod, error)
88	// update pod
89	UpdatePod(pod *Pod, force bool) (*Pod, error)
90	// delete pod
91	DeletePod(name string, force bool) (*DeploymentID, error)
92	// wait on pod to be deployed
93	WaitOnPod(name string, timeout time.Duration) error
94	// check if a pod is running
95	PodIsRunning(name string) bool
96
97	// get versions of a pod
98	PodVersions(name string) ([]string, error)
99	// get pod by version
100	PodByVersion(name, version string) (*Pod, error)
101
102	// delete instances of a pod
103	DeletePodInstances(name string, instances []string) ([]*PodInstance, error)
104	// delete pod instance
105	DeletePodInstance(name, instance string) (*PodInstance, error)
106
107	// -- TASKS ---
108
109	// get a list of tasks for a specific application
110	Tasks(application string) (*Tasks, error)
111	// get a list of all tasks
112	AllTasks(opts *AllTasksOpts) (*Tasks, error)
113	// get the endpoints for a service on a application
114	TaskEndpoints(name string, port int, healthCheck bool) ([]string, error)
115	// kill all the tasks for any application
116	KillApplicationTasks(applicationID string, opts *KillApplicationTasksOpts) (*Tasks, error)
117	// kill a single task
118	KillTask(taskID string, opts *KillTaskOpts) (*Task, error)
119	// kill the given array of tasks
120	KillTasks(taskIDs []string, opts *KillTaskOpts) error
121
122	// --- GROUPS ---
123
124	// list all the groups in the system
125	Groups() (*Groups, error)
126	// retrieve a specific group from marathon
127	Group(name string) (*Group, error)
128	// list all groups in marathon by options
129	GroupsBy(opts *GetGroupOpts) (*Groups, error)
130	// retrieve a specific group from marathon by options
131	GroupBy(name string, opts *GetGroupOpts) (*Group, error)
132	// create a group deployment
133	CreateGroup(group *Group) error
134	// delete a group
135	DeleteGroup(name string, force bool) (*DeploymentID, error)
136	// update a groups
137	UpdateGroup(id string, group *Group, force bool) (*DeploymentID, error)
138	// check if a group exists
139	HasGroup(name string) (bool, error)
140	// wait for an group to be deployed
141	WaitOnGroup(name string, timeout time.Duration) error
142
143	// --- DEPLOYMENTS ---
144
145	// get a list of the deployments
146	Deployments() ([]*Deployment, error)
147	// delete a deployment
148	DeleteDeployment(id string, force bool) (*DeploymentID, error)
149	// check to see if a deployment exists
150	HasDeployment(id string) (bool, error)
151	// wait of a deployment to finish
152	WaitOnDeployment(id string, timeout time.Duration) error
153
154	// --- SUBSCRIPTIONS ---
155
156	// a list of current subscriptions
157	Subscriptions() (*Subscriptions, error)
158	// add a events listener
159	AddEventsListener(filter int) (EventsChannel, error)
160	// remove a events listener
161	RemoveEventsListener(channel EventsChannel)
162	// Subscribe a callback URL
163	Subscribe(string) error
164	// Unsubscribe a callback URL
165	Unsubscribe(string) error
166
167	// --- QUEUE ---
168	// get marathon launch queue
169	Queue() (*Queue, error)
170	// resets task launch delay of the specific application
171	DeleteQueueDelay(appID string) error
172
173	// --- MISC ---
174
175	// get the marathon url
176	GetMarathonURL() string
177	// ping the marathon
178	Ping() (bool, error)
179	// grab the marathon server info
180	Info() (*Info, error)
181	// retrieve the leader info
182	Leader() (string, error)
183	// cause the current leader to abdicate
184	AbdicateLeader() (string, error)
185}
186
var (
	// ErrMarathonDown is returned when all the Marathon endpoints are down.
	ErrMarathonDown = errors.New("all the Marathon hosts are presently down")
	// ErrTimeoutError is returned when an operation has timed out.
	ErrTimeoutError = errors.New("the operation has timed out")

	// defaultHTTPSSEClient is the default HTTP client for SSE subscription
	// requests. Client.Timeout must stay unset because it also covers reading
	// the response body, and an event stream is long-lived. Dial, TLS
	// handshake and response-header timeouts bound connection setup instead.
	defaultHTTPSSEClient = &http.Client{
		Transport: &http.Transport{
			// DialContext replaces the deprecated Transport.Dial and lets the
			// transport cancel a dial that is no longer needed.
			DialContext: (&net.Dialer{
				Timeout: 5 * time.Second,
			}).DialContext,
			ResponseHeaderTimeout: 10 * time.Second,
			TLSHandshakeTimeout:   5 * time.Second,
		},
	}

	// defaultHTTPClient is the default HTTP client for non-SSE requests.
	defaultHTTPClient = &http.Client{
		Timeout: 10 * time.Second,
	}
)
211
// EventsChannelContext holds contextual data for an EventsChannel that is
// registered as a listener (see marathonClient.listeners).
type EventsChannelContext struct {
	// filter selects which events the listener receives; presumably the value
	// passed to AddEventsListener — confirm against the events code.
	filter     int
	// done is a signalling channel for the listener; presumably closed to stop
	// event delivery — TODO confirm.
	done       chan struct{}
	// completion presumably tracks in-flight deliveries to the listener so
	// that removal can wait for them — TODO confirm.
	completion *sync.WaitGroup
}
218
// marathonClient is the implementation of the Marathon interface that
// NewClient returns.
type marathonClient struct {
	// guards the client's mutable state; presumably the listeners map and the
	// SSE subscription fields — confirm against the events code
	sync.RWMutex
	// the configuration for the client, with defaults filled in by NewClient
	config Config
	// the flag used to prevent multiple SSE subscriptions
	subscribedToSSE bool
	// the ip address of the client (presumably used for event callback
	// subscriptions — confirm)
	ipAddress string
	// the http server (presumably serving event callbacks — confirm)
	eventsHTTP *http.Server
	// the marathon hosts; buildAPIRequest picks members from it and apiCall
	// marks failing members down
	hosts *cluster
	// the registered events listeners, keyed by their channel
	listeners map[EventsChannel]EventsChannelContext
	// a custom log function for debug messages; NewClient makes it a no-op
	// unless config.LogOutput is non-nil
	debugLog func(format string, v ...interface{})
	// the marathon HTTP client to ensure consistency in requests
	client *httpClient
}
238
// httpClient builds and sends HTTP requests to Marathon members.
type httpClient struct {
	// the configuration for the marathon HTTP client: Do sends requests with
	// config.HTTPClient, and buildMarathonRequest applies the basic-auth and
	// DCOS token settings
	config Config
}
243
// newRequestError signals that creating a new http.Request failed.
// buildAPIRequest wraps request-construction failures in it, presumably so
// callers can tell them apart from transport or API errors. The underlying
// error is embedded, so its Error method is promoted.
type newRequestError struct {
	error
}
248
249// NewClient creates a new marathon client
250//		config:			the configuration to use
251func NewClient(config Config) (Marathon, error) {
252	// step: if the SSE HTTP client is missing, prefer a configured regular
253	// client, and otherwise use the default SSE HTTP client.
254	if config.HTTPSSEClient == nil {
255		config.HTTPSSEClient = defaultHTTPSSEClient
256		if config.HTTPClient != nil {
257			config.HTTPSSEClient = config.HTTPClient
258		}
259	}
260
261	// step: if a regular HTTP client is missing, use the default one.
262	if config.HTTPClient == nil {
263		config.HTTPClient = defaultHTTPClient
264	}
265
266	// step: if no polling wait time is set, default to 500 milliseconds.
267	if config.PollingWaitTime == 0 {
268		config.PollingWaitTime = defaultPollingWaitTime
269	}
270
271	// step: setup shared client
272	client := &httpClient{config: config}
273
274	// step: create a new cluster
275	hosts, err := newCluster(client, config.URL, config.DCOSToken != "")
276	if err != nil {
277		return nil, err
278	}
279
280	debugLog := func(string, ...interface{}) {}
281	if config.LogOutput != nil {
282		logger := log.New(config.LogOutput, "", 0)
283		debugLog = func(format string, v ...interface{}) {
284			logger.Printf(format, v...)
285		}
286	}
287
288	return &marathonClient{
289		config:    config,
290		listeners: make(map[EventsChannel]EventsChannelContext),
291		hosts:     hosts,
292		debugLog:  debugLog,
293		client:    client,
294	}, nil
295}
296
297// GetMarathonURL retrieves the marathon url
298func (r *marathonClient) GetMarathonURL() string {
299	return r.config.URL
300}
301
302// Ping pings the current marathon endpoint (note, this is not a ICMP ping, but a rest api call)
303func (r *marathonClient) Ping() (bool, error) {
304	if err := r.apiGet(marathonAPIPing, nil, nil); err != nil {
305		return false, err
306	}
307	return true, nil
308}
309
310func (r *marathonClient) apiHead(path string, result interface{}) error {
311	return r.apiCall("HEAD", path, nil, result)
312}
313
314func (r *marathonClient) apiGet(path string, post, result interface{}) error {
315	return r.apiCall("GET", path, post, result)
316}
317
318func (r *marathonClient) apiPut(path string, post, result interface{}) error {
319	return r.apiCall("PUT", path, post, result)
320}
321
322func (r *marathonClient) apiPost(path string, post, result interface{}) error {
323	return r.apiCall("POST", path, post, result)
324}
325
326func (r *marathonClient) apiDelete(path string, post, result interface{}) error {
327	return r.apiCall("DELETE", path, post, result)
328}
329
330func (r *marathonClient) apiCall(method, path string, body, result interface{}) error {
331	const deploymentHeader = "Marathon-Deployment-Id"
332
333	for {
334		// step: marshall the request to json
335		var requestBody []byte
336		var err error
337		if body != nil {
338			if requestBody, err = json.Marshal(body); err != nil {
339				return err
340			}
341		}
342
343		// step: create the API request
344		request, member, err := r.buildAPIRequest(method, path, bytes.NewReader(requestBody))
345		if err != nil {
346			return err
347		}
348
349		// step: perform the API request
350		response, err := r.client.Do(request)
351		if err != nil {
352			r.hosts.markDown(member)
353			// step: attempt the request on another member
354			r.debugLog("apiCall(): request failed on host: %s, error: %s, trying another", member, err)
355			continue
356		}
357		defer response.Body.Close()
358
359		// step: read the response body
360		respBody, err := ioutil.ReadAll(response.Body)
361		if err != nil {
362			return err
363		}
364
365		if len(requestBody) > 0 {
366			r.debugLog("apiCall(): %v %v %s returned %v %s", request.Method, request.URL.String(), requestBody, response.Status, oneLogLine(respBody))
367		} else {
368			r.debugLog("apiCall(): %v %v returned %v %s", request.Method, request.URL.String(), response.Status, oneLogLine(respBody))
369		}
370
371		// step: check for a successful response
372		if response.StatusCode >= 200 && response.StatusCode <= 299 {
373			if result != nil {
374				// If we have a deployment ID header and no response body, give them that
375				// This specifically handles the use case of a DELETE on an app/pod
376				// We need a way to retrieve the deployment ID
377				deploymentID := response.Header.Get(deploymentHeader)
378				if len(respBody) == 0 && deploymentID != "" {
379					d := DeploymentID{
380						DeploymentID: deploymentID,
381					}
382					if deployID, ok := result.(*DeploymentID); ok {
383						*deployID = d
384					}
385				} else {
386					if err := json.Unmarshal(respBody, result); err != nil {
387						return fmt.Errorf("failed to unmarshal response from Marathon: %s", err)
388					}
389				}
390			}
391			return nil
392		}
393
394		// step: if the member node returns a >= 500 && <= 599 we should try another node?
395		if response.StatusCode >= 500 && response.StatusCode <= 599 {
396			// step: mark the host as down
397			r.hosts.markDown(member)
398			r.debugLog("apiCall(): request failed, host: %s, status: %d, trying another", member, response.StatusCode)
399			continue
400		}
401
402		return NewAPIError(response.StatusCode, respBody)
403	}
404}
405
406// wait waits until the provided function returns true (or times out)
407func (r *marathonClient) wait(name string, timeout time.Duration, fn func(string) bool) error {
408	timer := time.NewTimer(timeout)
409	defer timer.Stop()
410
411	ticker := time.NewTicker(r.config.PollingWaitTime)
412	defer ticker.Stop()
413	for {
414		if fn(name) {
415			return nil
416		}
417
418		select {
419		case <-timer.C:
420			return ErrTimeoutError
421		case <-ticker.C:
422			continue
423		}
424	}
425}
426
427// buildAPIRequest creates a default API request.
428// It fails when there is no available member in the cluster anymore or when the request can not be built.
429func (r *marathonClient) buildAPIRequest(method, path string, reader io.Reader) (request *http.Request, member string, err error) {
430	// Grab a member from the cluster
431	member, err = r.hosts.getMember()
432	if err != nil {
433		return nil, "", ErrMarathonDown
434	}
435
436	// Build the HTTP request to Marathon
437	request, err = r.client.buildMarathonJSONRequest(method, member, path, reader)
438	if err != nil {
439		return nil, member, newRequestError{err}
440	}
441	return request, member, nil
442}
443
444// buildMarathonJSONRequest is like buildMarathonRequest but sets the
445// Content-Type and Accept headers to application/json.
446func (rc *httpClient) buildMarathonJSONRequest(method, member, path string, reader io.Reader) (request *http.Request, err error) {
447	req, err := rc.buildMarathonRequest(method, member, path, reader)
448	if err == nil {
449		req.Header.Add("Content-Type", "application/json")
450		req.Header.Add("Accept", "application/json")
451	}
452
453	return req, err
454}
455
456// buildMarathonRequest creates a new HTTP request and configures it according to the *httpClient configuration.
457// The path must not contain a leading "/", otherwise buildMarathonRequest will panic.
458func (rc *httpClient) buildMarathonRequest(method, member, path string, reader io.Reader) (request *http.Request, err error) {
459	if strings.HasPrefix(path, "/") {
460		panic(fmt.Sprintf("Path '%s' must not start with a leading slash", path))
461	}
462
463	// Create the endpoint URL
464	url := fmt.Sprintf("%s/%s", member, path)
465
466	// Instantiate an HTTP request
467	request, err = http.NewRequest(method, url, reader)
468	if err != nil {
469		return nil, err
470	}
471
472	// Add any basic auth and the content headers
473	if rc.config.HTTPBasicAuthUser != "" && rc.config.HTTPBasicPassword != "" {
474		request.SetBasicAuth(rc.config.HTTPBasicAuthUser, rc.config.HTTPBasicPassword)
475	}
476
477	if rc.config.DCOSToken != "" {
478		request.Header.Add("Authorization", "token="+rc.config.DCOSToken)
479	}
480
481	return request, nil
482}
483
484func (rc *httpClient) Do(request *http.Request) (response *http.Response, err error) {
485	return rc.config.HTTPClient.Do(request)
486}
487
// oneLogLineRegex matches the leading whitespace of every line. Because \s also
// matches '\n', blank lines that follow a newline are swallowed by the match too.
var oneLogLineRegex = regexp.MustCompile(`(?m)^\s*`)

// oneLogLine flattens a multi-line payload (e.g. a response body) into a single
// log line. It strips the leading indentation of each line, then replaces every
// remaining newline with the two characters `\n` followed by a space.
func oneLogLine(in []byte) []byte {
	var (
		newline = []byte("\n")
		escaped = []byte("\\n ")
	)
	dedented := oneLogLineRegex.ReplaceAll(in, nil)
	return bytes.Replace(dedented, newline, escaped, -1)
}
495