1package api 2 3import ( 4 "encoding/json" 5 "fmt" 6 "io" 7 "net" 8 "strconv" 9 "sync" 10 "time" 11) 12 13const ( 14 // OriginStart and OriginEnd are the available parameters for the origin 15 // argument when streaming a file. They respectively offset from the start 16 // and end of a file. 17 OriginStart = "start" 18 OriginEnd = "end" 19) 20 21// AllocFileInfo holds information about a file inside the AllocDir 22type AllocFileInfo struct { 23 Name string 24 IsDir bool 25 Size int64 26 FileMode string 27 ModTime time.Time 28} 29 30// StreamFrame is used to frame data of a file when streaming 31type StreamFrame struct { 32 Offset int64 `json:",omitempty"` 33 Data []byte `json:",omitempty"` 34 File string `json:",omitempty"` 35 FileEvent string `json:",omitempty"` 36} 37 38// IsHeartbeat returns if the frame is a heartbeat frame 39func (s *StreamFrame) IsHeartbeat() bool { 40 return len(s.Data) == 0 && s.FileEvent == "" && s.File == "" && s.Offset == 0 41} 42 43// AllocFS is used to introspect an allocation directory on a Nomad client 44type AllocFS struct { 45 client *Client 46} 47 48// AllocFS returns an handle to the AllocFS endpoints 49func (c *Client) AllocFS() *AllocFS { 50 return &AllocFS{client: c} 51} 52 53// List is used to list the files at a given path of an allocation directory 54func (a *AllocFS) List(alloc *Allocation, path string, q *QueryOptions) ([]*AllocFileInfo, *QueryMeta, error) { 55 if q == nil { 56 q = &QueryOptions{} 57 } 58 if q.Params == nil { 59 q.Params = make(map[string]string) 60 } 61 q.Params["path"] = path 62 63 var resp []*AllocFileInfo 64 qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/ls/%s", alloc.ID), &resp, q) 65 if err != nil { 66 return nil, nil, err 67 } 68 69 return resp, qm, nil 70} 71 72// Stat is used to stat a file at a given path of an allocation directory 73func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocFileInfo, *QueryMeta, error) { 74 if q == nil { 75 q = &QueryOptions{} 76 } 77 if q.Params == nil { 78 q.Params = make(map[string]string) 79 } 80 81 q.Params["path"] = path 82 83 var resp AllocFileInfo 84 qm, err := a.client.query(fmt.Sprintf("/v1/client/fs/stat/%s", alloc.ID), &resp, q) 85 if err != nil { 86 return nil, nil, err 87 } 88 return &resp, qm, nil 89} 90 91// ReadAt is used to read bytes at a given offset until limit at the given path 92// in an allocation directory. If limit is <= 0, there is no limit. 93func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) { 94 nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) 95 if err != nil { 96 return nil, err 97 } 98 99 if q == nil { 100 q = &QueryOptions{} 101 } 102 if q.Params == nil { 103 q.Params = make(map[string]string) 104 } 105 106 q.Params["path"] = path 107 q.Params["offset"] = strconv.FormatInt(offset, 10) 108 q.Params["limit"] = strconv.FormatInt(limit, 10) 109 110 reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID) 111 r, err := nodeClient.rawQuery(reqPath, q) 112 if err != nil { 113 // There was a networking error when talking directly to the client. 114 if _, ok := err.(net.Error); !ok { 115 return nil, err 116 } 117 118 // Try via the server 119 r, err = a.client.rawQuery(reqPath, q) 120 if err != nil { 121 return nil, err 122 } 123 } 124 125 return r, nil 126} 127 128// Cat is used to read contents of a file at the given path in an allocation 129// directory 130func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) { 131 nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) 132 if err != nil { 133 return nil, err 134 } 135 136 if q == nil { 137 q = &QueryOptions{} 138 } 139 if q.Params == nil { 140 q.Params = make(map[string]string) 141 } 142 143 q.Params["path"] = path 144 reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID) 145 r, err := nodeClient.rawQuery(reqPath, q) 146 if err != nil { 147 // There was a networking error when talking directly to the client. 148 if _, ok := err.(net.Error); !ok { 149 return nil, err 150 } 151 152 // Try via the server 153 r, err = a.client.rawQuery(reqPath, q) 154 if err != nil { 155 return nil, err 156 } 157 } 158 159 return r, nil 160} 161 162// Stream streams the content of a file blocking on EOF. 163// The parameters are: 164// * path: path to file to stream. 165// * offset: The offset to start streaming data at. 166// * origin: Either "start" or "end" and defines from where the offset is applied. 167// * cancel: A channel that when closed, streaming will end. 168// 169// The return value is a channel that will emit StreamFrames as they are read. 170func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64, 171 cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { 172 173 errCh := make(chan error, 1) 174 nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) 175 if err != nil { 176 errCh <- err 177 return nil, errCh 178 } 179 180 if q == nil { 181 q = &QueryOptions{} 182 } 183 if q.Params == nil { 184 q.Params = make(map[string]string) 185 } 186 187 q.Params["path"] = path 188 q.Params["offset"] = strconv.FormatInt(offset, 10) 189 q.Params["origin"] = origin 190 191 reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID) 192 r, err := nodeClient.rawQuery(reqPath, q) 193 if err != nil { 194 // There was a networking error when talking directly to the client. 195 if _, ok := err.(net.Error); !ok { 196 errCh <- err 197 return nil, errCh 198 } 199 200 // Try via the server 201 r, err = a.client.rawQuery(reqPath, q) 202 if err != nil { 203 errCh <- err 204 return nil, errCh 205 } 206 } 207 208 // Create the output channel 209 frames := make(chan *StreamFrame, 10) 210 211 go func() { 212 // Close the body 213 defer r.Close() 214 215 // Create a decoder 216 dec := json.NewDecoder(r) 217 218 for { 219 // Check if we have been cancelled 220 select { 221 case <-cancel: 222 return 223 default: 224 } 225 226 // Decode the next frame 227 var frame StreamFrame 228 if err := dec.Decode(&frame); err != nil { 229 errCh <- err 230 close(frames) 231 return 232 } 233 234 // Discard heartbeat frames 235 if frame.IsHeartbeat() { 236 continue 237 } 238 239 frames <- &frame 240 } 241 }() 242 243 return frames, errCh 244} 245 246// Logs streams the content of a tasks logs blocking on EOF. 247// The parameters are: 248// * allocation: the allocation to stream from. 249// * follow: Whether the logs should be followed. 250// * task: the tasks name to stream logs for. 251// * logType: Either "stdout" or "stderr" 252// * origin: Either "start" or "end" and defines from where the offset is applied. 253// * offset: The offset to start streaming data at. 254// * cancel: A channel that when closed, streaming will end. 255// 256// The return value is a channel that will emit StreamFrames as they are read. 257// The chan will be closed when follow=false and the end of the file is 258// reached. 259// 260// Unexpected (non-EOF) errors will be sent on the error chan. 261func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin string, 262 offset int64, cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { 263 264 errCh := make(chan error, 1) 265 266 nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q) 267 if err != nil { 268 errCh <- err 269 return nil, errCh 270 } 271 272 if q == nil { 273 q = &QueryOptions{} 274 } 275 if q.Params == nil { 276 q.Params = make(map[string]string) 277 } 278 279 q.Params["follow"] = strconv.FormatBool(follow) 280 q.Params["task"] = task 281 q.Params["type"] = logType 282 q.Params["origin"] = origin 283 q.Params["offset"] = strconv.FormatInt(offset, 10) 284 285 reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID) 286 r, err := nodeClient.rawQuery(reqPath, q) 287 if err != nil { 288 // There was a networking error when talking directly to the client. 289 if _, ok := err.(net.Error); !ok { 290 errCh <- err 291 return nil, errCh 292 } 293 294 // Try via the server 295 r, err = a.client.rawQuery(reqPath, q) 296 if err != nil { 297 errCh <- err 298 return nil, errCh 299 } 300 } 301 302 // Create the output channel 303 frames := make(chan *StreamFrame, 10) 304 305 go func() { 306 // Close the body 307 defer r.Close() 308 309 // Create a decoder 310 dec := json.NewDecoder(r) 311 312 for { 313 // Check if we have been cancelled 314 select { 315 case <-cancel: 316 return 317 default: 318 } 319 320 // Decode the next frame 321 var frame StreamFrame 322 if err := dec.Decode(&frame); err != nil { 323 if err == io.EOF || err == io.ErrClosedPipe { 324 close(frames) 325 } else { 326 errCh <- err 327 } 328 return 329 } 330 331 // Discard heartbeat frames 332 if frame.IsHeartbeat() { 333 continue 334 } 335 336 frames <- &frame 337 } 338 }() 339 340 return frames, errCh 341} 342 343// FrameReader is used to convert a stream of frames into a read closer. 344type FrameReader struct { 345 frames <-chan *StreamFrame 346 errCh <-chan error 347 cancelCh chan struct{} 348 349 closedLock sync.Mutex 350 closed bool 351 352 unblockTime time.Duration 353 354 frame *StreamFrame 355 frameOffset int 356 357 byteOffset int 358} 359 360// NewFrameReader takes a channel of frames and returns a FrameReader which 361// implements io.ReadCloser 362func NewFrameReader(frames <-chan *StreamFrame, errCh <-chan error, cancelCh chan struct{}) *FrameReader { 363 return &FrameReader{ 364 frames: frames, 365 errCh: errCh, 366 cancelCh: cancelCh, 367 } 368} 369 370// SetUnblockTime sets the time to unblock and return zero bytes read. If the 371// duration is unset or is zero or less, the read will block until data is read. 372func (f *FrameReader) SetUnblockTime(d time.Duration) { 373 f.unblockTime = d 374} 375 376// Offset returns the offset into the stream. 377func (f *FrameReader) Offset() int { 378 return f.byteOffset 379} 380 381// Read reads the data of the incoming frames into the bytes buffer. Returns EOF 382// when there are no more frames. 383func (f *FrameReader) Read(p []byte) (n int, err error) { 384 f.closedLock.Lock() 385 closed := f.closed 386 f.closedLock.Unlock() 387 if closed { 388 return 0, io.EOF 389 } 390 391 if f.frame == nil { 392 var unblock <-chan time.Time 393 if f.unblockTime.Nanoseconds() > 0 { 394 unblock = time.After(f.unblockTime) 395 } 396 397 select { 398 case frame, ok := <-f.frames: 399 if !ok { 400 return 0, io.EOF 401 } 402 f.frame = frame 403 404 // Store the total offset into the file 405 f.byteOffset = int(f.frame.Offset) 406 case <-unblock: 407 return 0, nil 408 case err := <-f.errCh: 409 return 0, err 410 case <-f.cancelCh: 411 return 0, io.EOF 412 } 413 } 414 415 // Copy the data out of the frame and update our offset 416 n = copy(p, f.frame.Data[f.frameOffset:]) 417 f.frameOffset += n 418 419 // Clear the frame and its offset once we have read everything 420 if len(f.frame.Data) == f.frameOffset { 421 f.frame = nil 422 f.frameOffset = 0 423 } 424 425 return n, nil 426} 427 428// Close cancels the stream of frames 429func (f *FrameReader) Close() error { 430 f.closedLock.Lock() 431 defer f.closedLock.Unlock() 432 if f.closed { 433 return nil 434 } 435 436 close(f.cancelCh) 437 f.closed = true 438 return nil 439} 440