1package api
2
3import (
4	"encoding/json"
5	"fmt"
6	"io"
7	"net"
8	"strconv"
9	"sync"
10	"time"
11)
12
const (
	// OriginStart is the origin argument value used when streaming a file
	// to apply the offset relative to the start of the file.
	OriginStart = "start"

	// OriginEnd is the origin argument value used when streaming a file to
	// apply the offset relative to the end of the file.
	OriginEnd = "end"
)
20
// AllocFileInfo describes a single entry inside the AllocDir. It is the
// element type returned by AllocFS.List and the value returned by
// AllocFS.Stat.
//
// NOTE(review): the values are filled in from the remote response; the
// per-field notes below are inferred from the field names and should be
// confirmed against the server implementation.
type AllocFileInfo struct {
	Name        string    // entry name
	IsDir       bool      // presumably true when the entry is a directory
	Size        int64     // presumably the size in bytes
	FileMode    string    // file mode rendered as a string
	ModTime     time.Time // presumably the last modification time
	ContentType string    // presumably the detected content type
}
30
// StreamFrame is used to frame data of a file when streaming. Every field is
// tagged omitempty, so a field holding its zero value is left out of the JSON
// encoding.
type StreamFrame struct {
	Offset    int64  `json:",omitempty"`
	Data      []byte `json:",omitempty"`
	File      string `json:",omitempty"`
	FileEvent string `json:",omitempty"`
}

