1package connection 2 3import ( 4 "bufio" 5 "bytes" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io" 10 "net" 11 "net/url" 12 "strings" 13 "time" 14 15 "code.cloudfoundry.org/garden" 16 "code.cloudfoundry.org/garden/routes" 17 "code.cloudfoundry.org/garden/transport" 18 "code.cloudfoundry.org/lager" 19 "github.com/tedsuo/rata" 20) 21 22var ErrDisconnected = errors.New("disconnected") 23var ErrInvalidMessage = errors.New("invalid message payload") 24 25//go:generate counterfeiter . Connection 26type Connection interface { 27 Ping() error 28 29 Capacity() (garden.Capacity, error) 30 31 Create(spec garden.ContainerSpec) (string, error) 32 List(properties garden.Properties) ([]string, error) 33 34 // Destroys the container with the given handle. If the container cannot be 35 // found, garden.ContainerNotFoundError is returned. If deletion fails for another 36 // reason, another error type is returned. 37 Destroy(handle string) error 38 39 Stop(handle string, kill bool) error 40 41 Info(handle string) (garden.ContainerInfo, error) 42 BulkInfo(handles []string) (map[string]garden.ContainerInfoEntry, error) 43 BulkMetrics(handles []string) (map[string]garden.ContainerMetricsEntry, error) 44 45 StreamIn(handle string, spec garden.StreamInSpec) error 46 StreamOut(handle string, spec garden.StreamOutSpec) (io.ReadCloser, error) 47 48 CurrentBandwidthLimits(handle string) (garden.BandwidthLimits, error) 49 CurrentCPULimits(handle string) (garden.CPULimits, error) 50 CurrentDiskLimits(handle string) (garden.DiskLimits, error) 51 CurrentMemoryLimits(handle string) (garden.MemoryLimits, error) 52 53 Run(handle string, spec garden.ProcessSpec, io garden.ProcessIO) (garden.Process, error) 54 Attach(handle string, processID string, io garden.ProcessIO) (garden.Process, error) 55 56 NetIn(handle string, hostPort, containerPort uint32) (uint32, uint32, error) 57 NetOut(handle string, rule garden.NetOutRule) error 58 BulkNetOut(handle string, rules []garden.NetOutRule) error 59 60 SetGraceTime(handle string, graceTime time.Duration) error 61 62 Properties(handle string) (garden.Properties, error) 63 Property(handle string, name string) (string, error) 64 SetProperty(handle string, name string, value string) error 65 66 Metrics(handle string) (garden.Metrics, error) 67 RemoveProperty(handle string, name string) error 68} 69 70//go:generate counterfeiter . HijackStreamer 71type HijackStreamer interface { 72 Stream(handler string, body io.Reader, params rata.Params, query url.Values, contentType string) (io.ReadCloser, error) 73 Hijack(handler string, body io.Reader, params rata.Params, query url.Values, contentType string) (net.Conn, *bufio.Reader, error) 74} 75 76type connection struct { 77 hijacker HijackStreamer 78 log lager.Logger 79} 80 81type Error struct { 82 StatusCode int 83 Message string 84} 85 86func (err Error) Error() string { 87 return err.Message 88} 89 90func New(network, address string) Connection { 91 return NewWithLogger(network, address, lager.NewLogger("garden-connection")) 92} 93 94func NewWithLogger(network, address string, logger lager.Logger) Connection { 95 hijacker := NewHijackStreamer(network, address) 96 return NewWithHijacker(hijacker, logger) 97} 98 99func NewWithDialerAndLogger(dialer DialerFunc, log lager.Logger) Connection { 100 hijacker := NewHijackStreamerWithDialer(dialer) 101 return NewWithHijacker(hijacker, log) 102} 103 104func NewWithHijacker(hijacker HijackStreamer, log lager.Logger) Connection { 105 return &connection{ 106 hijacker: hijacker, 107 log: log, 108 } 109} 110 111func (c *connection) Ping() error { 112 return c.do(routes.Ping, nil, &struct{}{}, nil, nil) 113} 114 115func (c *connection) Capacity() (garden.Capacity, error) { 116 capacity := garden.Capacity{} 117 err := c.do(routes.Capacity, nil, &capacity, nil, nil) 118 if err != nil { 119 return garden.Capacity{}, err 120 } 121 122 return capacity, nil 123} 124 125func (c *connection) Create(spec garden.ContainerSpec) (string, error) { 126 res := struct { 127 Handle string `json:"handle"` 128 }{} 129 130 err := c.do(routes.Create, spec, &res, nil, nil) 131 if err != nil { 132 return "", err 133 } 134 135 return res.Handle, nil 136} 137 138func (c *connection) Stop(handle string, kill bool) error { 139 return c.do( 140 routes.Stop, 141 map[string]bool{ 142 "kill": kill, 143 }, 144 &struct{}{}, 145 rata.Params{ 146 "handle": handle, 147 }, 148 nil, 149 ) 150} 151 152func (c *connection) Destroy(handle string) error { 153 return c.do( 154 routes.Destroy, 155 nil, 156 &struct{}{}, 157 rata.Params{ 158 "handle": handle, 159 }, 160 nil, 161 ) 162} 163 164func (c *connection) Run(handle string, spec garden.ProcessSpec, processIO garden.ProcessIO) (garden.Process, error) { 165 reqBody := new(bytes.Buffer) 166 167 err := transport.WriteMessage(reqBody, spec) 168 if err != nil { 169 return nil, err 170 } 171 172 hijackedConn, hijackedResponseReader, err := c.hijacker.Hijack( 173 routes.Run, 174 reqBody, 175 rata.Params{ 176 "handle": handle, 177 }, 178 nil, 179 "application/json", 180 ) 181 if err != nil { 182 return nil, err 183 } 184 185 return c.streamProcess(handle, processIO, hijackedConn, hijackedResponseReader) 186} 187 188func (c *connection) Attach(handle string, processID string, processIO garden.ProcessIO) (garden.Process, error) { 189 reqBody := new(bytes.Buffer) 190 191 hijackedConn, hijackedResponseReader, err := c.hijacker.Hijack( 192 routes.Attach, 193 reqBody, 194 rata.Params{ 195 "handle": handle, 196 "pid": processID, 197 }, 198 nil, 199 "", 200 ) 201 if err != nil { 202 return nil, err 203 } 204 205 return c.streamProcess(handle, processIO, hijackedConn, hijackedResponseReader) 206} 207 208func (c *connection) streamProcess(handle string, processIO garden.ProcessIO, hijackedConn net.Conn, hijackedResponseReader *bufio.Reader) (garden.Process, error) { 209 decoder := json.NewDecoder(hijackedResponseReader) 210 211 payload := &transport.ProcessPayload{} 212 if err := decoder.Decode(payload); err != nil { 213 return nil, err 214 } 215 216 processPipeline := &processStream{ 217 processID: payload.ProcessID, 218 conn: hijackedConn, 219 } 220 221 hijack := func(streamType string) (net.Conn, io.Reader, error) { 222 params := rata.Params{ 223 "handle": handle, 224 "pid": processPipeline.ProcessID(), 225 "streamid": payload.StreamID, 226 } 227 228 return c.hijacker.Hijack( 229 streamType, 230 nil, 231 params, 232 nil, 233 "application/json", 234 ) 235 } 236 237 process := newProcess(payload.ProcessID, processPipeline) 238 streamHandler := newStreamHandler(c.log) 239 streamHandler.streamIn(processPipeline, processIO.Stdin) 240 241 var stdoutConn net.Conn 242 if processIO.Stdout != nil { 243 var ( 244 stdout io.Reader 245 err error 246 ) 247 stdoutConn, stdout, err = hijack(routes.Stdout) 248 if err != nil { 249 werr := fmt.Errorf("connection: failed to hijack stream %s: %s", routes.Stdout, err) 250 process.exited(0, werr) 251 hijackedConn.Close() 252 return process, nil 253 } 254 streamHandler.streamOut(processIO.Stdout, stdout) 255 } 256 257 var stderrConn net.Conn 258 if processIO.Stderr != nil { 259 var ( 260 stderr io.Reader 261 err error 262 ) 263 stderrConn, stderr, err = hijack(routes.Stderr) 264 if err != nil { 265 werr := fmt.Errorf("connection: failed to hijack stream %s: %s", routes.Stderr, err) 266 process.exited(0, werr) 267 hijackedConn.Close() 268 return process, nil 269 } 270 streamHandler.streamOut(processIO.Stderr, stderr) 271 } 272 273 go func() { 274 defer hijackedConn.Close() 275 if stdoutConn != nil { 276 defer stdoutConn.Close() 277 } 278 if stderrConn != nil { 279 defer stderrConn.Close() 280 } 281 282 exitCode, err := streamHandler.wait(decoder) 283 process.exited(exitCode, err) 284 }() 285 286 return process, nil 287} 288 289func (c *connection) NetIn(handle string, hostPort, containerPort uint32) (uint32, uint32, error) { 290 res := &transport.NetInResponse{} 291 292 err := c.do( 293 routes.NetIn, 294 &transport.NetInRequest{ 295 Handle: handle, 296 HostPort: hostPort, 297 ContainerPort: containerPort, 298 }, 299 res, 300 rata.Params{ 301 "handle": handle, 302 }, 303 nil, 304 ) 305 306 if err != nil { 307 return 0, 0, err 308 } 309 310 return res.HostPort, res.ContainerPort, nil 311} 312 313func (c *connection) BulkNetOut(handle string, rules []garden.NetOutRule) error { 314 return c.do( 315 routes.BulkNetOut, 316 rules, 317 &struct{}{}, 318 rata.Params{ 319 "handle": handle, 320 }, 321 nil, 322 ) 323} 324 325func (c *connection) NetOut(handle string, rule garden.NetOutRule) error { 326 return c.do( 327 routes.NetOut, 328 rule, 329 &struct{}{}, 330 rata.Params{ 331 "handle": handle, 332 }, 333 nil, 334 ) 335} 336 337func (c *connection) Property(handle string, name string) (string, error) { 338 var res struct { 339 Value string `json:"value"` 340 } 341 342 err := c.do( 343 routes.Property, 344 nil, 345 &res, 346 rata.Params{ 347 "handle": handle, 348 "key": name, 349 }, 350 nil, 351 ) 352 353 return res.Value, err 354} 355 356func (c *connection) SetProperty(handle string, name string, value string) error { 357 err := c.do( 358 routes.SetProperty, 359 map[string]string{ 360 "value": value, 361 }, 362 &struct{}{}, 363 rata.Params{ 364 "handle": handle, 365 "key": name, 366 }, 367 nil, 368 ) 369 370 if err != nil { 371 return err 372 } 373 374 return nil 375} 376 377func (c *connection) RemoveProperty(handle string, name string) error { 378 err := c.do( 379 routes.RemoveProperty, 380 nil, 381 &struct{}{}, 382 rata.Params{ 383 "handle": handle, 384 "key": name, 385 }, 386 nil, 387 ) 388 389 if err != nil { 390 return err 391 } 392 393 return nil 394} 395 396func (c *connection) CurrentBandwidthLimits(handle string) (garden.BandwidthLimits, error) { 397 res := garden.BandwidthLimits{} 398 399 err := c.do( 400 routes.CurrentBandwidthLimits, 401 nil, 402 &res, 403 rata.Params{ 404 "handle": handle, 405 }, 406 nil, 407 ) 408 409 return res, err 410} 411 412func (c *connection) CurrentCPULimits(handle string) (garden.CPULimits, error) { 413 res := garden.CPULimits{} 414 415 err := c.do( 416 routes.CurrentCPULimits, 417 nil, 418 &res, 419 rata.Params{ 420 "handle": handle, 421 }, 422 nil, 423 ) 424 425 return res, err 426} 427 428func (c *connection) CurrentDiskLimits(handle string) (garden.DiskLimits, error) { 429 res := garden.DiskLimits{} 430 431 err := c.do( 432 routes.CurrentDiskLimits, 433 nil, 434 &res, 435 rata.Params{ 436 "handle": handle, 437 }, 438 nil, 439 ) 440 441 return res, err 442} 443 444func (c *connection) CurrentMemoryLimits(handle string) (garden.MemoryLimits, error) { 445 res := garden.MemoryLimits{} 446 447 err := c.do( 448 routes.CurrentMemoryLimits, 449 nil, 450 &res, 451 rata.Params{ 452 "handle": handle, 453 }, 454 nil, 455 ) 456 457 return res, err 458} 459 460func (c *connection) StreamIn(handle string, spec garden.StreamInSpec) error { 461 body, err := c.hijacker.Stream( 462 routes.StreamIn, 463 spec.TarStream, 464 rata.Params{ 465 "handle": handle, 466 }, 467 url.Values{ 468 "user": []string{spec.User}, 469 "destination": []string{spec.Path}, 470 }, 471 "application/x-tar", 472 ) 473 if err != nil { 474 return err 475 } 476 477 return body.Close() 478} 479 480func (c *connection) StreamOut(handle string, spec garden.StreamOutSpec) (io.ReadCloser, error) { 481 return c.hijacker.Stream( 482 routes.StreamOut, 483 nil, 484 rata.Params{ 485 "handle": handle, 486 }, 487 url.Values{ 488 "user": []string{spec.User}, 489 "source": []string{spec.Path}, 490 }, 491 "", 492 ) 493} 494 495func (c *connection) List(filterProperties garden.Properties) ([]string, error) { 496 values := url.Values{} 497 for name, val := range filterProperties { 498 values[name] = []string{val} 499 } 500 501 res := &struct { 502 Handles []string 503 }{} 504 505 if err := c.do( 506 routes.List, 507 nil, 508 &res, 509 nil, 510 values, 511 ); err != nil { 512 return nil, err 513 } 514 515 return res.Handles, nil 516} 517 518func (c *connection) SetGraceTime(handle string, graceTime time.Duration) error { 519 return c.do(routes.SetGraceTime, graceTime, &struct{}{}, rata.Params{"handle": handle}, nil) 520} 521 522func (c *connection) Properties(handle string) (garden.Properties, error) { 523 res := make(garden.Properties) 524 err := c.do(routes.Properties, nil, &res, rata.Params{"handle": handle}, nil) 525 return res, err 526} 527 528func (c *connection) Metrics(handle string) (garden.Metrics, error) { 529 res := garden.Metrics{} 530 err := c.do(routes.Metrics, nil, &res, rata.Params{"handle": handle}, nil) 531 return res, err 532} 533 534func (c *connection) Info(handle string) (garden.ContainerInfo, error) { 535 res := garden.ContainerInfo{} 536 537 err := c.do(routes.Info, nil, &res, rata.Params{"handle": handle}, nil) 538 if err != nil { 539 return garden.ContainerInfo{}, err 540 } 541 542 return res, nil 543} 544 545func (c *connection) BulkInfo(handles []string) (map[string]garden.ContainerInfoEntry, error) { 546 res := make(map[string]garden.ContainerInfoEntry) 547 queryParams := url.Values{ 548 "handles": []string{strings.Join(handles, ",")}, 549 } 550 err := c.do(routes.BulkInfo, nil, &res, nil, queryParams) 551 return res, err 552} 553 554func (c *connection) BulkMetrics(handles []string) (map[string]garden.ContainerMetricsEntry, error) { 555 res := make(map[string]garden.ContainerMetricsEntry) 556 queryParams := url.Values{ 557 "handles": []string{strings.Join(handles, ",")}, 558 } 559 err := c.do(routes.BulkMetrics, nil, &res, nil, queryParams) 560 return res, err 561} 562 563func (c *connection) do( 564 handler string, 565 req, res interface{}, 566 params rata.Params, 567 query url.Values, 568) error { 569 var body io.Reader 570 571 if req != nil { 572 buf := new(bytes.Buffer) 573 574 err := transport.WriteMessage(buf, req) 575 if err != nil { 576 return err 577 } 578 579 body = buf 580 } 581 582 contentType := "" 583 if req != nil { 584 contentType = "application/json" 585 } 586 587 response, err := c.hijacker.Stream( 588 handler, 589 body, 590 params, 591 query, 592 contentType, 593 ) 594 if err != nil { 595 return err 596 } 597 598 defer response.Close() 599 600 return json.NewDecoder(response).Decode(res) 601} 602