1package api
2
3import (
4	"encoding/json"
5	"fmt"
6	"io"
7	"net"
8	"strconv"
9	"sync"
10	"time"
11)
12
13const (
14	// OriginStart and OriginEnd are the available parameters for the origin
15	// argument when streaming a file. They respectively offset from the start
16	// and end of a file.
17	OriginStart = "start"
18	OriginEnd   = "end"
19)
20
21// AllocFileInfo holds information about a file inside the AllocDir
22type AllocFileInfo struct {
23	Name     string
24	IsDir    bool
25	Size     int64
26	FileMode string
27	ModTime  time.Time
28}
29
30// StreamFrame is used to frame data of a file when streaming
31type StreamFrame struct {
32	Offset    int64  `json:",omitempty"`
33	Data      []byte `json:",omitempty"`
34	File      string `json:",omitempty"`
35	FileEvent string `json:",omitempty"`
36}
37
38// IsHeartbeat returns if the frame is a heartbeat frame
39func (s *StreamFrame) IsHeartbeat() bool {
40	return len(s.Data) == 0 && s.FileEvent == "" && s.File == "" && s.Offset == 0
41}
42
43// AllocFS is used to introspect an allocation directory on a Nomad client
44type AllocFS struct {
45	client *Client
46}
47
48// AllocFS returns an handle to the AllocFS endpoints
49func (c *Client) AllocFS() *AllocFS {
50	return &AllocFS{client: c}
51}
52
53// List is used to list the files at a given path of an allocation directory
54func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*AllocFileInfo, *QueryMeta, error) {
55	if q == nil {
56		q = &QueryOptions{}
57	}
58	if q.Params == nil {
59		q.Params = make(map[string]string)
60	}
61	q.Params["path"] = path
62
63	var resp []*AllocFileInfo
64	qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q)
65	if err != nil {
66		return nil, nil, err
67	}
68
69	return resp, qm, nil
70}
71
72// Stat is used to stat a file at a given path of an allocation directory
73func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocFileInfo, *QueryMeta, error) {
74	if q == nil {
75		q = &QueryOptions{}
76	}
77	if q.Params == nil {
78		q.Params = make(map[string]string)
79	}
80
81	q.Params["path"] = path
82
83	var resp AllocFileInfo
84	qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q)
85	if err != nil {
86		return nil, nil, err
87	}
88	return &resp, qm, nil
89}
90
91// ReadAt is used to read bytes at a given offset until limit at the given path
92// in an allocation directory. If limit is <= 0, there is no limit.
93func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
94	nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
95	if err != nil {
96		return nil, err
97	}
98
99	if q == nil {
100		q = &QueryOptions{}
101	}
102	if q.Params == nil {
103		q.Params = make(map[string]string)
104	}
105
106	q.Params["path"] = path
107	q.Params["offset"] = strconv.FormatInt(offset, 10)
108	q.Params["limit"] = strconv.FormatInt(limit, 10)
109
110	reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
111	r, err := nodeClient.rawQuery(reqPath, q)
112	if err != nil {
113		// There was a networking error when talking directly to the client.
114		if _, ok := err.(net.Error); !ok {
115			return nil, err
116		}
117
118		// Try via the server
119		r, err = a.client.rawQuery(reqPath, q)
120		if err != nil {
121			return nil, err
122		}
123	}
124
125	return r, nil
126}
127
128// Cat is used to read contents of a file at the given path in an allocation
129// directory
130func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
131	nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
132	if err != nil {
133		return nil, err
134	}
135
136	if q == nil {
137		q = &QueryOptions{}
138	}
139	if q.Params == nil {
140		q.Params = make(map[string]string)
141	}
142
143	q.Params["path"] = path
144	reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
145	r, err := nodeClient.rawQuery(reqPath, q)
146	if err != nil {
147		// There was a networking error when talking directly to the client.
148		if _, ok := err.(net.Error); !ok {
149			return nil, err
150		}
151
152		// Try via the server
153		r, err = a.client.rawQuery(reqPath, q)
154		if err != nil {
155			return nil, err
156		}
157	}
158
159	return r, nil
160}
161
162// Stream streams the content of a file blocking on EOF.
163// The parameters are:
164// * path: path to file to stream.
165// * offset: The offset to start streaming data at.
166// * origin: Either "start" or "end" and defines from where the offset is applied.
167// * cancel: A channel that when closed, streaming will end.
168//
169// The return value is a channel that will emit StreamFrames as they are read.
170func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
171	cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
172
173	errCh := make(chan error, 1)
174	nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
175	if err != nil {
176		errCh <- err
177		return nil, errCh
178	}
179
180	if q == nil {
181		q = &QueryOptions{}
182	}
183	if q.Params == nil {
184		q.Params = make(map[string]string)
185	}
186
187	q.Params["path"] = path
188	q.Params["offset"] = strconv.FormatInt(offset, 10)
189	q.Params["origin"] = origin
190
191	reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
192	r, err := nodeClient.rawQuery(reqPath, q)
193	if err != nil {
194		// There was a networking error when talking directly to the client.
195		if _, ok := err.(net.Error); !ok {
196			errCh <- err
197			return nil, errCh
198		}
199
200		// Try via the server
201		r, err = a.client.rawQuery(reqPath, q)
202		if err != nil {
203			errCh <- err
204			return nil, errCh
205		}
206	}
207
208	// Create the output channel
209	frames := make(chan *StreamFrame, 10)
210
211	go func() {
212		// Close the body
213		defer r.Close()
214
215		// Create a decoder
216		dec := json.NewDecoder(r)
217
218		for {
219			// Check if we have been cancelled
220			select {
221			case <-cancel:
222				return
223			default:
224			}
225
226			// Decode the next frame
227			var frame StreamFrame
228			if err := dec.Decode(&frame); err != nil {
229				errCh <- err
230				close(frames)
231				return
232			}
233
234			// Discard heartbeat frames
235			if frame.IsHeartbeat() {
236				continue
237			}
238
239			frames <- &frame
240		}
241	}()
242
243	return frames, errCh
244}
245
246// Logs streams the content of a tasks logs blocking on EOF.
247// The parameters are:
248// * allocation: the allocation to stream from.
249// * follow: Whether the logs should be followed.
250// * task: the tasks name to stream logs for.
251// * logType: Either "stdout" or "stderr"
252// * origin: Either "start" or "end" and defines from where the offset is applied.
253// * offset: The offset to start streaming data at.
254// * cancel: A channel that when closed, streaming will end.
255//
256// The return value is a channel that will emit StreamFrames as they are read.
257// The chan will be closed when follow=false and the end of the file is
258// reached.
259//
260// Unexpected (non-EOF) errors will be sent on the error chan.
261func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string,
262	offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
263
264	errCh := make(chan error, 1)
265
266	nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
267	if err != nil {
268		errCh <- err
269		return nil, errCh
270	}
271
272	if q == nil {
273		q = &QueryOptions{}
274	}
275	if q.Params == nil {
276		q.Params = make(map[string]string)
277	}
278
279	q.Params["follow"] = strconv.FormatBool(follow)
280	q.Params["task"] = task
281	q.Params["type"] = logType
282	q.Params["origin"] = origin
283	q.Params["offset"] = strconv.FormatInt(offset, 10)
284
285	reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
286	r, err := nodeClient.rawQuery(reqPath, q)
287	if err != nil {
288		// There was a networking error when talking directly to the client.
289		if _, ok := err.(net.Error); !ok {
290			errCh <- err
291			return nil, errCh
292		}
293
294		// Try via the server
295		r, err = a.client.rawQuery(reqPath, q)
296		if err != nil {
297			errCh <- err
298			return nil, errCh
299		}
300	}
301
302	// Create the output channel
303	frames := make(chan *StreamFrame, 10)
304
305	go func() {
306		// Close the body
307		defer r.Close()
308
309		// Create a decoder
310		dec := json.NewDecoder(r)
311
312		for {
313			// Check if we have been cancelled
314			select {
315			case <-cancel:
316				return
317			default:
318			}
319
320			// Decode the next frame
321			var frame StreamFrame
322			if err := dec.Decode(&frame); err != nil {
323				if err == io.EOF || err == io.ErrClosedPipe {
324					close(frames)
325				} else {
326					errCh <- err
327				}
328				return
329			}
330
331			// Discard heartbeat frames
332			if frame.IsHeartbeat() {
333				continue
334			}
335
336			frames <- &frame
337		}
338	}()
339
340	return frames, errCh
341}
342
343// FrameReader is used to convert a stream of frames into a read closer.
344type FrameReader struct {
345	frames   <-chan *StreamFrame
346	errCh    <-chan error
347	cancelCh chan struct{}
348
349	closedLock sync.Mutex
350	closed     bool
351
352	unblockTime time.Duration
353
354	frame       *StreamFrame
355	frameOffset int
356
357	byteOffset int
358}
359
360// NewFrameReader takes a channel of frames and returns a FrameReader which
361// implements io.ReadCloser
362func NewFrameReader(frames <-chan *StreamFrame, errCh <-chan error, cancelCh chan struct{}) *FrameReader {
363	return &FrameReader{
364		frames:   frames,
365		errCh:    errCh,
366		cancelCh: cancelCh,
367	}
368}
369
370// SetUnblockTime sets the time to unblock and return zero bytes read. If the
371// duration is unset or is zero or less, the read will block until data is read.
372func (f *FrameReader) SetUnblockTime(d time.Duration) {
373	f.unblockTime = d
374}
375
376// Offset returns the offset into the stream.
377func (f *FrameReader) Offset() int {
378	return f.byteOffset
379}
380
381// Read reads the data of the incoming frames into the bytes buffer. Returns EOF
382// when there are no more frames.
383func (f *FrameReader) Read(p []byte) (n int, err error) {
384	f.closedLock.Lock()
385	closed := f.closed
386	f.closedLock.Unlock()
387	if closed {
388		return 0, io.EOF
389	}
390
391	if f.frame == nil {
392		var unblock <-chan time.Time
393		if f.unblockTime.Nanoseconds() > 0 {
394			unblock = time.After(f.unblockTime)
395		}
396
397		select {
398		case frame, ok := <-f.frames:
399			if !ok {
400				return 0, io.EOF
401			}
402			f.frame = frame
403
404			// Store the total offset into the file
405			f.byteOffset = int(f.frame.Offset)
406		case <-unblock:
407			return 0, nil
408		case err := <-f.errCh:
409			return 0, err
410		case <-f.cancelCh:
411			return 0, io.EOF
412		}
413	}
414
415	// Copy the data out of the frame and update our offset
416	n = copy(p, f.frame.Data[f.frameOffset:])
417	f.frameOffset += n
418
419	// Clear the frame and its offset once we have read everything
420	if len(f.frame.Data) == f.frameOffset {
421		f.frame = nil
422		f.frameOffset = 0
423	}
424
425	return n, nil
426}
427
428// Close cancels the stream of frames
429func (f *FrameReader) Close() error {
430	f.closedLock.Lock()
431	defer f.closedLock.Unlock()
432	if f.closed {
433		return nil
434	}
435
436	close(f.cancelCh)
437	f.closed = true
438	return nil
439}
440