1package api 2 3import ( 4 "encoding/json" 5 "fmt" 6 "io" 7 "net" 8 "strconv" 9 "sync" 10 "time" 11) 12 13const ( 14 // OriginStart and OriginEnd are the available parameters for the origin 15 // argument when streaming a file. They respectively offset from the start 16 // and end of a file. 17 OriginStart = "start" 18 OriginEnd = "end" 19) 20 21// AllocFileInfo holds information about a file inside the AllocDir 22type AllocFileInfo struct { 23 Name string 24 IsDir bool 25 Size int64 26 FileMode string 27 ModTime time.Time 28 ContentType string 29} 30 31// StreamFrame is used to frame data of a file when streaming 32type StreamFrame struct { 33 Offset int64 `json:",omitempty"` 34 Data []byte `json:",omitempty"` 35 File string `json:",omitempty"` 36 FileEvent string `json:",omitempty"` 37} 38 39// IsHeartbeat returns if the frame is a heartbeat frame 40func (s *StreamFrame) IsHeartbeat() bool { 41 return len(s.Data) == 0 && s.FileEvent == "" && s.File == "" && s.Offset == 0 42} 43 44// AllocFS is used to introspect an allocation directory on a Nomad client 45type AllocFS struct { 46 client *Client 47} 48 49// AllocFS returns an handle to the AllocFS endpoints 50func (c *Client) AllocFS() *AllocFS { 51 return &AllocFS{client: c} 52} 53 54// List is used to list the files at a given path of an allocation directory 55func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*AllocFileInfo, *QueryMeta, error) { 56 if q == nil { 57 q = &QueryOptions{} 58 } 59 if q.Params == nil { 60 q.Params = make(map[string]string) 61 } 62 q.Params["path"] = path 63 64 var resp []*AllocFileInfo 65 qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q) 66 if err != nil { 67 return nil, nil, err 68 } 69 70 return resp, qm, nil 71} 72 73// Stat is used to stat a file at a given path of an allocation directory 74func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocFileInfo, *QueryMeta, error) { 75 if q == nil { 76 q = &QueryOptions{} 77 } 78 if q.Params == nil { 79 q.Params = make(map[string]string) 80 } 81 82 q.Params["path"] = path 83 84 var resp AllocFileInfo 85 qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q) 86 if err != nil { 87 return nil, nil, err 88 } 89 return &resp, qm, nil 90} 91 92// ReadAt is used to read bytes at a given offset until limit at the given path 93// in an allocation directory. If limit is <= 0, there is no limit. 94func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) { 95 reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID) 96 97 return queryClientNode(a.client, alloc, reqPath, q, 98 func(q *QueryOptions) { 99 q.Params["path"] = path 100 q.Params["offset"] = strconv.FormatInt(offset, 10) 101 q.Params["limit"] = strconv.FormatInt(limit, 10) 102 }) 103} 104 105// Cat is used to read contents of a file at the given path in an allocation 106// directory 107func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) { 108 reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID) 109 return queryClientNode(a.client, alloc, reqPath, q, 110 func(q *QueryOptions) { 111 q.Params["path"] = path 112 }) 113} 114 115// Stream streams the content of a file blocking on EOF. 116// The parameters are: 117// * path: path to file to stream. 118// * offset: The offset to start streaming data at. 119// * origin: Either "start" or "end" and defines from where the offset is applied. 120// * cancel: A channel that when closed, streaming will end. 121// 122// The return value is a channel that will emit StreamFrames as they are read. 123func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, 124 cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { 125 126 errCh := make(chan error, 1) 127 128 reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID) 129 r, err := queryClientNode(a.client, alloc, reqPath, q, 130 func(q *QueryOptions) { 131 q.Params["path"] = path 132 q.Params["offset"] = strconv.FormatInt(offset, 10) 133 q.Params["origin"] = origin 134 }) 135 if err != nil { 136 errCh <- err 137 return nil, errCh 138 } 139 140 // Create the output channel 141 frames := make(chan *StreamFrame, 10) 142 143 go func() { 144 // Close the body 145 defer r.Close() 146 147 // Create a decoder 148 dec := json.NewDecoder(r) 149 150 for { 151 // Check if we have been cancelled 152 select { 153 case <-cancel: 154 return 155 default: 156 } 157 158 // Decode the next frame 159 var frame StreamFrame 160 if err := dec.Decode(&frame); err != nil { 161 errCh <- err 162 close(frames) 163 return 164 } 165 166 // Discard heartbeat frames 167 if frame.IsHeartbeat() { 168 continue 169 } 170 171 frames <- &frame 172 } 173 }() 174 175 return frames, errCh 176} 177 178func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) { 179 nodeClient, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) 180 181 if q == nil { 182 q = &QueryOptions{} 183 } 184 if q.Params == nil { 185 q.Params = make(map[string]string) 186 } 187 if customizeQ != nil { 188 customizeQ(q) 189 } 190 191 var r io.ReadCloser 192 var err error 193 194 if nodeClient != nil { 195 r, err = nodeClient.rawQuery(reqPath, q) 196 if _, ok := err.(net.Error); err != nil && !ok { 197 // found a non networking error talking to client directly 198 return nil, err 199 } 200 201 } 202 203 // failed to query node, access through server directly 204 // or network error when talking to the client directly 205 if r == nil { 206 return c.rawQuery(reqPath, q) 207 } 208 209 return r, err 210} 211 212// Logs streams the content of a tasks logs blocking on EOF. 213// The parameters are: 214// * allocation: the allocation to stream from. 215// * follow: Whether the logs should be followed. 216// * task: the tasks name to stream logs for. 217// * logType: Either "stdout" or "stderr" 218// * origin: Either "start" or "end" and defines from where the offset is applied. 219// * offset: The offset to start streaming data at. 220// * cancel: A channel that when closed, streaming will end. 221// 222// The return value is a channel that will emit StreamFrames as they are read. 223// The chan will be closed when follow=false and the end of the file is 224// reached. 225// 226// Unexpected (non-EOF) errors will be sent on the error chan. 227func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string, 228 offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { 229 230 errCh := make(chan error, 1) 231 232 reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID) 233 r, err := queryClientNode(a.client, alloc, reqPath, q, 234 func(q *QueryOptions) { 235 q.Params["follow"] = strconv.FormatBool(follow) 236 q.Params["task"] = task 237 q.Params["type"] = logType 238 q.Params["origin"] = origin 239 q.Params["offset"] = strconv.FormatInt(offset, 10) 240 }) 241 if err != nil { 242 errCh <- err 243 return nil, errCh 244 } 245 246 // Create the output channel 247 frames := make(chan *StreamFrame, 10) 248 249 go func() { 250 // Close the body 251 defer r.Close() 252 253 // Create a decoder 254 dec := json.NewDecoder(r) 255 256 for { 257 // Check if we have been cancelled 258 select { 259 case <-cancel: 260 return 261 default: 262 } 263 264 // Decode the next frame 265 var frame StreamFrame 266 if err := dec.Decode(&frame); err != nil { 267 if err == io.EOF || err == io.ErrClosedPipe { 268 close(frames) 269 } else { 270 errCh <- err 271 } 272 return 273 } 274 275 // Discard heartbeat frames 276 if frame.IsHeartbeat() { 277 continue 278 } 279 280 frames <- &frame 281 } 282 }() 283 284 return frames, errCh 285} 286 287// FrameReader is used to convert a stream of frames into a read closer. 288type FrameReader struct { 289 frames <-chan *StreamFrame 290 errCh <-chan error 291 cancelCh chan struct{} 292 293 closedLock sync.Mutex 294 closed bool 295 296 unblockTime time.Duration 297 298 frame *StreamFrame 299 frameOffset int 300 301 byteOffset int 302} 303 304// NewFrameReader takes a channel of frames and returns a FrameReader which 305// implements io.ReadCloser 306func NewFrameReader(frames <-chan *StreamFrame, errCh <-chan error, cancelCh chan struct{}) *FrameReader { 307 return &FrameReader{ 308 frames: frames, 309 errCh: errCh, 310 cancelCh: cancelCh, 311 } 312} 313 314// SetUnblockTime sets the time to unblock and return zero bytes read. If the 315// duration is unset or is zero or less, the read will block until data is read. 316func (f *FrameReader) SetUnblockTime(d time.Duration) { 317 f.unblockTime = d 318} 319 320// Offset returns the offset into the stream. 321func (f *FrameReader) Offset() int { 322 return f.byteOffset 323} 324 325// Read reads the data of the incoming frames into the bytes buffer. Returns EOF 326// when there are no more frames. 327func (f *FrameReader) Read(p []byte) (n int, err error) { 328 f.closedLock.Lock() 329 closed := f.closed 330 f.closedLock.Unlock() 331 if closed { 332 return 0, io.EOF 333 } 334 335 if f.frame == nil { 336 var unblock <-chan time.Time 337 if f.unblockTime.Nanoseconds() > 0 { 338 unblock = time.After(f.unblockTime) 339 } 340 341 select { 342 case frame, ok := <-f.frames: 343 if !ok { 344 return 0, io.EOF 345 } 346 f.frame = frame 347 348 // Store the total offset into the file 349 f.byteOffset = int(f.frame.Offset) 350 case <-unblock: 351 return 0, nil 352 case err := <-f.errCh: 353 return 0, err 354 case <-f.cancelCh: 355 return 0, io.EOF 356 } 357 } 358 359 // Copy the data out of the frame and update our offset 360 n = copy(p, f.frame.Data[f.frameOffset:]) 361 f.frameOffset += n 362 363 // Clear the frame and its offset once we have read everything 364 if len(f.frame.Data) == f.frameOffset { 365 f.frame = nil 366 f.frameOffset = 0 367 } 368 369 return n, nil 370} 371 372// Close cancels the stream of frames 373func (f *FrameReader) Close() error { 374 f.closedLock.Lock() 375 defer f.closedLock.Unlock() 376 if f.closed { 377 return nil 378 } 379 380 close(f.cancelCh) 381 f.closed = true 382 return nil 383} 384