// IsHeartbeat reports whether the frame is a heartbeat frame, i.e. it has a
// zero offset, no data, no file name and no file event.
func (s *StreamFrame) IsHeartbeat() bool {
	if s.Offset != 0 || len(s.Data) != 0 {
		return false
	}
	return s.File == "" && s.FileEvent == ""
}
43
44// AllocFS is used to introspect an allocation directory on a Nomad client
45type AllocFS struct {
46	client *Client
47}
48
49// AllocFS returns an handle to the AllocFS endpoints
50func (c *Client) AllocFS() *AllocFS {
51	return &AllocFS{client: c}
52}
53
54// List is used to list the files at a given path of an allocation directory
55func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*AllocFileInfo, *QueryMeta, error) {
56	if q == nil {
57		q = &QueryOptions{}
58	}
59	if q.Params == nil {
60		q.Params = make(map[string]string)
61	}
62	q.Params["path"] = path
63
64	var resp []*AllocFileInfo
65	qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
66	if err != nil {
67		return nil, nil, err
68	}
69
70	return resp, qm, nil
71}
72
73// Stat is used to stat a file at a given path of an allocation directory
74func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocFileInfo, *QueryMeta, error) {
75	if q == nil {
76		q = &QueryOptions{}
77	}
78	if q.Params == nil {
79		q.Params = make(map[string]string)
80	}
81
82	q.Params["path"] = path
83
84	var resp AllocFileInfo
85	qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
86	if err != nil {
87		return nil, nil, err
88	}
89	return &resp, qm, nil
90}
91
92// ReadAt is used to read bytes at a given offset until limit at the given path
93// in an allocation directory. If limit is <= 0, there is no limit.
94func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
95	reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
96
97	return queryClientNode(a.client, alloc, reqPath, q,
98		func(q *QueryOptions) {
99			q.Params["path"] = path
100			q.Params["offset"] = strconv.FormatInt(offset, 10)
101			q.Params["limit"] = strconv.FormatInt(limit, 10)
102		})
103}
104
105// Cat is used to read contents of a file at the given path in an allocation
106// directory
107func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
108	reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
109	return queryClientNode(a.client, alloc, reqPath, q,
110		func(q *QueryOptions) {
111			q.Params["path"] = path
112		})
113}
114
115// Stream streams the content of a file blocking on EOF.
116// The parameters are:
117// * path: path to file to stream.
118// * offset: The offset to start streaming data at.
119// * origin: Either "start" or "end" and defines from where the offset is applied.
120// * cancel: A channel that when closed, streaming will end.
121//
122// The return value is a channel that will emit StreamFrames as they are read.
123func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
124	cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
125
126	errCh := make(chan error, 1)
127
128	reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
129	r, err := queryClientNode(a.client, alloc, reqPath, q,
130		func(q *QueryOptions) {
131			q.Params["path"] = path
132			q.Params["offset"] = strconv.FormatInt(offset, 10)
133			q.Params["origin"] = origin
134		})
135	if err != nil {
136		errCh <- err
137		return nil, errCh
138	}
139
140	// Create the output channel
141	frames := make(chan *StreamFrame, 10)
142
143	go func() {
144		// Close the body
145		defer r.Close()
146
147		// Create a decoder
148		dec := json.NewDecoder(r)
149
150		for {
151			// Check if we have been cancelled
152			select {
153			case <-cancel:
154				return
155			default:
156			}
157
158			// Decode the next frame
159			var frame StreamFrame
160			if err := dec.Decode(&frame); err != nil {
161				errCh <- err
162				close(frames)
163				return
164			}
165
166			// Discard heartbeat frames
167			if frame.IsHeartbeat() {
168				continue
169			}
170
171			frames <- &frame
172		}
173	}()
174
175	return frames, errCh
176}
177
178func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) {
179	nodeClient, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
180
181	if q == nil {
182		q = &QueryOptions{}
183	}
184	if q.Params == nil {
185		q.Params = make(map[string]string)
186	}
187	if customizeQ != nil {
188		customizeQ(q)
189	}
190
191	var r io.ReadCloser
192	var err error
193
194	if nodeClient != nil {
195		r, err = nodeClient.rawQuery(reqPath, q)
196		if _, ok := err.(net.Error); err != nil && !ok {
197			// found a non networking error talking to client directly
198			return nil, err
199		}
200
201	}
202
203	// failed to query node, access through server directly
204	// or network error when talking to the client directly
205	if r == nil {
206		return c.rawQuery(reqPath, q)
207	}
208
209	return r, err
210}
211
212// Logs streams the content of a tasks logs blocking on EOF.
213// The parameters are:
214// * allocation: the allocation to stream from.
215// * follow: Whether the logs should be followed.
216// * task: the tasks name to stream logs for.
217// * logType: Either "stdout" or "stderr"
218// * origin: Either "start" or "end" and defines from where the offset is applied.
219// * offset: The offset to start streaming data at.
220// * cancel: A channel that when closed, streaming will end.
221//
222// The return value is a channel that will emit StreamFrames as they are read.
223// The chan will be closed when follow=false and the end of the file is
224// reached.
225//
226// Unexpected (non-EOF) errors will be sent on the error chan.
227func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
228	offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
229
230	errCh := make(chan error, 1)
231
232	reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
233	r, err := queryClientNode(a.client, alloc, reqPath, q,
234		func(q *QueryOptions) {
235			q.Params["follow"] = strconv.FormatBool(follow)
236			q.Params["task"] = task
237			q.Params["type"] = logType
238			q.Params["origin"] = origin
239			q.Params["offset"] = strconv.FormatInt(offset, 10)
240		})
241	if err != nil {
242		errCh <- err
243		return nil, errCh
244	}
245
246	// Create the output channel
247	frames := make(chan *StreamFrame, 10)
248
249	go func() {
250		// Close the body
251		defer r.Close()
252
253		// Create a decoder
254		dec := json.NewDecoder(r)
255
256		for {
257			// Check if we have been cancelled
258			select {
259			case <-cancel:
260				return
261			default:
262			}
263
264			// Decode the next frame
265			var frame StreamFrame
266			if err := dec.Decode(&frame); err != nil {
267				if err == io.EOF || err == io.ErrClosedPipe {
268					close(frames)
269				} else {
270					errCh <- err
271				}
272				return
273			}
274
275			// Discard heartbeat frames
276			if frame.IsHeartbeat() {
277				continue
278			}
279
280			frames <- &frame
281		}
282	}()
283
284	return frames, errCh
285}
286
287// FrameReader is used to convert a stream of frames into a read closer.
288type FrameReader struct {
289	frames   <-chan *StreamFrame
290	errCh    <-chan error
291	cancelCh chan struct{}
292
293	closedLock sync.Mutex
294	closed     bool
295
296	unblockTime time.Duration
297
298	frame       *StreamFrame
299	frameOffset int
300
301	byteOffset int
302}
303
304// NewFrameReader takes a channel of frames and returns a FrameReader which
305// implements io.ReadCloser
306func NewFrameReader(frames <-chan *StreamFrame, errCh <-chan error, cancelCh chan struct{}) *FrameReader {
307	return &FrameReader{
308		frames:   frames,
309		errCh:    errCh,
310		cancelCh: cancelCh,
311	}
312}
313
314// SetUnblockTime sets the time to unblock and return zero bytes read. If the
315// duration is unset or is zero or less, the read will block until data is read.
316func (f *FrameReader) SetUnblockTime(d time.Duration) {
317	f.unblockTime = d
318}
319
320// Offset returns the offset into the stream.
321func (f *FrameReader) Offset() int {
322	return f.byteOffset
323}
324
325// Read reads the data of the incoming frames into the bytes buffer. Returns EOF
326// when there are no more frames.
327func (f *FrameReader) Read(p []byte) (n int, err error) {
328	f.closedLock.Lock()
329	closed := f.closed
330	f.closedLock.Unlock()
331	if closed {
332		return 0, io.EOF
333	}
334
335	if f.frame == nil {
336		var unblock <-chan time.Time
337		if f.unblockTime.Nanoseconds() > 0 {
338			unblock = time.After(f.unblockTime)
339		}
340
341		select {
342		case frame, ok := <-f.frames:
343			if !ok {
344				return 0, io.EOF
345			}
346			f.frame = frame
347
348			// Store the total offset into the file
349			f.byteOffset = int(f.frame.Offset)
350		case <-unblock:
351			return 0, nil
352		case err := <-f.errCh:
353			return 0, err
354		case <-f.cancelCh:
355			return 0, io.EOF
356		}
357	}
358
359	// Copy the data out of the frame and update our offset
360	n = copy(p, f.frame.Data[f.frameOffset:])
361	f.frameOffset += n
362
363	// Clear the frame and its offset once we have read everything
364	if len(f.frame.Data) == f.frameOffset {
365		f.frame = nil
366		f.frameOffset = 0
367	}
368
369	return n, nil
370}
371
372// Close cancels the stream of frames
373func (f *FrameReader) Close() error {
374	f.closedLock.Lock()
375	defer f.closedLock.Unlock()
376	if f.closed {
377		return nil
378	}
379
380	close(f.cancelCh)
381	f.closed = true
382	return nil
383}
384