1/* 2Copyright 2014 The go-marathon Authors All rights reserved. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package marathon 18 19import ( 20 "bytes" 21 "encoding/json" 22 "errors" 23 "fmt" 24 "io" 25 "io/ioutil" 26 "log" 27 "net" 28 "net/http" 29 "net/url" 30 "regexp" 31 "strings" 32 "sync" 33 "time" 34) 35 36// Marathon is the interface to the marathon API 37type Marathon interface { 38 // -- APPLICATIONS --- 39 40 // get a listing of the application ids 41 ListApplications(url.Values) ([]string, error) 42 // a list of application versions 43 ApplicationVersions(name string) (*ApplicationVersions, error) 44 // check a application version exists 45 HasApplicationVersion(name, version string) (bool, error) 46 // change an application to a different version 47 SetApplicationVersion(name string, version *ApplicationVersion) (*DeploymentID, error) 48 // check if an application is ok 49 ApplicationOK(name string) (bool, error) 50 // create an application in marathon 51 CreateApplication(application *Application) (*Application, error) 52 // delete an application 53 DeleteApplication(name string, force bool) (*DeploymentID, error) 54 // update an application in marathon 55 UpdateApplication(application *Application, force bool) (*DeploymentID, error) 56 // a list of deployments on a application 57 ApplicationDeployments(name string) ([]*DeploymentID, error) 58 // scale a application 59 ScaleApplicationInstances(name string, instances int, force bool) (*DeploymentID, error) 60 // restart an application 61 RestartApplication(name string, force bool) (*DeploymentID, error) 62 // get a list of applications from marathon 63 Applications(url.Values) (*Applications, error) 64 // get an application by name 65 Application(name string) (*Application, error) 66 // get an application by options 67 ApplicationBy(name string, opts *GetAppOpts) (*Application, error) 68 // get an application by name and version 69 ApplicationByVersion(name, version string) (*Application, error) 70 // wait of application 71 WaitOnApplication(name string, timeout time.Duration) error 72 73 // -- PODS --- 74 // whether this version of Marathon supports pods 75 SupportsPods() (bool, error) 76 77 // get pod status 78 PodStatus(name string) (*PodStatus, error) 79 // get all pod statuses 80 PodStatuses() ([]*PodStatus, error) 81 82 // get pod 83 Pod(name string) (*Pod, error) 84 // get all pods 85 Pods() ([]Pod, error) 86 // create pod 87 CreatePod(pod *Pod) (*Pod, error) 88 // update pod 89 UpdatePod(pod *Pod, force bool) (*Pod, error) 90 // delete pod 91 DeletePod(name string, force bool) (*DeploymentID, error) 92 // wait on pod to be deployed 93 WaitOnPod(name string, timeout time.Duration) error 94 // check if a pod is running 95 PodIsRunning(name string) bool 96 97 // get versions of a pod 98 PodVersions(name string) ([]string, error) 99 // get pod by version 100 PodByVersion(name, version string) (*Pod, error) 101 102 // delete instances of a pod 103 DeletePodInstances(name string, instances []string) ([]*PodInstance, error) 104 // delete pod instance 105 DeletePodInstance(name, instance string) (*PodInstance, error) 106 107 // -- TASKS --- 108 109 // get a list of tasks for a specific application 110 Tasks(application string) (*Tasks, error) 111 // get a list of all tasks 112 AllTasks(opts *AllTasksOpts) (*Tasks, error) 113 // get the endpoints for a service on a application 114 TaskEndpoints(name string, port int, healthCheck bool) ([]string, error) 115 // kill all the tasks for any application 116 KillApplicationTasks(applicationID string, opts *KillApplicationTasksOpts) (*Tasks, error) 117 // kill a single task 118 KillTask(taskID string, opts *KillTaskOpts) (*Task, error) 119 // kill the given array of tasks 120 KillTasks(taskIDs []string, opts *KillTaskOpts) error 121 122 // --- GROUPS --- 123 124 // list all the groups in the system 125 Groups() (*Groups, error) 126 // retrieve a specific group from marathon 127 Group(name string) (*Group, error) 128 // list all groups in marathon by options 129 GroupsBy(opts *GetGroupOpts) (*Groups, error) 130 // retrieve a specific group from marathon by options 131 GroupBy(name string, opts *GetGroupOpts) (*Group, error) 132 // create a group deployment 133 CreateGroup(group *Group) error 134 // delete a group 135 DeleteGroup(name string, force bool) (*DeploymentID, error) 136 // update a groups 137 UpdateGroup(id string, group *Group, force bool) (*DeploymentID, error) 138 // check if a group exists 139 HasGroup(name string) (bool, error) 140 // wait for an group to be deployed 141 WaitOnGroup(name string, timeout time.Duration) error 142 143 // --- DEPLOYMENTS --- 144 145 // get a list of the deployments 146 Deployments() ([]*Deployment, error) 147 // delete a deployment 148 DeleteDeployment(id string, force bool) (*DeploymentID, error) 149 // check to see if a deployment exists 150 HasDeployment(id string) (bool, error) 151 // wait of a deployment to finish 152 WaitOnDeployment(id string, timeout time.Duration) error 153 154 // --- SUBSCRIPTIONS --- 155 156 // a list of current subscriptions 157 Subscriptions() (*Subscriptions, error) 158 // add a events listener 159 AddEventsListener(filter int) (EventsChannel, error) 160 // remove a events listener 161 RemoveEventsListener(channel EventsChannel) 162 // Subscribe a callback URL 163 Subscribe(string) error 164 // Unsubscribe a callback URL 165 Unsubscribe(string) error 166 167 // --- QUEUE --- 168 // get marathon launch queue 169 Queue() (*Queue, error) 170 // resets task launch delay of the specific application 171 DeleteQueueDelay(appID string) error 172 173 // --- MISC --- 174 175 // get the marathon url 176 GetMarathonURL() string 177 // ping the marathon 178 Ping() (bool, error) 179 // grab the marathon server info 180 Info() (*Info, error) 181 // retrieve the leader info 182 Leader() (string, error) 183 // cause the current leader to abdicate 184 AbdicateLeader() (string, error) 185} 186 187var ( 188 // ErrMarathonDown is thrown when all the marathon endpoints are down 189 ErrMarathonDown = errors.New("all the Marathon hosts are presently down") 190 // ErrTimeoutError is thrown when the operation has timed out 191 ErrTimeoutError = errors.New("the operation has timed out") 192 193 // Default HTTP client used for SSE subscription requests 194 // It is invalid to set client.Timeout because it includes time to read response so 195 // set dial, tls handshake and response header timeouts instead 196 defaultHTTPSSEClient = &http.Client{ 197 Transport: &http.Transport{ 198 Dial: (&net.Dialer{ 199 Timeout: 5 * time.Second, 200 }).Dial, 201 ResponseHeaderTimeout: 10 * time.Second, 202 TLSHandshakeTimeout: 5 * time.Second, 203 }, 204 } 205 206 // Default HTTP client used for non SSE requests 207 defaultHTTPClient = &http.Client{ 208 Timeout: 10 * time.Second, 209 } 210) 211 212// EventsChannelContext holds contextual data for an EventsChannel. 213type EventsChannelContext struct { 214 filter int 215 done chan struct{} 216 completion *sync.WaitGroup 217} 218 219type marathonClient struct { 220 sync.RWMutex 221 // the configuration for the client 222 config Config 223 // the flag used to prevent multiple SSE subscriptions 224 subscribedToSSE bool 225 // the ip address of the client 226 ipAddress string 227 // the http server 228 eventsHTTP *http.Server 229 // the marathon hosts 230 hosts *cluster 231 // a map of service you wish to listen to 232 listeners map[EventsChannel]EventsChannelContext 233 // a custom log function for debug messages 234 debugLog func(format string, v ...interface{}) 235 // the marathon HTTP client to ensure consistency in requests 236 client *httpClient 237} 238 239type httpClient struct { 240 // the configuration for the marathon HTTP client 241 config Config 242} 243 244// newRequestError signals that creating a new http.Request failed 245type newRequestError struct { 246 error 247} 248 249// NewClient creates a new marathon client 250// config: the configuration to use 251func NewClient(config Config) (Marathon, error) { 252 // step: if the SSE HTTP client is missing, prefer a configured regular 253 // client, and otherwise use the default SSE HTTP client. 254 if config.HTTPSSEClient == nil { 255 config.HTTPSSEClient = defaultHTTPSSEClient 256 if config.HTTPClient != nil { 257 config.HTTPSSEClient = config.HTTPClient 258 } 259 } 260 261 // step: if a regular HTTP client is missing, use the default one. 262 if config.HTTPClient == nil { 263 config.HTTPClient = defaultHTTPClient 264 } 265 266 // step: if no polling wait time is set, default to 500 milliseconds. 267 if config.PollingWaitTime == 0 { 268 config.PollingWaitTime = defaultPollingWaitTime 269 } 270 271 // step: setup shared client 272 client := &httpClient{config: config} 273 274 // step: create a new cluster 275 hosts, err := newCluster(client, config.URL, config.DCOSToken != "") 276 if err != nil { 277 return nil, err 278 } 279 280 debugLog := func(string, ...interface{}) {} 281 if config.LogOutput != nil { 282 logger := log.New(config.LogOutput, "", 0) 283 debugLog = func(format string, v ...interface{}) { 284 logger.Printf(format, v...) 285 } 286 } 287 288 return &marathonClient{ 289 config: config, 290 listeners: make(map[EventsChannel]EventsChannelContext), 291 hosts: hosts, 292 debugLog: debugLog, 293 client: client, 294 }, nil 295} 296 297// GetMarathonURL retrieves the marathon url 298func (r *marathonClient) GetMarathonURL() string { 299 return r.config.URL 300} 301 302// Ping pings the current marathon endpoint (note, this is not a ICMP ping, but a rest api call) 303func (r *marathonClient) Ping() (bool, error) { 304 if err := r.apiGet(marathonAPIPing, nil, nil); err != nil { 305 return false, err 306 } 307 return true, nil 308} 309 310func (r *marathonClient) apiHead(path string, result interface{}) error { 311 return r.apiCall("HEAD", path, nil, result) 312} 313 314func (r *marathonClient) apiGet(path string, post, result interface{}) error { 315 return r.apiCall("GET", path, post, result) 316} 317 318func (r *marathonClient) apiPut(path string, post, result interface{}) error { 319 return r.apiCall("PUT", path, post, result) 320} 321 322func (r *marathonClient) apiPost(path string, post, result interface{}) error { 323 return r.apiCall("POST", path, post, result) 324} 325 326func (r *marathonClient) apiDelete(path string, post, result interface{}) error { 327 return r.apiCall("DELETE", path, post, result) 328} 329 330func (r *marathonClient) apiCall(method, path string, body, result interface{}) error { 331 const deploymentHeader = "Marathon-Deployment-Id" 332 333 for { 334 // step: marshall the request to json 335 var requestBody []byte 336 var err error 337 if body != nil { 338 if requestBody, err = json.Marshal(body); err != nil { 339 return err 340 } 341 } 342 343 // step: create the API request 344 request, member, err := r.buildAPIRequest(method, path, bytes.NewReader(requestBody)) 345 if err != nil { 346 return err 347 } 348 349 // step: perform the API request 350 response, err := r.client.Do(request) 351 if err != nil { 352 r.hosts.markDown(member) 353 // step: attempt the request on another member 354 r.debugLog("apiCall(): request failed on host: %s, error: %s, trying another", member, err) 355 continue 356 } 357 defer response.Body.Close() 358 359 // step: read the response body 360 respBody, err := ioutil.ReadAll(response.Body) 361 if err != nil { 362 return err 363 } 364 365 if len(requestBody) > 0 { 366 r.debugLog("apiCall(): %v %v %s returned %v %s", request.Method, request.URL.String(), requestBody, response.Status, oneLogLine(respBody)) 367 } else { 368 r.debugLog("apiCall(): %v %v returned %v %s", request.Method, request.URL.String(), response.Status, oneLogLine(respBody)) 369 } 370 371 // step: check for a successful response 372 if response.StatusCode >= 200 && response.StatusCode <= 299 { 373 if result != nil { 374 // If we have a deployment ID header and no response body, give them that 375 // This specifically handles the use case of a DELETE on an app/pod 376 // We need a way to retrieve the deployment ID 377 deploymentID := response.Header.Get(deploymentHeader) 378 if len(respBody) == 0 && deploymentID != "" { 379 d := DeploymentID{ 380 DeploymentID: deploymentID, 381 } 382 if deployID, ok := result.(*DeploymentID); ok { 383 *deployID = d 384 } 385 } else { 386 if err := json.Unmarshal(respBody, result); err != nil { 387 return fmt.Errorf("failed to unmarshal response from Marathon: %s", err) 388 } 389 } 390 } 391 return nil 392 } 393 394 // step: if the member node returns a >= 500 && <= 599 we should try another node? 395 if response.StatusCode >= 500 && response.StatusCode <= 599 { 396 // step: mark the host as down 397 r.hosts.markDown(member) 398 r.debugLog("apiCall(): request failed, host: %s, status: %d, trying another", member, response.StatusCode) 399 continue 400 } 401 402 return NewAPIError(response.StatusCode, respBody) 403 } 404} 405 406// wait waits until the provided function returns true (or times out) 407func (r *marathonClient) wait(name string, timeout time.Duration, fn func(string) bool) error { 408 timer := time.NewTimer(timeout) 409 defer timer.Stop() 410 411 ticker := time.NewTicker(r.config.PollingWaitTime) 412 defer ticker.Stop() 413 for { 414 if fn(name) { 415 return nil 416 } 417 418 select { 419 case <-timer.C: 420 return ErrTimeoutError 421 case <-ticker.C: 422 continue 423 } 424 } 425} 426 427// buildAPIRequest creates a default API request. 428// It fails when there is no available member in the cluster anymore or when the request can not be built. 429func (r *marathonClient) buildAPIRequest(method, path string, reader io.Reader) (request *http.Request, member string, err error) { 430 // Grab a member from the cluster 431 member, err = r.hosts.getMember() 432 if err != nil { 433 return nil, "", ErrMarathonDown 434 } 435 436 // Build the HTTP request to Marathon 437 request, err = r.client.buildMarathonJSONRequest(method, member, path, reader) 438 if err != nil { 439 return nil, member, newRequestError{err} 440 } 441 return request, member, nil 442} 443 444// buildMarathonJSONRequest is like buildMarathonRequest but sets the 445// Content-Type and Accept headers to application/json. 446func (rc *httpClient) buildMarathonJSONRequest(method, member, path string, reader io.Reader) (request *http.Request, err error) { 447 req, err := rc.buildMarathonRequest(method, member, path, reader) 448 if err == nil { 449 req.Header.Add("Content-Type", "application/json") 450 req.Header.Add("Accept", "application/json") 451 } 452 453 return req, err 454} 455 456// buildMarathonRequest creates a new HTTP request and configures it according to the *httpClient configuration. 457// The path must not contain a leading "/", otherwise buildMarathonRequest will panic. 458func (rc *httpClient) buildMarathonRequest(method, member, path string, reader io.Reader) (request *http.Request, err error) { 459 if strings.HasPrefix(path, "/") { 460 panic(fmt.Sprintf("Path '%s' must not start with a leading slash", path)) 461 } 462 463 // Create the endpoint URL 464 url := fmt.Sprintf("%s/%s", member, path) 465 466 // Instantiate an HTTP request 467 request, err = http.NewRequest(method, url, reader) 468 if err != nil { 469 return nil, err 470 } 471 472 // Add any basic auth and the content headers 473 if rc.config.HTTPBasicAuthUser != "" && rc.config.HTTPBasicPassword != "" { 474 request.SetBasicAuth(rc.config.HTTPBasicAuthUser, rc.config.HTTPBasicPassword) 475 } 476 477 if rc.config.DCOSToken != "" { 478 request.Header.Add("Authorization", "token="+rc.config.DCOSToken) 479 } 480 481 return request, nil 482} 483 484func (rc *httpClient) Do(request *http.Request) (response *http.Response, err error) { 485 return rc.config.HTTPClient.Do(request) 486} 487 488var oneLogLineRegex = regexp.MustCompile(`(?m)^\s*`) 489 490// oneLogLine removes indentation at the beginning of each line and 491// escapes new line characters. 492func oneLogLine(in []byte) []byte { 493 return bytes.Replace(oneLogLineRegex.ReplaceAll(in, nil), []byte("\n"), []byte("\\n "), -1) 494} 495