// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
	"context"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"cloud.google.com/go/internal/trace"
	"google.golang.org/api/googleapi"
	storagepb "google.golang.org/genproto/googleapis/storage/v2"
)

var crc32cTable = crc32.MakeTable(crc32.Castagnoli)

// ReaderObjectAttrs are attributes about the object being read. These are populated
// when the Reader is created via NewReader or NewRangeReader. This struct only holds
// a subset of object attributes: to get the full set of attributes, use
// ObjectHandle.Attrs.
//
// Each field is read-only.
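//
// A minimal sketch of reading these attributes (assuming obj is an existing
// *ObjectHandle and ctx a valid context):
//
//	r, err := obj.NewReader(ctx)
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer r.Close()
//	fmt.Println(r.Attrs.Size, r.Attrs.ContentType, r.Attrs.Generation)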
type ReaderObjectAttrs struct {
	// Size is the length of the object's content.
	Size int64

	// StartOffset is the byte offset within the object
	// from which reading begins.
	// This value is only non-zero for range requests.
	StartOffset int64

	// ContentType is the MIME type of the object's content.
	ContentType string

	// ContentEncoding is the encoding of the object's content.
	ContentEncoding string

	// CacheControl specifies whether and for how long browser and Internet
	// caches are allowed to cache your objects.
	CacheControl string

	// LastModified is the time that the object was last modified.
	LastModified time.Time

	// Generation is the generation number of the object's content.
	Generation int64

	// Metageneration is the version of the metadata for this object at
	// this generation. This field is used for preconditions and for
	// detecting changes in metadata. A metageneration number is only
	// meaningful in the context of a particular generation of a
	// particular object.
	Metageneration int64
}

// NewReader creates a new Reader to read the contents of the
// object.
// ErrObjectNotExist will be returned if the object is not found.
//
// The caller must call Close on the returned Reader when done reading.
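//
// A minimal usage sketch (assuming obj is an existing *ObjectHandle and ctx
// a valid context; error handling abbreviated):
//
//	rc, err := obj.NewReader(ctx)
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer rc.Close()
//	data, err := ioutil.ReadAll(rc)
//	if err != nil {
//		// TODO: handle error.
//	}
//	_ = data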
func (o *ObjectHandle) NewReader(ctx context.Context) (*Reader, error) {
	return o.NewRangeReader(ctx, 0, -1)
}

// NewRangeReader reads part of an object, reading at most length bytes
// starting at the given offset. If length is negative, the object is read
// until the end. If offset is negative, the object is read abs(offset) bytes
// from the end, and length must also be negative to indicate all remaining
// bytes will be read.
//
// If the object's metadata property "Content-Encoding" is set to "gzip" or satisfies
// decompressive transcoding per https://cloud.google.com/storage/docs/transcoding,
// the file will be served back whole, regardless of the requested range, as
// Google Cloud Storage dictates.
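//
// For example, to read only the final 4 KiB of an object, pass a negative
// offset and a negative length (a sketch; obj is assumed to be an existing
// *ObjectHandle):
//
//	rc, err := obj.NewRangeReader(ctx, -4096, -1)
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer rc.Close()
//	tail, err := ioutil.ReadAll(rc)
//	if err != nil {
//		// TODO: handle error.
//	}
//	_ = tail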
func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) (r *Reader, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader")
	defer func() { trace.EndSpan(ctx, err) }()

	if o.c.gc != nil {
		return o.newRangeReaderWithGRPC(ctx, offset, length)
	}

	if err := o.validate(); err != nil {
		return nil, err
	}
	if offset < 0 && length >= 0 {
		return nil, fmt.Errorf("storage: invalid offset %d < 0 requires negative length", offset)
	}
	if o.conds != nil {
		if err := o.conds.validate("NewRangeReader"); err != nil {
			return nil, err
		}
	}
	u := &url.URL{
		Scheme: o.c.scheme,
		Host:   o.c.readHost,
		Path:   fmt.Sprintf("/%s/%s", o.bucket, o.object),
	}
	verb := "GET"
	if length == 0 {
		verb = "HEAD"
	}
	req, err := http.NewRequest(verb, u.String(), nil)
	if err != nil {
		return nil, err
	}
	req = req.WithContext(ctx)
	if o.userProject != "" {
		req.Header.Set("X-Goog-User-Project", o.userProject)
	}
	if o.readCompressed {
		req.Header.Set("Accept-Encoding", "gzip")
	}
	if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil {
		return nil, err
	}

	gen := o.gen

	// Define a function that initiates a Read with offset and length, assuming we
	// have already read seen bytes.
	reopen := func(seen int64) (*http.Response, error) {
		// If the context has already expired, return immediately without making a
		// call.
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		start := offset + seen
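		// Depending on the requested offset/length and the bytes already seen,
		// the Range header below takes one of three forms (or is omitted
		// entirely when no range is needed):
		//   "bytes=-N"  (start < 0): a suffix range covering the last N bytes,
		//   "bytes=N-"  (start > 0, length < 0): from byte N through the end,
		//   "bytes=N-M" (length > 0): the remainder of the requested range.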
		if length < 0 && start < 0 {
			req.Header.Set("Range", fmt.Sprintf("bytes=%d", start))
		} else if length < 0 && start > 0 {
			req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start))
		} else if length > 0 {
			// The end of the range isn't affected by how many bytes we've already seen.
			req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1))
		}
		// We wait to assign conditions here because the generation number can change in between reopen() runs.
		if err := setConditionsHeaders(req.Header, o.conds); err != nil {
			return nil, err
		}
		// If an object generation is specified, include it as a query string parameter.
		if gen >= 0 {
			req.URL.RawQuery = fmt.Sprintf("generation=%d", gen)
		}

		var res *http.Response
		err = runWithRetry(ctx, func() error {
			res, err = o.c.hc.Do(req)
			if err != nil {
				return err
			}
			if res.StatusCode == http.StatusNotFound {
				res.Body.Close()
				return ErrObjectNotExist
			}
			if res.StatusCode < 200 || res.StatusCode > 299 {
				body, _ := ioutil.ReadAll(res.Body)
				res.Body.Close()
				return &googleapi.Error{
					Code:   res.StatusCode,
					Header: res.Header,
					Body:   string(body),
				}
			}

			partialContentNotSatisfied :=
				!decompressiveTranscoding(res) &&
					start > 0 && length != 0 &&
					res.StatusCode != http.StatusPartialContent

			if partialContentNotSatisfied {
				res.Body.Close()
				return errors.New("storage: partial request not satisfied")
			}

			// With "Content-Encoding": "gzip" aka decompressive transcoding, GCS serves
			// back the whole file regardless of the requested range, as per
			//      https://cloud.google.com/storage/docs/transcoding#range,
			// so we have to manually advance the body by seen bytes.
			if decompressiveTranscoding(res) && seen > 0 {
				_, _ = io.CopyN(ioutil.Discard, res.Body, seen)
			}

			// If a generation hasn't been specified, and this is the first response we get, let's record the
			// generation. In future requests we'll use this generation as a precondition to avoid data races.
			if gen < 0 && res.Header.Get("X-Goog-Generation") != "" {
				gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64)
				if err != nil {
					return err
				}
				gen = gen64
			}
			return nil
		})
		if err != nil {
			return nil, err
		}
		return res, nil
	}

	res, err := reopen(0)
	if err != nil {
		return nil, err
	}
	var (
		size        int64 // total size of object, even if a range was requested.
		checkCRC    bool
		crc         uint32
		startOffset int64 // non-zero if range request.
	)
	if res.StatusCode == http.StatusPartialContent {
		cr := strings.TrimSpace(res.Header.Get("Content-Range"))
		if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") {
			return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
		}
		// Content-Range is formatted "bytes <first byte>-<last byte>/<total size>",
		// e.g. "bytes 0-1023/4096". We take the total size.
		size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64)
		if err != nil {
			return nil, fmt.Errorf("storage: invalid Content-Range %q", cr)
		}

		dashIndex := strings.Index(cr, "-")
		if dashIndex >= 0 {
			startOffset, err = strconv.ParseInt(cr[len("bytes "):dashIndex], 10, 64)
			if err != nil {
				return nil, fmt.Errorf("storage: invalid Content-Range %q: %v", cr, err)
			}
		}
	} else {
		size = res.ContentLength
		// Check the CRC iff all of the following hold:
		// - We asked for content (length != 0).
		// - We got all the content (status != PartialContent).
		// - The server sent a CRC header.
		// - The Go http stack did not uncompress the file.
		// - We were not served compressed data that was uncompressed on download.
		// The problem with the last two cases is that the CRC will not match -- GCS
		// computes it on the compressed contents, but we compute it on the
		// uncompressed contents.
		if length != 0 && !res.Uncompressed && !uncompressedByServer(res) {
			crc, checkCRC = parseCRC32c(res)
		}
	}

	remain := res.ContentLength
	body := res.Body
	if length == 0 {
		remain = 0
		body.Close()
		body = emptyBody
	}
	var metaGen int64
	if res.Header.Get("X-Goog-Metageneration") != "" {
		metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64)
		if err != nil {
			return nil, err
		}
	}

	var lm time.Time
	if res.Header.Get("Last-Modified") != "" {
		lm, err = http.ParseTime(res.Header.Get("Last-Modified"))
		if err != nil {
			return nil, err
		}
	}

	attrs := ReaderObjectAttrs{
		Size:            size,
		ContentType:     res.Header.Get("Content-Type"),
		ContentEncoding: res.Header.Get("Content-Encoding"),
		CacheControl:    res.Header.Get("Cache-Control"),
		LastModified:    lm,
		StartOffset:     startOffset,
		Generation:      gen,
		Metageneration:  metaGen,
	}
	return &Reader{
		Attrs:    attrs,
		body:     body,
		size:     size,
		remain:   remain,
		wantCRC:  crc,
		checkCRC: checkCRC,
		reopen:   reopen,
	}, nil
}

// decompressiveTranscoding returns true if the request was served decompressed
// and differs from its original storage form. This happens when the "Content-Encoding"
// response header is "gzip" or the object's stored content encoding
// (X-Goog-Stored-Content-Encoding) is "gzip".
// See:
//  * https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip
//  * https://github.com/googleapis/google-cloud-go/issues/1800
func decompressiveTranscoding(res *http.Response) bool {
	// Decompressive Transcoding.
	return res.Header.Get("Content-Encoding") == "gzip" ||
		res.Header.Get("X-Goog-Stored-Content-Encoding") == "gzip"
}

func uncompressedByServer(res *http.Response) bool {
	// If the data is stored as gzip but is not encoded as gzip, then it
	// was uncompressed by the server.
	return res.Header.Get("X-Goog-Stored-Content-Encoding") == "gzip" &&
		res.Header.Get("Content-Encoding") != "gzip"
}

// parseCRC32c extracts the CRC32C checksum from the X-Goog-Hash response
// header, looking for a value of the form "crc32c=<encoded checksum>".
// It reports the decoded checksum and whether one was found.
func parseCRC32c(res *http.Response) (uint32, bool) {
	const prefix = "crc32c="
	for _, spec := range res.Header["X-Goog-Hash"] {
		if strings.HasPrefix(spec, prefix) {
			c, err := decodeUint32(spec[len(prefix):])
			if err == nil {
				return c, true
			}
		}
	}
	return 0, false
}

// setConditionsHeaders sets precondition request headers for downloads
// using the XML API. It assumes that the conditions have been validated.
func setConditionsHeaders(headers http.Header, conds *Conditions) error {
	if conds == nil {
		return nil
	}
	if conds.MetagenerationMatch != 0 {
		headers.Set("x-goog-if-metageneration-match", fmt.Sprint(conds.MetagenerationMatch))
	}
	switch {
	case conds.GenerationMatch != 0:
		headers.Set("x-goog-if-generation-match", fmt.Sprint(conds.GenerationMatch))
	case conds.DoesNotExist:
		headers.Set("x-goog-if-generation-match", "0")
	}
	return nil
}

var emptyBody = ioutil.NopCloser(strings.NewReader(""))

// Reader reads a Cloud Storage object.
// It implements io.Reader.
//
// Typically, a Reader computes the CRC of the downloaded content and compares it to
// the stored CRC, returning an error from Read if there is a mismatch. This integrity check
// is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding.
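//
// Because Reader implements io.Reader, it can be composed with the standard
// library. A minimal sketch of streaming an object's content (obj is assumed
// to be an existing *ObjectHandle and w an open io.Writer):
//
//	rc, err := obj.NewReader(ctx)
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer rc.Close()
//	if _, err := io.Copy(w, rc); err != nil {
//		// TODO: handle error (this includes a CRC mismatch reported at EOF).
//	}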
type Reader struct {
	Attrs              ReaderObjectAttrs
	body               io.ReadCloser
	seen, remain, size int64
	checkCRC           bool   // should we check the CRC?
	wantCRC            uint32 // the CRC32c value the server sent in the header
	gotCRC             uint32 // running crc
	reopen             func(seen int64) (*http.Response, error)

	// The following fields are only for use in the gRPC hybrid client.
	stream         storagepb.Storage_ReadObjectClient
	reopenWithGRPC func(seen int64) (*readStreamResponse, context.CancelFunc, error)
	leftovers      []byte
	cancelStream   context.CancelFunc
}

// readStreamResponse bundles a gRPC read stream with the first response
// message received when the stream was opened.
type readStreamResponse struct {
	stream   storagepb.Storage_ReadObjectClient
	response *storagepb.ReadObjectResponse
}

// Close closes the Reader. It must be called when done reading.
func (r *Reader) Close() error {
	if r.body != nil {
		return r.body.Close()
	}

	r.closeStream()
	return nil
}

// Read reads up to len(p) bytes into p from the object, using either the
// HTTP or the experimental gRPC transport depending on how the Reader was
// created. When integrity checking is enabled, the running CRC32C of the
// downloaded content is verified against the server-provided value once
// io.EOF is reached.
func (r *Reader) Read(p []byte) (int, error) {
	read := r.readWithRetry
	if r.reopenWithGRPC != nil {
		read = r.readWithGRPC
	}

	n, err := read(p)
	if r.remain != -1 {
		r.remain -= int64(n)
	}
	if r.checkCRC {
		r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
		// Check CRC here. It would be natural to check it in Close, but
		// everybody defers Close on the assumption that it doesn't return
		// anything worth looking at.
		if err == io.EOF {
			if r.gotCRC != r.wantCRC {
				return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
					r.gotCRC, r.wantCRC)
			}
		}
	}
	return n, err
}

// newRangeReaderWithGRPC creates a new Reader with the given range that uses
// gRPC to read Object content.
//
// This is an experimental API and not intended for public use.
func (o *ObjectHandle) newRangeReaderWithGRPC(ctx context.Context, offset, length int64) (r *Reader, err error) {
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.newRangeReaderWithGRPC")
	defer func() { trace.EndSpan(ctx, err) }()

	if o.c.gc == nil {
		err = fmt.Errorf("handle doesn't have a gRPC client initialized")
		return
	}
	if err = o.validate(); err != nil {
		return
	}

	// A negative length means "read to the end of the object", but the
	// read_limit field it corresponds to uses zero to mean the same thing. Thus
	// we coerce the length to 0 to read to the end of the object.
	if length < 0 {
		length = 0
	}

	// For now, there are only globally unique buckets, and "_" is the alias
	// project ID for such buckets.
	b := bucketResourceName("_", o.bucket)
	req := &storagepb.ReadObjectRequest{
		Bucket: b,
		Object: o.object,
	}
	// The default is a negative value, which means latest.
	if o.gen >= 0 {
		req.Generation = o.gen
	}

	// Define a function that initiates a Read with offset and length, assuming
	// we have already read seen bytes.
	reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) {
		// If the context has already expired, return immediately without making
		// a call.
		if err := ctx.Err(); err != nil {
			return nil, nil, err
		}

		cc, cancel := context.WithCancel(ctx)

		start := offset + seen
		// Only set a ReadLimit if length is greater than zero, because zero
		// means read it all.
		if length > 0 {
			req.ReadLimit = length - seen
		}
		req.ReadOffset = start

		if err := applyCondsProto("reopenWithGRPC", o.gen, o.conds, req); err != nil {
			cancel()
			return nil, nil, err
		}

		var stream storagepb.Storage_ReadObjectClient
		var msg *storagepb.ReadObjectResponse
		var err error

		err = runWithRetry(cc, func() error {
			stream, err = o.c.gc.ReadObject(cc, req)
			if err != nil {
				return err
			}

			msg, err = stream.Recv()

			return err
		})
		if err != nil {
			// Close the stream context we just created to ensure we don't leak
			// resources.
			cancel()
			return nil, nil, err
		}

		return &readStreamResponse{stream, msg}, cancel, nil
	}

	res, cancel, err := reopen(0)
	if err != nil {
		return nil, err
	}

	r = &Reader{
		stream:         res.stream,
		reopenWithGRPC: reopen,
		cancelStream:   cancel,
	}

	// The first message was Recv'd on stream open; use it to populate the
	// object metadata.
	msg := res.response
	obj := msg.GetMetadata()
	// This is the size of the entire object, even if only a range was requested.
	size := obj.GetSize()

	r.Attrs = ReaderObjectAttrs{
		Size:            size,
		ContentType:     obj.GetContentType(),
		ContentEncoding: obj.GetContentEncoding(),
		CacheControl:    obj.GetCacheControl(),
		LastModified:    obj.GetUpdateTime().AsTime(),
		Metageneration:  obj.GetMetageneration(),
		Generation:      obj.GetGeneration(),
	}

	r.size = size
	cr := msg.GetContentRange()
	if cr != nil {
		r.Attrs.StartOffset = cr.GetStart()
		r.remain = cr.GetEnd() - cr.GetStart() + 1
	} else {
		r.remain = size
	}

	// Only support checksums when reading an entire object, not a range.
	if msg.GetObjectChecksums().Crc32C != nil && offset == 0 && length == 0 {
		r.wantCRC = msg.GetObjectChecksums().GetCrc32C()
		r.checkCRC = true
	}

	// Store the content from the first Recv in the client buffer for reading
	// later.
	r.leftovers = msg.GetChecksummedData().GetContent()

	return r, nil
}

// readWithRetry reads from the HTTP response body into p. If a read fails
// mid-stream, it reopens the download with a ranged request that accounts for
// the bytes already seen and continues reading.
func (r *Reader) readWithRetry(p []byte) (int, error) {
	n := 0
	for len(p[n:]) > 0 {
		m, err := r.body.Read(p[n:])
		n += m
		r.seen += int64(m)
		if err == nil || err == io.EOF {
			return n, err
		}
		// Read failed (likely due to connection issues), but we will try to reopen
		// the pipe and continue. Send a ranged read request that takes into account
		// the number of bytes we've already seen.
		res, err := r.reopen(r.seen)
		if err != nil {
			// reopen already retries
			return n, err
		}
		r.body.Close()
		r.body = res.Body
	}
	return n, nil
}

// closeStream cancels a stream's context in order for it to be closed and
// collected.
//
// This is an experimental API and not intended for public use.
func (r *Reader) closeStream() {
	if r.cancelStream != nil {
		r.cancelStream()
	}
	r.stream = nil
}

// readWithGRPC reads bytes into the user's buffer from an open gRPC stream.
//
// This is an experimental API and not intended for public use.
func (r *Reader) readWithGRPC(p []byte) (int, error) {
	// No stream to read from, either never initialized or Close was called.
	// Note: There is a potential concurrency issue if multiple goroutines are
	// using the same reader: one encounters an error, causing the stream to be
	// closed and then reopened, while another goroutine attempts to read from it.
	if r.stream == nil {
		return 0, fmt.Errorf("reader has been closed")
	}

	// The entire object has been read by this reader; return EOF.
	if r.size != 0 && r.size == r.seen {
		return 0, io.EOF
	}

	var n int
	// Read leftovers and return what was available to conform to the Reader
	// interface: https://pkg.go.dev/io#Reader.
	if len(r.leftovers) > 0 {
		n = copy(p, r.leftovers)
		r.seen += int64(n)
		r.leftovers = r.leftovers[n:]
		return n, nil
	}

	// Attempt to Recv the next message on the stream.
	msg, err := r.recv()
	if err != nil {
		return 0, err
	}

	// TODO: Determine if we need to capture incremental CRC32C for this
	// chunk. The Object CRC32C checksum is captured when directed to read
	// the entire Object. If directed to read a range, we may need to
	// calculate the range's checksum for verification if the checksum is
	// present in the response here.
	// TODO: Figure out if we need to support decompressive transcoding
	// https://cloud.google.com/storage/docs/transcoding.
	content := msg.GetChecksummedData().GetContent()
	n = copy(p[n:], content)
	leftover := len(content) - n
	if leftover > 0 {
		// Wasn't able to copy all of the data in the message, store for
		// future Read calls.
		r.leftovers = content[n:]
	}
	r.seen += int64(n)

	return n, nil
}

// recv attempts to Recv the next message on the stream. In the event
// that a retryable error is encountered, the stream will be closed, reopened,
// and Recv attempted again. This repeats until one of the following is true:
//
// * Recv is successful
// * A non-retryable error is encountered
// * The Reader's context is canceled
//
// The last error received is the one that is returned, which could be from
// an attempt to reopen the stream.
//
// This is an experimental API and not intended for public use.
func (r *Reader) recv() (*storagepb.ReadObjectResponse, error) {
	msg, err := r.stream.Recv()
	if err != nil && shouldRetry(err) {
		// This will "close" the existing stream and immediately attempt to
		// reopen the stream, but will backoff if further attempts are necessary.
		// Reopening the stream Recvs the first message, so if retrying is
		// successful, the next logical chunk will be returned.
		msg, err = r.reopenStream(r.seen)
	}

	return msg, err
}

// reopenStream "closes" the existing stream, attempts to reopen a new stream, and
// sets the Reader's stream and cancelStream fields in the process.
//
// This is an experimental API and not intended for public use.
func (r *Reader) reopenStream(seen int64) (*storagepb.ReadObjectResponse, error) {
	// Close existing stream and initialize new stream with updated offset.
	r.closeStream()

	res, cancel, err := r.reopenWithGRPC(seen)
	if err != nil {
		return nil, err
	}
	r.stream = res.stream
	r.cancelStream = cancel
	return res.response, nil
}

// Size returns the size of the object in bytes.
// The returned value is always the same and is not affected by
// calls to Read or Close.
//
// Deprecated: use Reader.Attrs.Size.
func (r *Reader) Size() int64 {
	return r.Attrs.Size
}

// Remain returns the number of bytes left to read, or -1 if unknown.
func (r *Reader) Remain() int64 {
	return r.remain
}

// ContentType returns the content type of the object.
//
// Deprecated: use Reader.Attrs.ContentType.
func (r *Reader) ContentType() string {
	return r.Attrs.ContentType
}

// ContentEncoding returns the content encoding of the object.
//
// Deprecated: use Reader.Attrs.ContentEncoding.
func (r *Reader) ContentEncoding() string {
	return r.Attrs.ContentEncoding
}

// CacheControl returns the cache control of the object.
//
// Deprecated: use Reader.Attrs.CacheControl.
func (r *Reader) CacheControl() string {
	return r.Attrs.CacheControl
}

// LastModified returns the value of the Last-Modified header.
//
// Deprecated: use Reader.Attrs.LastModified.
func (r *Reader) LastModified() (time.Time, error) {
	return r.Attrs.LastModified, nil
}