1/*
2Copyright (c) 2016-2017 VMware, Inc. All Rights Reserved.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package object
18
19import (
20	"bytes"
21	"context"
22	"errors"
23	"fmt"
24	"io"
25	"net/http"
26	"os"
27	"path"
28	"sync"
29	"time"
30
31	"github.com/vmware/govmomi/vim25/soap"
32)
33
34// DatastoreFile implements io.Reader, io.Seeker and io.Closer interfaces for datastore file access.
35type DatastoreFile struct {
36	d    Datastore
37	ctx  context.Context
38	name string
39
40	buf    io.Reader
41	body   io.ReadCloser
42	length int64
43	offset struct {
44		read, seek int64
45	}
46}
47
48// Open opens the named file relative to the Datastore.
49func (d Datastore) Open(ctx context.Context, name string) (*DatastoreFile, error) {
50	return &DatastoreFile{
51		d:      d,
52		name:   name,
53		length: -1,
54		ctx:    ctx,
55	}, nil
56}
57
58// Read reads up to len(b) bytes from the DatastoreFile.
59func (f *DatastoreFile) Read(b []byte) (int, error) {
60	if f.offset.read != f.offset.seek {
61		// A Seek() call changed the offset, we need to issue a new GET
62		_ = f.Close()
63
64		f.offset.read = f.offset.seek
65	} else if f.buf != nil {
66		// f.buf + f behaves like an io.MultiReader
67		n, err := f.buf.Read(b)
68		if err == io.EOF {
69			f.buf = nil // buffer has been drained
70		}
71		if n > 0 {
72			return n, nil
73		}
74	}
75
76	body, err := f.get()
77	if err != nil {
78		return 0, err
79	}
80
81	n, err := body.Read(b)
82
83	f.offset.read += int64(n)
84	f.offset.seek += int64(n)
85
86	return n, err
87}
88
89// Close closes the DatastoreFile.
90func (f *DatastoreFile) Close() error {
91	var err error
92
93	if f.body != nil {
94		err = f.body.Close()
95		f.body = nil
96	}
97
98	f.buf = nil
99
100	return err
101}
102
103// Seek sets the offset for the next Read on the DatastoreFile.
104func (f *DatastoreFile) Seek(offset int64, whence int) (int64, error) {
105	switch whence {
106	case io.SeekStart:
107	case io.SeekCurrent:
108		offset += f.offset.seek
109	case io.SeekEnd:
110		if f.length < 0 {
111			_, err := f.Stat()
112			if err != nil {
113				return 0, err
114			}
115		}
116		offset += f.length
117	default:
118		return 0, errors.New("Seek: invalid whence")
119	}
120
121	// allow negative SeekStart for initial Range request
122	if offset < 0 {
123		return 0, errors.New("Seek: invalid offset")
124	}
125
126	f.offset.seek = offset
127
128	return offset, nil
129}
130
131type fileStat struct {
132	file   *DatastoreFile
133	header http.Header
134}
135
136func (s *fileStat) Name() string {
137	return path.Base(s.file.name)
138}
139
140func (s *fileStat) Size() int64 {
141	return s.file.length
142}
143
144func (s *fileStat) Mode() os.FileMode {
145	return 0
146}
147
148func (s *fileStat) ModTime() time.Time {
149	return time.Now() // no Last-Modified
150}
151
152func (s *fileStat) IsDir() bool {
153	return false
154}
155
156func (s *fileStat) Sys() interface{} {
157	return s.header
158}
159
// statusError turns an unexpected HTTP response into an error. A 404 maps to
// os.ErrNotExist so callers can compare against it; any other status yields
// an error whose message is the response's status line.
func statusError(res *http.Response) error {
	switch res.StatusCode {
	case http.StatusNotFound:
		return os.ErrNotExist
	default:
		return errors.New(res.Status)
	}
}
166
167// Stat returns the os.FileInfo interface describing file.
168func (f *DatastoreFile) Stat() (os.FileInfo, error) {
169	// TODO: consider using Datastore.Stat() instead
170	u, p, err := f.d.downloadTicket(f.ctx, f.name, &soap.Download{Method: "HEAD"})
171	if err != nil {
172		return nil, err
173	}
174
175	res, err := f.d.Client().DownloadRequest(f.ctx, u, p)
176	if err != nil {
177		return nil, err
178	}
179
180	if res.StatusCode != http.StatusOK {
181		return nil, statusError(res)
182	}
183
184	f.length = res.ContentLength
185
186	return &fileStat{f, res.Header}, nil
187}
188
189func (f *DatastoreFile) get() (io.Reader, error) {
190	if f.body != nil {
191		return f.body, nil
192	}
193
194	u, p, err := f.d.downloadTicket(f.ctx, f.name, nil)
195	if err != nil {
196		return nil, err
197	}
198
199	if f.offset.read != 0 {
200		p.Headers = map[string]string{
201			"Range": fmt.Sprintf("bytes=%d-", f.offset.read),
202		}
203	}
204
205	res, err := f.d.Client().DownloadRequest(f.ctx, u, p)
206	if err != nil {
207		return nil, err
208	}
209
210	switch res.StatusCode {
211	case http.StatusOK:
212		f.length = res.ContentLength
213	case http.StatusPartialContent:
214		var start, end int
215		cr := res.Header.Get("Content-Range")
216		_, err = fmt.Sscanf(cr, "bytes %d-%d/%d", &start, &end, &f.length)
217		if err != nil {
218			f.length = -1
219		}
220	case http.StatusRequestedRangeNotSatisfiable:
221		// ok: Read() will return io.EOF
222	default:
223		return nil, statusError(res)
224	}
225
226	if f.length < 0 {
227		_ = res.Body.Close()
228		return nil, errors.New("unable to determine file size")
229	}
230
231	f.body = res.Body
232
233	return f.body, nil
234}
235
// lastIndexLines walks s backwards one line at a time, offering each line to
// include together with the running line counter *line.
//
// A line is the text following the previous '\n' up to and including the
// current end position, so it normally carries its trailing newline. For
// every line that include accepts, *line is incremented and the end position
// moves back to the newline preceding that line. The walk stops when include
// rejects a line (reported as true) or when no earlier newline exists in s
// (reported as false). Text before the first newline in s is never offered
// to include.
//
// The returned index is the final end position: len(s)-1 when no line was
// accepted (so -1 for empty input), otherwise the index of the newline just
// before the earliest accepted line.
func lastIndexLines(s []byte, line *int, include func(l int, m string) bool) (int64, bool) {
	end := len(s) - 1

	for end > 0 {
		nl := bytes.LastIndexByte(s[:end], '\n')
		if nl < 0 {
			// No earlier newline: nothing more to offer from this slice.
			return int64(end), false
		}

		if text := string(s[nl+1 : end+1]); !include(*line, text) {
			return int64(end), true
		}

		end = nl
		*line++
	}

	return int64(end), false
}
258
259// Tail seeks to the position of the last N lines of the file.
260func (f *DatastoreFile) Tail(n int) error {
261	return f.TailFunc(n, func(line int, _ string) bool { return n > line })
262}
263
264// TailFunc will seek backwards in the datastore file until it hits a line that does
265// not satisfy the supplied `include` function.
266func (f *DatastoreFile) TailFunc(lines int, include func(line int, message string) bool) error {
267	// Read the file in reverse using bsize chunks
268	const bsize = int64(1024 * 16)
269
270	fsize, err := f.Seek(0, io.SeekEnd)
271	if err != nil {
272		return err
273	}
274
275	if lines == 0 {
276		return nil
277	}
278
279	chunk := int64(-1)
280
281	buf := bytes.NewBuffer(make([]byte, 0, bsize))
282	line := 0
283
284	for {
285		var eof bool
286		var pos int64
287
288		nread := bsize
289
290		offset := chunk * bsize
291		remain := fsize + offset
292
293		if remain < 0 {
294			if pos, err = f.Seek(0, io.SeekStart); err != nil {
295				return err
296			}
297
298			nread = bsize + remain
299			eof = true
300		} else {
301			if pos, err = f.Seek(offset, io.SeekEnd); err != nil {
302				return err
303			}
304		}
305
306		if _, err = io.CopyN(buf, f, nread); err != nil {
307			if err != io.EOF {
308				return err
309			}
310		}
311
312		b := buf.Bytes()
313		idx, done := lastIndexLines(b, &line, include)
314
315		if done {
316			if chunk == -1 {
317				// We found all N lines in the last chunk of the file.
318				// The seek offset is also now at the current end of file.
319				// Save this buffer to avoid another GET request when Read() is called.
320				buf.Next(int(idx + 1))
321				f.buf = buf
322				return nil
323			}
324
325			if _, err = f.Seek(pos+idx+1, io.SeekStart); err != nil {
326				return err
327			}
328
329			break
330		}
331
332		if eof {
333			if remain < 0 {
334				// We found < N lines in the entire file, so seek to the start.
335				_, _ = f.Seek(0, io.SeekStart)
336			}
337			break
338		}
339
340		chunk--
341		buf.Reset()
342	}
343
344	return nil
345}
346
347type followDatastoreFile struct {
348	r *DatastoreFile
349	c chan struct{}
350	i time.Duration
351	o sync.Once
352}
353
354// Read reads up to len(b) bytes from the DatastoreFile being followed.
355// This method will block until data is read, an error other than io.EOF is returned or Close() is called.
356func (f *followDatastoreFile) Read(p []byte) (int, error) {
357	offset := f.r.offset.seek
358	stop := false
359
360	for {
361		n, err := f.r.Read(p)
362		if err != nil && err == io.EOF {
363			_ = f.r.Close() // GET request body has been drained.
364			if stop {
365				return n, err
366			}
367			err = nil
368		}
369
370		if n > 0 {
371			return n, err
372		}
373
374		select {
375		case <-f.c:
376			// Wake up and stop polling once the body has been drained
377			stop = true
378		case <-time.After(f.i):
379		}
380
381		info, serr := f.r.Stat()
382		if serr != nil {
383			// Return EOF rather than 404 if the file goes away
384			if serr == os.ErrNotExist {
385				_ = f.r.Close()
386				return 0, io.EOF
387			}
388			return 0, serr
389		}
390
391		if info.Size() < offset {
392			// assume file has be truncated
393			offset, err = f.r.Seek(0, io.SeekStart)
394			if err != nil {
395				return 0, err
396			}
397		}
398	}
399}
400
401// Close will stop Follow polling and close the underlying DatastoreFile.
402func (f *followDatastoreFile) Close() error {
403	f.o.Do(func() { close(f.c) })
404	return nil
405}
406
407// Follow returns an io.ReadCloser to stream the file contents as data is appended.
408func (f *DatastoreFile) Follow(interval time.Duration) io.ReadCloser {
409	return &followDatastoreFile{
410		r: f,
411		c: make(chan struct{}),
412		i: interval,
413	}
414}
415