1package connection
2
3import (
4	"bufio"
5	"bytes"
6	"encoding/json"
7	"errors"
8	"fmt"
9	"io"
10	"net"
11	"net/url"
12	"strings"
13	"time"
14
15	"code.cloudfoundry.org/garden"
16	"code.cloudfoundry.org/garden/routes"
17	"code.cloudfoundry.org/garden/transport"
18	"code.cloudfoundry.org/lager"
19	"github.com/tedsuo/rata"
20)
21
22var ErrDisconnected = errors.New("disconnected")
23var ErrInvalidMessage = errors.New("invalid message payload")
24
25//go:generate counterfeiter . Connection
26type Connection interface {
27	Ping() error
28
29	Capacity() (garden.Capacity, error)
30
31	Create(spec garden.ContainerSpec) (string, error)
32	List(properties garden.Properties) ([]string, error)
33
34	// Destroys the container with the given handle. If the container cannot be
35	// found, garden.ContainerNotFoundError is returned. If deletion fails for another
36	// reason, another error type is returned.
37	Destroy(handle string) error
38
39	Stop(handle string, kill bool) error
40
41	Info(handle string) (garden.ContainerInfo, error)
42	BulkInfo(handles []string) (map[string]garden.ContainerInfoEntry, error)
43	BulkMetrics(handles []string) (map[string]garden.ContainerMetricsEntry, error)
44
45	StreamIn(handle string, spec garden.StreamInSpec) error
46	StreamOut(handle string, spec garden.StreamOutSpec) (io.ReadCloser, error)
47
48	CurrentBandwidthLimits(handle string) (garden.BandwidthLimits, error)
49	CurrentCPULimits(handle string) (garden.CPULimits, error)
50	CurrentDiskLimits(handle string) (garden.DiskLimits, error)
51	CurrentMemoryLimits(handle string) (garden.MemoryLimits, error)
52
53	Run(handle string, spec garden.ProcessSpec, io garden.ProcessIO) (garden.Process, error)
54	Attach(handle string, processID string, io garden.ProcessIO) (garden.Process, error)
55
56	NetIn(handle string, hostPort, containerPort uint32) (uint32, uint32, error)
57	NetOut(handle string, rule garden.NetOutRule) error
58	BulkNetOut(handle string, rules []garden.NetOutRule) error
59
60	SetGraceTime(handle string, graceTime time.Duration) error
61
62	Properties(handle string) (garden.Properties, error)
63	Property(handle string, name string) (string, error)
64	SetProperty(handle string, name string, value string) error
65
66	Metrics(handle string) (garden.Metrics, error)
67	RemoveProperty(handle string, name string) error
68}
69
70//go:generate counterfeiter . HijackStreamer
71type HijackStreamer interface {
72	Stream(handler string, body io.Reader, params rata.Params, query url.Values, contentType string) (io.ReadCloser, error)
73	Hijack(handler string, body io.Reader, params rata.Params, query url.Values, contentType string) (net.Conn, *bufio.Reader, error)
74}
75
76type connection struct {
77	hijacker HijackStreamer
78	log      lager.Logger
79}
80
81type Error struct {
82	StatusCode int
83	Message    string
84}
85
86func (err Error) Error() string {
87	return err.Message
88}
89
90func New(network, address string) Connection {
91	return NewWithLogger(network, address, lager.NewLogger("garden-connection"))
92}
93
94func NewWithLogger(network, address string, logger lager.Logger) Connection {
95	hijacker := NewHijackStreamer(network, address)
96	return NewWithHijacker(hijacker, logger)
97}
98
99func NewWithDialerAndLogger(dialer DialerFunc, log lager.Logger) Connection {
100	hijacker := NewHijackStreamerWithDialer(dialer)
101	return NewWithHijacker(hijacker, log)
102}
103
104func NewWithHijacker(hijacker HijackStreamer, log lager.Logger) Connection {
105	return &connection{
106		hijacker: hijacker,
107		log:      log,
108	}
109}
110
111func (c *connection) Ping() error {
112	return c.do(routes.Ping, nil, &struct{}{}, nil, nil)
113}
114
115func (c *connection) Capacity() (garden.Capacity, error) {
116	capacity := garden.Capacity{}
117	err := c.do(routes.Capacity, nil, &capacity, nil, nil)
118	if err != nil {
119		return garden.Capacity{}, err
120	}
121
122	return capacity, nil
123}
124
125func (c *connection) Create(spec garden.ContainerSpec) (string, error) {
126	res := struct {
127		Handle string `json:"handle"`
128	}{}
129
130	err := c.do(routes.Create, spec, &res, nil, nil)
131	if err != nil {
132		return "", err
133	}
134
135	return res.Handle, nil
136}
137
138func (c *connection) Stop(handle string, kill bool) error {
139	return c.do(
140		routes.Stop,
141		map[string]bool{
142			"kill": kill,
143		},
144		&struct{}{},
145		rata.Params{
146			"handle": handle,
147		},
148		nil,
149	)
150}
151
152func (c *connection) Destroy(handle string) error {
153	return c.do(
154		routes.Destroy,
155		nil,
156		&struct{}{},
157		rata.Params{
158			"handle": handle,
159		},
160		nil,
161	)
162}
163
164func (c *connection) Run(handle string, spec garden.ProcessSpec, processIO garden.ProcessIO) (garden.Process, error) {
165	reqBody := new(bytes.Buffer)
166
167	err := transport.WriteMessage(reqBody, spec)
168	if err != nil {
169		return nil, err
170	}
171
172	hijackedConn, hijackedResponseReader, err := c.hijacker.Hijack(
173		routes.Run,
174		reqBody,
175		rata.Params{
176			"handle": handle,
177		},
178		nil,
179		"application/json",
180	)
181	if err != nil {
182		return nil, err
183	}
184
185	return c.streamProcess(handle, processIO, hijackedConn, hijackedResponseReader)
186}
187
188func (c *connection) Attach(handle string, processID string, processIO garden.ProcessIO) (garden.Process, error) {
189	reqBody := new(bytes.Buffer)
190
191	hijackedConn, hijackedResponseReader, err := c.hijacker.Hijack(
192		routes.Attach,
193		reqBody,
194		rata.Params{
195			"handle": handle,
196			"pid":    processID,
197		},
198		nil,
199		"",
200	)
201	if err != nil {
202		return nil, err
203	}
204
205	return c.streamProcess(handle, processIO, hijackedConn, hijackedResponseReader)
206}
207
208func (c *connection) streamProcess(handle string, processIO garden.ProcessIO, hijackedConn net.Conn, hijackedResponseReader *bufio.Reader) (garden.Process, error) {
209	decoder := json.NewDecoder(hijackedResponseReader)
210
211	payload := &transport.ProcessPayload{}
212	if err := decoder.Decode(payload); err != nil {
213		return nil, err
214	}
215
216	processPipeline := &processStream{
217		processID: payload.ProcessID,
218		conn:      hijackedConn,
219	}
220
221	hijack := func(streamType string) (net.Conn, io.Reader, error) {
222		params := rata.Params{
223			"handle":   handle,
224			"pid":      processPipeline.ProcessID(),
225			"streamid": payload.StreamID,
226		}
227
228		return c.hijacker.Hijack(
229			streamType,
230			nil,
231			params,
232			nil,
233			"application/json",
234		)
235	}
236
237	process := newProcess(payload.ProcessID, processPipeline)
238	streamHandler := newStreamHandler(c.log)
239	streamHandler.streamIn(processPipeline, processIO.Stdin)
240
241	var stdoutConn net.Conn
242	if processIO.Stdout != nil {
243		var (
244			stdout io.Reader
245			err    error
246		)
247		stdoutConn, stdout, err = hijack(routes.Stdout)
248		if err != nil {
249			werr := fmt.Errorf("connection: failed to hijack stream %s: %s", routes.Stdout, err)
250			process.exited(0, werr)
251			hijackedConn.Close()
252			return process, nil
253		}
254		streamHandler.streamOut(processIO.Stdout, stdout)
255	}
256
257	var stderrConn net.Conn
258	if processIO.Stderr != nil {
259		var (
260			stderr io.Reader
261			err    error
262		)
263		stderrConn, stderr, err = hijack(routes.Stderr)
264		if err != nil {
265			werr := fmt.Errorf("connection: failed to hijack stream %s: %s", routes.Stderr, err)
266			process.exited(0, werr)
267			hijackedConn.Close()
268			return process, nil
269		}
270		streamHandler.streamOut(processIO.Stderr, stderr)
271	}
272
273	go func() {
274		defer hijackedConn.Close()
275		if stdoutConn != nil {
276			defer stdoutConn.Close()
277		}
278		if stderrConn != nil {
279			defer stderrConn.Close()
280		}
281
282		exitCode, err := streamHandler.wait(decoder)
283		process.exited(exitCode, err)
284	}()
285
286	return process, nil
287}
288
289func (c *connection) NetIn(handle string, hostPort, containerPort uint32) (uint32, uint32, error) {
290	res := &transport.NetInResponse{}
291
292	err := c.do(
293		routes.NetIn,
294		&transport.NetInRequest{
295			Handle:        handle,
296			HostPort:      hostPort,
297			ContainerPort: containerPort,
298		},
299		res,
300		rata.Params{
301			"handle": handle,
302		},
303		nil,
304	)
305
306	if err != nil {
307		return 0, 0, err
308	}
309
310	return res.HostPort, res.ContainerPort, nil
311}
312
313func (c *connection) BulkNetOut(handle string, rules []garden.NetOutRule) error {
314	return c.do(
315		routes.BulkNetOut,
316		rules,
317		&struct{}{},
318		rata.Params{
319			"handle": handle,
320		},
321		nil,
322	)
323}
324
325func (c *connection) NetOut(handle string, rule garden.NetOutRule) error {
326	return c.do(
327		routes.NetOut,
328		rule,
329		&struct{}{},
330		rata.Params{
331			"handle": handle,
332		},
333		nil,
334	)
335}
336
337func (c *connection) Property(handle string, name string) (string, error) {
338	var res struct {
339		Value string `json:"value"`
340	}
341
342	err := c.do(
343		routes.Property,
344		nil,
345		&res,
346		rata.Params{
347			"handle": handle,
348			"key":    name,
349		},
350		nil,
351	)
352
353	return res.Value, err
354}
355
356func (c *connection) SetProperty(handle string, name string, value string) error {
357	err := c.do(
358		routes.SetProperty,
359		map[string]string{
360			"value": value,
361		},
362		&struct{}{},
363		rata.Params{
364			"handle": handle,
365			"key":    name,
366		},
367		nil,
368	)
369
370	if err != nil {
371		return err
372	}
373
374	return nil
375}
376
377func (c *connection) RemoveProperty(handle string, name string) error {
378	err := c.do(
379		routes.RemoveProperty,
380		nil,
381		&struct{}{},
382		rata.Params{
383			"handle": handle,
384			"key":    name,
385		},
386		nil,
387	)
388
389	if err != nil {
390		return err
391	}
392
393	return nil
394}
395
396func (c *connection) CurrentBandwidthLimits(handle string) (garden.BandwidthLimits, error) {
397	res := garden.BandwidthLimits{}
398
399	err := c.do(
400		routes.CurrentBandwidthLimits,
401		nil,
402		&res,
403		rata.Params{
404			"handle": handle,
405		},
406		nil,
407	)
408
409	return res, err
410}
411
412func (c *connection) CurrentCPULimits(handle string) (garden.CPULimits, error) {
413	res := garden.CPULimits{}
414
415	err := c.do(
416		routes.CurrentCPULimits,
417		nil,
418		&res,
419		rata.Params{
420			"handle": handle,
421		},
422		nil,
423	)
424
425	return res, err
426}
427
428func (c *connection) CurrentDiskLimits(handle string) (garden.DiskLimits, error) {
429	res := garden.DiskLimits{}
430
431	err := c.do(
432		routes.CurrentDiskLimits,
433		nil,
434		&res,
435		rata.Params{
436			"handle": handle,
437		},
438		nil,
439	)
440
441	return res, err
442}
443
444func (c *connection) CurrentMemoryLimits(handle string) (garden.MemoryLimits, error) {
445	res := garden.MemoryLimits{}
446
447	err := c.do(
448		routes.CurrentMemoryLimits,
449		nil,
450		&res,
451		rata.Params{
452			"handle": handle,
453		},
454		nil,
455	)
456
457	return res, err
458}
459
460func (c *connection) StreamIn(handle string, spec garden.StreamInSpec) error {
461	body, err := c.hijacker.Stream(
462		routes.StreamIn,
463		spec.TarStream,
464		rata.Params{
465			"handle": handle,
466		},
467		url.Values{
468			"user":        []string{spec.User},
469			"destination": []string{spec.Path},
470		},
471		"application/x-tar",
472	)
473	if err != nil {
474		return err
475	}
476
477	return body.Close()
478}
479
480func (c *connection) StreamOut(handle string, spec garden.StreamOutSpec) (io.ReadCloser, error) {
481	return c.hijacker.Stream(
482		routes.StreamOut,
483		nil,
484		rata.Params{
485			"handle": handle,
486		},
487		url.Values{
488			"user":   []string{spec.User},
489			"source": []string{spec.Path},
490		},
491		"",
492	)
493}
494
495func (c *connection) List(filterProperties garden.Properties) ([]string, error) {
496	values := url.Values{}
497	for name, val := range filterProperties {
498		values[name] = []string{val}
499	}
500
501	res := &struct {
502		Handles []string
503	}{}
504
505	if err := c.do(
506		routes.List,
507		nil,
508		&res,
509		nil,
510		values,
511	); err != nil {
512		return nil, err
513	}
514
515	return res.Handles, nil
516}
517
518func (c *connection) SetGraceTime(handle string, graceTime time.Duration) error {
519	return c.do(routes.SetGraceTime, graceTime, &struct{}{}, rata.Params{"handle": handle}, nil)
520}
521
522func (c *connection) Properties(handle string) (garden.Properties, error) {
523	res := make(garden.Properties)
524	err := c.do(routes.Properties, nil, &res, rata.Params{"handle": handle}, nil)
525	return res, err
526}
527
528func (c *connection) Metrics(handle string) (garden.Metrics, error) {
529	res := garden.Metrics{}
530	err := c.do(routes.Metrics, nil, &res, rata.Params{"handle": handle}, nil)
531	return res, err
532}
533
534func (c *connection) Info(handle string) (garden.ContainerInfo, error) {
535	res := garden.ContainerInfo{}
536
537	err := c.do(routes.Info, nil, &res, rata.Params{"handle": handle}, nil)
538	if err != nil {
539		return garden.ContainerInfo{}, err
540	}
541
542	return res, nil
543}
544
545func (c *connection) BulkInfo(handles []string) (map[string]garden.ContainerInfoEntry, error) {
546	res := make(map[string]garden.ContainerInfoEntry)
547	queryParams := url.Values{
548		"handles": []string{strings.Join(handles, ",")},
549	}
550	err := c.do(routes.BulkInfo, nil, &res, nil, queryParams)
551	return res, err
552}
553
554func (c *connection) BulkMetrics(handles []string) (map[string]garden.ContainerMetricsEntry, error) {
555	res := make(map[string]garden.ContainerMetricsEntry)
556	queryParams := url.Values{
557		"handles": []string{strings.Join(handles, ",")},
558	}
559	err := c.do(routes.BulkMetrics, nil, &res, nil, queryParams)
560	return res, err
561}
562
563func (c *connection) do(
564	handler string,
565	req, res interface{},
566	params rata.Params,
567	query url.Values,
568) error {
569	var body io.Reader
570
571	if req != nil {
572		buf := new(bytes.Buffer)
573
574		err := transport.WriteMessage(buf, req)
575		if err != nil {
576			return err
577		}
578
579		body = buf
580	}
581
582	contentType := ""
583	if req != nil {
584		contentType = "application/json"
585	}
586
587	response, err := c.hijacker.Stream(
588		handler,
589		body,
590		params,
591		query,
592		contentType,
593	)
594	if err != nil {
595		return err
596	}
597
598	defer response.Close()
599
600	return json.NewDecoder(response).Decode(res)
601}
602