1/* 2Copyright (c) 2016-2017 VMware, Inc. All Rights Reserved. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package object 18 19import ( 20 "bytes" 21 "context" 22 "errors" 23 "fmt" 24 "io" 25 "net/http" 26 "os" 27 "path" 28 "sync" 29 "time" 30 31 "github.com/vmware/govmomi/vim25/soap" 32) 33 34// DatastoreFile implements io.Reader, io.Seeker and io.Closer interfaces for datastore file access. 35type DatastoreFile struct { 36 d Datastore 37 ctx context.Context 38 name string 39 40 buf io.Reader 41 body io.ReadCloser 42 length int64 43 offset struct { 44 read, seek int64 45 } 46} 47 48// Open opens the named file relative to the Datastore. 49func (d Datastore) Open(ctx context.Context, name string) (*DatastoreFile, error) { 50 return &DatastoreFile{ 51 d: d, 52 name: name, 53 length: -1, 54 ctx: ctx, 55 }, nil 56} 57 58// Read reads up to len(b) bytes from the DatastoreFile. 59func (f *DatastoreFile) Read(b []byte) (int, error) { 60 if f.offset.read != f.offset.seek { 61 // A Seek() call changed the offset, we need to issue a new GET 62 _ = f.Close() 63 64 f.offset.read = f.offset.seek 65 } else if f.buf != nil { 66 // f.buf + f behaves like an io.MultiReader 67 n, err := f.buf.Read(b) 68 if err == io.EOF { 69 f.buf = nil // buffer has been drained 70 } 71 if n > 0 { 72 return n, nil 73 } 74 } 75 76 body, err := f.get() 77 if err != nil { 78 return 0, err 79 } 80 81 n, err := body.Read(b) 82 83 f.offset.read += int64(n) 84 f.offset.seek += int64(n) 85 86 return n, err 87} 88 89// Close closes the DatastoreFile. 90func (f *DatastoreFile) Close() error { 91 var err error 92 93 if f.body != nil { 94 err = f.body.Close() 95 f.body = nil 96 } 97 98 f.buf = nil 99 100 return err 101} 102 103// Seek sets the offset for the next Read on the DatastoreFile. 104func (f *DatastoreFile) Seek(offset int64, whence int) (int64, error) { 105 switch whence { 106 case io.SeekStart: 107 case io.SeekCurrent: 108 offset += f.offset.seek 109 case io.SeekEnd: 110 if f.length < 0 { 111 _, err := f.Stat() 112 if err != nil { 113 return 0, err 114 } 115 } 116 offset += f.length 117 default: 118 return 0, errors.New("Seek: invalid whence") 119 } 120 121 // allow negative SeekStart for initial Range request 122 if offset < 0 { 123 return 0, errors.New("Seek: invalid offset") 124 } 125 126 f.offset.seek = offset 127 128 return offset, nil 129} 130 131type fileStat struct { 132 file *DatastoreFile 133 header http.Header 134} 135 136func (s *fileStat) Name() string { 137 return path.Base(s.file.name) 138} 139 140func (s *fileStat) Size() int64 { 141 return s.file.length 142} 143 144func (s *fileStat) Mode() os.FileMode { 145 return 0 146} 147 148func (s *fileStat) ModTime() time.Time { 149 return time.Now() // no Last-Modified 150} 151 152func (s *fileStat) IsDir() bool { 153 return false 154} 155 156func (s *fileStat) Sys() interface{} { 157 return s.header 158} 159 160func statusError(res *http.Response) error { 161 if res.StatusCode == http.StatusNotFound { 162 return os.ErrNotExist 163 } 164 return errors.New(res.Status) 165} 166 167// Stat returns the os.FileInfo interface describing file. 168func (f *DatastoreFile) Stat() (os.FileInfo, error) { 169 // TODO: consider using Datastore.Stat() instead 170 u, p, err := f.d.downloadTicket(f.ctx, f.name, &soap.Download{Method: "HEAD"}) 171 if err != nil { 172 return nil, err 173 } 174 175 res, err := f.d.Client().DownloadRequest(f.ctx, u, p) 176 if err != nil { 177 return nil, err 178 } 179 180 if res.StatusCode != http.StatusOK { 181 return nil, statusError(res) 182 } 183 184 f.length = res.ContentLength 185 186 return &fileStat{f, res.Header}, nil 187} 188 189func (f *DatastoreFile) get() (io.Reader, error) { 190 if f.body != nil { 191 return f.body, nil 192 } 193 194 u, p, err := f.d.downloadTicket(f.ctx, f.name, nil) 195 if err != nil { 196 return nil, err 197 } 198 199 if f.offset.read != 0 { 200 p.Headers = map[string]string{ 201 "Range": fmt.Sprintf("bytes=%d-", f.offset.read), 202 } 203 } 204 205 res, err := f.d.Client().DownloadRequest(f.ctx, u, p) 206 if err != nil { 207 return nil, err 208 } 209 210 switch res.StatusCode { 211 case http.StatusOK: 212 f.length = res.ContentLength 213 case http.StatusPartialContent: 214 var start, end int 215 cr := res.Header.Get("Content-Range") 216 _, err = fmt.Sscanf(cr, "bytes %d-%d/%d", &start, &end, &f.length) 217 if err != nil { 218 f.length = -1 219 } 220 case http.StatusRequestedRangeNotSatisfiable: 221 // ok: Read() will return io.EOF 222 default: 223 return nil, statusError(res) 224 } 225 226 if f.length < 0 { 227 _ = res.Body.Close() 228 return nil, errors.New("unable to determine file size") 229 } 230 231 f.body = res.Body 232 233 return f.body, nil 234} 235 236func lastIndexLines(s []byte, line *int, include func(l int, m string) bool) (int64, bool) { 237 i := len(s) - 1 238 done := false 239 240 for i > 0 { 241 o := bytes.LastIndexByte(s[:i], '\n') 242 if o < 0 { 243 break 244 } 245 246 msg := string(s[o+1 : i+1]) 247 if !include(*line, msg) { 248 done = true 249 break 250 } else { 251 i = o 252 *line++ 253 } 254 } 255 256 return int64(i), done 257} 258 259// Tail seeks to the position of the last N lines of the file. 260func (f *DatastoreFile) Tail(n int) error { 261 return f.TailFunc(n, func(line int, _ string) bool { return n > line }) 262} 263 264// TailFunc will seek backwards in the datastore file until it hits a line that does 265// not satisfy the supplied `include` function. 266func (f *DatastoreFile) TailFunc(lines int, include func(line int, message string) bool) error { 267 // Read the file in reverse using bsize chunks 268 const bsize = int64(1024 * 16) 269 270 fsize, err := f.Seek(0, io.SeekEnd) 271 if err != nil { 272 return err 273 } 274 275 if lines == 0 { 276 return nil 277 } 278 279 chunk := int64(-1) 280 281 buf := bytes.NewBuffer(make([]byte, 0, bsize)) 282 line := 0 283 284 for { 285 var eof bool 286 var pos int64 287 288 nread := bsize 289 290 offset := chunk * bsize 291 remain := fsize + offset 292 293 if remain < 0 { 294 if pos, err = f.Seek(0, io.SeekStart); err != nil { 295 return err 296 } 297 298 nread = bsize + remain 299 eof = true 300 } else { 301 if pos, err = f.Seek(offset, io.SeekEnd); err != nil { 302 return err 303 } 304 } 305 306 if _, err = io.CopyN(buf, f, nread); err != nil { 307 if err != io.EOF { 308 return err 309 } 310 } 311 312 b := buf.Bytes() 313 idx, done := lastIndexLines(b, &line, include) 314 315 if done { 316 if chunk == -1 { 317 // We found all N lines in the last chunk of the file. 318 // The seek offset is also now at the current end of file. 319 // Save this buffer to avoid another GET request when Read() is called. 320 buf.Next(int(idx + 1)) 321 f.buf = buf 322 return nil 323 } 324 325 if _, err = f.Seek(pos+idx+1, io.SeekStart); err != nil { 326 return err 327 } 328 329 break 330 } 331 332 if eof { 333 if remain < 0 { 334 // We found < N lines in the entire file, so seek to the start. 335 _, _ = f.Seek(0, io.SeekStart) 336 } 337 break 338 } 339 340 chunk-- 341 buf.Reset() 342 } 343 344 return nil 345} 346 347type followDatastoreFile struct { 348 r *DatastoreFile 349 c chan struct{} 350 i time.Duration 351 o sync.Once 352} 353 354// Read reads up to len(b) bytes from the DatastoreFile being followed. 355// This method will block until data is read, an error other than io.EOF is returned or Close() is called. 356func (f *followDatastoreFile) Read(p []byte) (int, error) { 357 offset := f.r.offset.seek 358 stop := false 359 360 for { 361 n, err := f.r.Read(p) 362 if err != nil && err == io.EOF { 363 _ = f.r.Close() // GET request body has been drained. 364 if stop { 365 return n, err 366 } 367 err = nil 368 } 369 370 if n > 0 { 371 return n, err 372 } 373 374 select { 375 case <-f.c: 376 // Wake up and stop polling once the body has been drained 377 stop = true 378 case <-time.After(f.i): 379 } 380 381 info, serr := f.r.Stat() 382 if serr != nil { 383 // Return EOF rather than 404 if the file goes away 384 if serr == os.ErrNotExist { 385 _ = f.r.Close() 386 return 0, io.EOF 387 } 388 return 0, serr 389 } 390 391 if info.Size() < offset { 392 // assume file has be truncated 393 offset, err = f.r.Seek(0, io.SeekStart) 394 if err != nil { 395 return 0, err 396 } 397 } 398 } 399} 400 401// Close will stop Follow polling and close the underlying DatastoreFile. 402func (f *followDatastoreFile) Close() error { 403 f.o.Do(func() { close(f.c) }) 404 return nil 405} 406 407// Follow returns an io.ReadCloser to stream the file contents as data is appended. 408func (f *DatastoreFile) Follow(interval time.Duration) io.ReadCloser { 409 return &followDatastoreFile{ 410 r: f, 411 c: make(chan struct{}), 412 i: interval, 413 } 414} 415