1// Copyright 2016 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package storage
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"hash/crc32"
22	"io"
23	"io/ioutil"
24	"net/http"
25	"net/url"
26	"reflect"
27	"strconv"
28	"strings"
29	"time"
30
31	"cloud.google.com/go/internal/trace"
32	"google.golang.org/api/googleapi"
33)
34
// crc32cTable is the CRC-32 lookup table for the Castagnoli polynomial.
// Reader.Read uses it to keep a running checksum of the bytes it returns.
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
36
// ReaderObjectAttrs are attributes about the object being read. These are populated
// from the response headers during the NewReader or NewRangeReader call that
// creates the Reader. This struct only holds a subset of object attributes: to
// get the full set of attributes, use ObjectHandle.Attrs.
//
// Each field is read-only.
type ReaderObjectAttrs struct {
	// Size is the length of the object's content. For a range request this
	// is the total size of the object, not the length of the requested range.
	Size int64

	// StartOffset is the byte offset within the object
	// from which reading begins.
	// This value is only non-zero for range requests.
	StartOffset int64

	// ContentType is the MIME type of the object's content, as reported by
	// the Content-Type response header.
	ContentType string

	// ContentEncoding is the encoding of the object's content, as reported
	// by the Content-Encoding response header.
	ContentEncoding string

	// CacheControl specifies whether and for how long browser and Internet
	// caches are allowed to cache your objects.
	CacheControl string

	// LastModified is the time that the object was last modified. It is the
	// zero time if the response carried no Last-Modified header.
	LastModified time.Time

	// Generation is the generation number of the object's content.
	Generation int64

	// Metageneration is the version of the metadata for this object at
	// this generation. This field is used for preconditions and for
	// detecting changes in metadata. A metageneration number is only
	// meaningful in the context of a particular generation of a
	// particular object.
	Metageneration int64
}
74
// NewReader creates a new Reader to read the contents of the
// object from its first byte to its end. It is equivalent to calling
// NewRangeReader with an offset of 0 and a negative length.
// ErrObjectNotExist will be returned if the object is not found.
//
// The caller must call Close on the returned Reader when done reading.
func (o *ObjectHandle) NewReader(ctx context.Context) (*Reader, error) {
	return o.NewRangeReader(ctx, 0, -1)
}
83
84// NewRangeReader reads part of an object, reading at most length bytes
85// starting at the given offset. If length is negative, the object is read
86// until the end. If offset is negative, the object is read abs(offset) bytes
87// from the end, and length must also be negative to indicate all remaining
88// bytes will be read.
89func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) (r *Reader, err error) {
90	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader")
91	defer func() { trace.EndSpan(ctx, err) }()
92
93	if err := o.validate(); err != nil {
94		return nil, err
95	}
96	if offset < 0 && length >= 0 {
97		return nil, fmt.Errorf("storage: invalid offset %d < 0 requires negative length", offset)
98	}
99	if o.conds != nil {
100		if err := o.conds.validate("NewRangeReader"); err != nil {
101			return nil, err
102		}
103	}
104	u := &url.URL{
105		Scheme: o.c.scheme,
106		Host:   o.c.readHost,
107		Path:   fmt.Sprintf("/%s/%s", o.bucket, o.object),
108	}
109	verb := "GET"
110	if length == 0 {
111		verb = "HEAD"
112	}
113	req, err := http.NewRequest(verb, u.String(), nil)
114	if err != nil {
115		return nil, err
116	}
117	req = req.WithContext(ctx)
118	if o.userProject != "" {
119		req.Header.Set("X-Goog-User-Project", o.userProject)
120	}
121	if o.readCompressed {
122		req.Header.Set("Accept-Encoding", "gzip")
123	}
124	if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil {
125		return nil, err
126	}
127
128	gen := o.gen
129
130	// Define a function that initiates a Read with offset and length, assuming we
131	// have already read seen bytes.
132	reopen := func(seen int64) (*http.Response, error) {
133		start := offset + seen
134		if length < 0 && start < 0 {
135			req.Header.Set("Range", fmt.Sprintf("bytes=%d", start))
136		} else if length < 0 && start > 0 {
137			req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start))
138		} else if length > 0 {
139			// The end character isn't affected by how many bytes we've seen.
140			req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1))
141		}
142		// We wait to assign conditions here because the generation number can change in between reopen() runs.
143		req.URL.RawQuery = conditionsQuery(gen, o.conds)
144		var res *http.Response
145		err = runWithRetry(ctx, func() error {
146			res, err = o.c.hc.Do(req)
147			if err != nil {
148				return err
149			}
150			if res.StatusCode == http.StatusNotFound {
151				res.Body.Close()
152				return ErrObjectNotExist
153			}
154			if res.StatusCode < 200 || res.StatusCode > 299 {
155				body, _ := ioutil.ReadAll(res.Body)
156				res.Body.Close()
157				return &googleapi.Error{
158					Code:   res.StatusCode,
159					Header: res.Header,
160					Body:   string(body),
161				}
162			}
163			if start > 0 && length != 0 && res.StatusCode != http.StatusPartialContent {
164				res.Body.Close()
165				return errors.New("storage: partial request not satisfied")
166			}
167			// If a generation hasn't been specified, and this is the first response we get, let's record the
168			// generation. In future requests we'll use this generation as a precondition to avoid data races.
169			if gen < 0 && res.Header.Get("X-Goog-Generation") != "" {
170				gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64)
171				if err != nil {
172					return err
173				}
174				gen = gen64
175			}
176			return nil
177		})
178		if err != nil {
179			return nil, err
180		}
181		return res, nil
182	}
183
184	res, err := reopen(0)
185	if err != nil {
186		return nil, err
187	}
188	var (
189		size        int64 // total size of object, even if a range was requested.
190		checkCRC    bool
191		crc         uint32
192		startOffset int64 // non-zero if range request.
193	)
194	if res.StatusCode == http.StatusPartialContent {
195		cr := strings.TrimSpace(res.Header.Get("Content-Range"))
196		if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") {
197			return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
198		}
199		size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64)
200		if err != nil {
201			return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
202		}
203
204		dashIndex := strings.Index(cr, "-")
205		if dashIndex >= 0 {
206			startOffset, err = strconv.ParseInt(cr[len("bytes="):dashIndex], 10, 64)
207			if err != nil {
208				return nil, fmt.Errorf("storage: invalid Content-Range %q: %v", cr, err)
209			}
210		}
211	} else {
212		size = res.ContentLength
213		// Check the CRC iff all of the following hold:
214		// - We asked for content (length != 0).
215		// - We got all the content (status != PartialContent).
216		// - The server sent a CRC header.
217		// - The Go http stack did not uncompress the file.
218		// - We were not served compressed data that was uncompressed on download.
219		// The problem with the last two cases is that the CRC will not match -- GCS
220		// computes it on the compressed contents, but we compute it on the
221		// uncompressed contents.
222		if length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
223			crc, checkCRC = parseCRC32c(res)
224		}
225	}
226
227	remain := res.ContentLength
228	body := res.Body
229	if length == 0 {
230		remain = 0
231		body.Close()
232		body = emptyBody
233	}
234	var metaGen int64
235	if res.Header.Get("X-Goog-Metageneration") != "" {
236		metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64)
237		if err != nil {
238			return nil, err
239		}
240	}
241
242	var lm time.Time
243	if res.Header.Get("Last-Modified") != "" {
244		lm, err = http.ParseTime(res.Header.Get("Last-Modified"))
245		if err != nil {
246			return nil, err
247		}
248	}
249
250	attrs := ReaderObjectAttrs{
251		Size:            size,
252		ContentType:     res.Header.Get("Content-Type"),
253		ContentEncoding: res.Header.Get("Content-Encoding"),
254		CacheControl:    res.Header.Get("Cache-Control"),
255		LastModified:    lm,
256		StartOffset:     startOffset,
257		Generation:      gen,
258		Metageneration:  metaGen,
259	}
260	return &Reader{
261		Attrs:    attrs,
262		body:     body,
263		size:     size,
264		remain:   remain,
265		wantCRC:  crc,
266		checkCRC: checkCRC,
267		reopen:   reopen,
268	}, nil
269}
270
271func uncompressedByServer(res *http.Response) bool {
272	// If the data is stored as gzip but is not encoded as gzip, then it
273	// was uncompressed by the server.
274	return res.Header.Get("X-Goog-Stored-Content-Encoding") == "gzip" &&
275		res.Header.Get("Content-Encoding") != "gzip"
276}
277
278func parseCRC32c(res *http.Response) (uint32, bool) {
279	const prefix = "crc32c="
280	for _, spec := range res.Header["X-Goog-Hash"] {
281		if strings.HasPrefix(spec, prefix) {
282			c, err := decodeUint32(spec[len(prefix):])
283			if err == nil {
284				return c, true
285			}
286		}
287	}
288	return 0, false
289}
290
// emptyBody is a ReadCloser with no content whose Close does nothing.
// NewRangeReader substitutes it for the response body when zero bytes are
// requested.
var emptyBody = ioutil.NopCloser(strings.NewReader(""))
292
// Reader reads a Cloud Storage object.
// It implements io.Reader and io.Closer.
//
// Typically, a Reader computes the CRC of the downloaded content and compares it to
// the stored CRC, returning an error from Read if there is a mismatch. This integrity check
// is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding.
//
// Attrs holds the object attributes captured from the response when the
// Reader was created. The caller must call Close when done reading.
//
// Internally, seen counts the bytes obtained from the response body so far; it
// is passed to reopen, which issues a fresh request starting after those bytes
// when a read has to be retried. remain is the number of bytes left to read,
// or -1 if unknown.
type Reader struct {
	Attrs              ReaderObjectAttrs
	body               io.ReadCloser
	seen, remain, size int64
	checkCRC           bool   // should we check the CRC?
	wantCRC            uint32 // the CRC32c value the server sent in the header
	gotCRC             uint32 // running crc
	reopen             func(seen int64) (*http.Response, error)
}
308
// Close closes the Reader by closing the underlying response body, and
// returns any error from doing so. It must be called when done reading.
func (r *Reader) Close() error {
	return r.body.Close()
}
313
314func (r *Reader) Read(p []byte) (int, error) {
315	n, err := r.readWithRetry(p)
316	if r.remain != -1 {
317		r.remain -= int64(n)
318	}
319	if r.checkCRC {
320		r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
321		// Check CRC here. It would be natural to check it in Close, but
322		// everybody defers Close on the assumption that it doesn't return
323		// anything worth looking at.
324		if err == io.EOF {
325			if r.gotCRC != r.wantCRC {
326				return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
327					r.gotCRC, r.wantCRC)
328			}
329		}
330	}
331	return n, err
332}
333
334func (r *Reader) readWithRetry(p []byte) (int, error) {
335	n := 0
336	for len(p[n:]) > 0 {
337		m, err := r.body.Read(p[n:])
338		n += m
339		r.seen += int64(m)
340		if !shouldRetryRead(err) {
341			return n, err
342		}
343		// Read failed, but we will try again. Send a ranged read request that takes
344		// into account the number of bytes we've already seen.
345		res, err := r.reopen(r.seen)
346		if err != nil {
347			// reopen already retries
348			return n, err
349		}
350		r.body.Close()
351		r.body = res.Body
352	}
353	return n, nil
354}
355
// shouldRetryRead reports whether a failed body read is worth retrying. It
// accepts only errors whose message ends in "INTERNAL_ERROR" and whose dynamic
// type name contains "http2"; a nil error is never retried.
func shouldRetryRead(err error) bool {
	if err == nil {
		return false
	}
	if !strings.HasSuffix(err.Error(), "INTERNAL_ERROR") {
		return false
	}
	// The error is matched by type name rather than by a type assertion.
	typeName := reflect.TypeOf(err).String()
	return strings.Contains(typeName, "http2")
}
362
// Size returns the size of the object in bytes, as recorded in Attrs.Size
// when the Reader was created.
// The returned value is always the same and is not affected by
// calls to Read or Close.
//
// Deprecated: use Reader.Attrs.Size.
func (r *Reader) Size() int64 {
	return r.Attrs.Size
}
371
// Remain returns the number of bytes left to read, or -1 if unknown.
// When known, the value decreases by the number of bytes each Read returns.
func (r *Reader) Remain() int64 {
	return r.remain
}
376
// ContentType returns the content type of the object, as recorded in
// Attrs.ContentType.
//
// Deprecated: use Reader.Attrs.ContentType.
func (r *Reader) ContentType() string {
	return r.Attrs.ContentType
}
383
// ContentEncoding returns the content encoding of the object, as recorded in
// Attrs.ContentEncoding.
//
// Deprecated: use Reader.Attrs.ContentEncoding.
func (r *Reader) ContentEncoding() string {
	return r.Attrs.ContentEncoding
}
390
// CacheControl returns the cache control of the object, as recorded in
// Attrs.CacheControl.
//
// Deprecated: use Reader.Attrs.CacheControl.
func (r *Reader) CacheControl() string {
	return r.Attrs.CacheControl
}
397
// LastModified returns the value of the Last-Modified header, as recorded in
// Attrs.LastModified. The returned error is always nil.
//
// Deprecated: use Reader.Attrs.LastModified.
func (r *Reader) LastModified() (time.Time, error) {
	return r.Attrs.LastModified, nil
}
404