1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package storage 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "hash/crc32" 22 "io" 23 "io/ioutil" 24 "net/http" 25 "net/url" 26 "strconv" 27 "strings" 28 "time" 29 30 "cloud.google.com/go/internal/trace" 31 "google.golang.org/api/googleapi" 32 storagepb "google.golang.org/genproto/googleapis/storage/v2" 33) 34 35var crc32cTable = crc32.MakeTable(crc32.Castagnoli) 36 37// ReaderObjectAttrs are attributes about the object being read. These are populated 38// during the New call. This struct only holds a subset of object attributes: to 39// get the full set of attributes, use ObjectHandle.Attrs. 40// 41// Each field is read-only. 42type ReaderObjectAttrs struct { 43 // Size is the length of the object's content. 44 Size int64 45 46 // StartOffset is the byte offset within the object 47 // from which reading begins. 48 // This value is only non-zero for range requests. 49 StartOffset int64 50 51 // ContentType is the MIME type of the object's content. 52 ContentType string 53 54 // ContentEncoding is the encoding of the object's content. 55 ContentEncoding string 56 57 // CacheControl specifies whether and for how long browser and Internet 58 // caches are allowed to cache your objects. 59 CacheControl string 60 61 // LastModified is the time that the object was last modified. 62 LastModified time.Time 63 64 // Generation is the generation number of the object's content. 65 Generation int64 66 67 // Metageneration is the version of the metadata for this object at 68 // this generation. This field is used for preconditions and for 69 // detecting changes in metadata. A metageneration number is only 70 // meaningful in the context of a particular generation of a 71 // particular object. 72 Metageneration int64 73} 74 75// NewReader creates a new Reader to read the contents of the 76// object. 77// ErrObjectNotExist will be returned if the object is not found. 78// 79// The caller must call Close on the returned Reader when done reading. 80func (o *ObjectHandle) NewReader(ctx context.Context) (*Reader, error) { 81 return o.NewRangeReader(ctx, 0, -1) 82} 83 84// NewRangeReader reads part of an object, reading at most length bytes 85// starting at the given offset. If length is negative, the object is read 86// until the end. If offset is negative, the object is read abs(offset) bytes 87// from the end, and length must also be negative to indicate all remaining 88// bytes will be read. 89// 90// If the object's metadata property "Content-Encoding" is set to "gzip" or satisfies 91// decompressive transcoding per https://cloud.google.com/storage/docs/transcoding 92// that file will be served back whole, regardless of the requested range as 93// Google Cloud Storage dictates. 94func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) (r *Reader, err error) { 95 ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader") 96 defer func() { trace.EndSpan(ctx, err) }() 97 98 if o.c.gc != nil { 99 return o.newRangeReaderWithGRPC(ctx, offset, length) 100 } 101 102 if err := o.validate(); err != nil { 103 return nil, err 104 } 105 if offset < 0 && length >= 0 { 106 return nil, fmt.Errorf("storage: invalid offset %d < 0 requires negative length", offset) 107 } 108 if o.conds != nil { 109 if err := o.conds.validate("NewRangeReader"); err != nil { 110 return nil, err 111 } 112 } 113 u := &url.URL{ 114 Scheme: o.c.scheme, 115 Host: o.c.readHost, 116 Path: fmt.Sprintf("/%s/%s", o.bucket, o.object), 117 } 118 verb := "GET" 119 if length == 0 { 120 verb = "HEAD" 121 } 122 req, err := http.NewRequest(verb, u.String(), nil) 123 if err != nil { 124 return nil, err 125 } 126 req = req.WithContext(ctx) 127 if o.userProject != "" { 128 req.Header.Set("X-Goog-User-Project", o.userProject) 129 } 130 if o.readCompressed { 131 req.Header.Set("Accept-Encoding", "gzip") 132 } 133 if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil { 134 return nil, err 135 } 136 137 gen := o.gen 138 139 // Define a function that initiates a Read with offset and length, assuming we 140 // have already read seen bytes. 141 reopen := func(seen int64) (*http.Response, error) { 142 // If the context has already expired, return immediately without making a 143 // call. 144 if err := ctx.Err(); err != nil { 145 return nil, err 146 } 147 start := offset + seen 148 if length < 0 && start < 0 { 149 req.Header.Set("Range", fmt.Sprintf("bytes=%d", start)) 150 } else if length < 0 && start > 0 { 151 req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start)) 152 } else if length > 0 { 153 // The end character isn't affected by how many bytes we've seen. 154 req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1)) 155 } 156 // We wait to assign conditions here because the generation number can change in between reopen() runs. 157 if err := setConditionsHeaders(req.Header, o.conds); err != nil { 158 return nil, err 159 } 160 // If an object generation is specified, include generation as query string parameters. 161 if gen >= 0 { 162 req.URL.RawQuery = fmt.Sprintf("generation=%d", gen) 163 } 164 165 var res *http.Response 166 err = runWithRetry(ctx, func() error { 167 res, err = o.c.hc.Do(req) 168 if err != nil { 169 return err 170 } 171 if res.StatusCode == http.StatusNotFound { 172 res.Body.Close() 173 return ErrObjectNotExist 174 } 175 if res.StatusCode < 200 || res.StatusCode > 299 { 176 body, _ := ioutil.ReadAll(res.Body) 177 res.Body.Close() 178 return &googleapi.Error{ 179 Code: res.StatusCode, 180 Header: res.Header, 181 Body: string(body), 182 } 183 } 184 185 partialContentNotSatisfied := 186 !decompressiveTranscoding(res) && 187 start > 0 && length != 0 && 188 res.StatusCode != http.StatusPartialContent 189 190 if partialContentNotSatisfied { 191 res.Body.Close() 192 return errors.New("storage: partial request not satisfied") 193 } 194 195 // With "Content-Encoding": "gzip" aka decompressive transcoding, GCS serves 196 // back the whole file regardless of the range count passed in as per: 197 // https://cloud.google.com/storage/docs/transcoding#range, 198 // thus we have to manually move the body forward by seen bytes. 199 if decompressiveTranscoding(res) && seen > 0 { 200 _, _ = io.CopyN(ioutil.Discard, res.Body, seen) 201 } 202 203 // If a generation hasn't been specified, and this is the first response we get, let's record the 204 // generation. In future requests we'll use this generation as a precondition to avoid data races. 205 if gen < 0 && res.Header.Get("X-Goog-Generation") != "" { 206 gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64) 207 if err != nil { 208 return err 209 } 210 gen = gen64 211 } 212 return nil 213 }) 214 if err != nil { 215 return nil, err 216 } 217 return res, nil 218 } 219 220 res, err := reopen(0) 221 if err != nil { 222 return nil, err 223 } 224 var ( 225 size int64 // total size of object, even if a range was requested. 226 checkCRC bool 227 crc uint32 228 startOffset int64 // non-zero if range request. 229 ) 230 if res.StatusCode == http.StatusPartialContent { 231 cr := strings.TrimSpace(res.Header.Get("Content-Range")) 232 if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") { 233 return nil, fmt.Errorf("storage: invalid Content-Range %q", cr) 234 } 235 // Content range is formatted <first byte>-<last byte>/<total size>. We take 236 // the total size. 237 size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64) 238 if err != nil { 239 return nil, fmt.Errorf("storage: invalid Content-Range %q", cr) 240 } 241 242 dashIndex := strings.Index(cr, "-") 243 if dashIndex >= 0 { 244 startOffset, err = strconv.ParseInt(cr[len("bytes="):dashIndex], 10, 64) 245 if err != nil { 246 return nil, fmt.Errorf("storage: invalid Content-Range %q: %v", cr, err) 247 } 248 } 249 } else { 250 size = res.ContentLength 251 // Check the CRC iff all of the following hold: 252 // - We asked for content (length != 0). 253 // - We got all the content (status != PartialContent). 254 // - The server sent a CRC header. 255 // - The Go http stack did not uncompress the file. 256 // - We were not served compressed data that was uncompressed on download. 257 // The problem with the last two cases is that the CRC will not match -- GCS 258 // computes it on the compressed contents, but we compute it on the 259 // uncompressed contents. 260 if length != 0 && !res.Uncompressed && !uncompressedByServer(res) { 261 crc, checkCRC = parseCRC32c(res) 262 } 263 } 264 265 remain := res.ContentLength 266 body := res.Body 267 if length == 0 { 268 remain = 0 269 body.Close() 270 body = emptyBody 271 } 272 var metaGen int64 273 if res.Header.Get("X-Goog-Metageneration") != "" { 274 metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64) 275 if err != nil { 276 return nil, err 277 } 278 } 279 280 var lm time.Time 281 if res.Header.Get("Last-Modified") != "" { 282 lm, err = http.ParseTime(res.Header.Get("Last-Modified")) 283 if err != nil { 284 return nil, err 285 } 286 } 287 288 attrs := ReaderObjectAttrs{ 289 Size: size, 290 ContentType: res.Header.Get("Content-Type"), 291 ContentEncoding: res.Header.Get("Content-Encoding"), 292 CacheControl: res.Header.Get("Cache-Control"), 293 LastModified: lm, 294 StartOffset: startOffset, 295 Generation: gen, 296 Metageneration: metaGen, 297 } 298 return &Reader{ 299 Attrs: attrs, 300 body: body, 301 size: size, 302 remain: remain, 303 wantCRC: crc, 304 checkCRC: checkCRC, 305 reopen: reopen, 306 }, nil 307} 308 309// decompressiveTranscoding returns true if the request was served decompressed 310// and different than its original storage form. This happens when the "Content-Encoding" 311// header is "gzip". 312// See: 313// * https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip 314// * https://github.com/googleapis/google-cloud-go/issues/1800 315func decompressiveTranscoding(res *http.Response) bool { 316 // Decompressive Transcoding. 317 return res.Header.Get("Content-Encoding") == "gzip" || 318 res.Header.Get("X-Goog-Stored-Content-Encoding") == "gzip" 319} 320 321func uncompressedByServer(res *http.Response) bool { 322 // If the data is stored as gzip but is not encoded as gzip, then it 323 // was uncompressed by the server. 324 return res.Header.Get("X-Goog-Stored-Content-Encoding") == "gzip" && 325 res.Header.Get("Content-Encoding") != "gzip" 326} 327 328func parseCRC32c(res *http.Response) (uint32, bool) { 329 const prefix = "crc32c=" 330 for _, spec := range res.Header["X-Goog-Hash"] { 331 if strings.HasPrefix(spec, prefix) { 332 c, err := decodeUint32(spec[len(prefix):]) 333 if err == nil { 334 return c, true 335 } 336 } 337 } 338 return 0, false 339} 340 341// setConditionsHeaders sets precondition request headers for downloads 342// using the XML API. It assumes that the conditions have been validated. 343func setConditionsHeaders(headers http.Header, conds *Conditions) error { 344 if conds == nil { 345 return nil 346 } 347 if conds.MetagenerationMatch != 0 { 348 headers.Set("x-goog-if-metageneration-match", fmt.Sprint(conds.MetagenerationMatch)) 349 } 350 switch { 351 case conds.GenerationMatch != 0: 352 headers.Set("x-goog-if-generation-match", fmt.Sprint(conds.GenerationMatch)) 353 case conds.DoesNotExist: 354 headers.Set("x-goog-if-generation-match", "0") 355 } 356 return nil 357} 358 359var emptyBody = ioutil.NopCloser(strings.NewReader("")) 360 361// Reader reads a Cloud Storage object. 362// It implements io.Reader. 363// 364// Typically, a Reader computes the CRC of the downloaded content and compares it to 365// the stored CRC, returning an error from Read if there is a mismatch. This integrity check 366// is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding. 367type Reader struct { 368 Attrs ReaderObjectAttrs 369 body io.ReadCloser 370 seen, remain, size int64 371 checkCRC bool // should we check the CRC? 372 wantCRC uint32 // the CRC32c value the server sent in the header 373 gotCRC uint32 // running crc 374 reopen func(seen int64) (*http.Response, error) 375 376 // The following fields are only for use in the gRPC hybrid client. 377 stream storagepb.Storage_ReadObjectClient 378 reopenWithGRPC func(seen int64) (*readStreamResponse, context.CancelFunc, error) 379 leftovers []byte 380 cancelStream context.CancelFunc 381} 382 383type readStreamResponse struct { 384 stream storagepb.Storage_ReadObjectClient 385 response *storagepb.ReadObjectResponse 386} 387 388// Close closes the Reader. It must be called when done reading. 389func (r *Reader) Close() error { 390 if r.body != nil { 391 return r.body.Close() 392 } 393 394 r.closeStream() 395 return nil 396} 397 398func (r *Reader) Read(p []byte) (int, error) { 399 read := r.readWithRetry 400 if r.reopenWithGRPC != nil { 401 read = r.readWithGRPC 402 } 403 404 n, err := read(p) 405 if r.remain != -1 { 406 r.remain -= int64(n) 407 } 408 if r.checkCRC { 409 r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n]) 410 // Check CRC here. It would be natural to check it in Close, but 411 // everybody defers Close on the assumption that it doesn't return 412 // anything worth looking at. 413 if err == io.EOF { 414 if r.gotCRC != r.wantCRC { 415 return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d", 416 r.gotCRC, r.wantCRC) 417 } 418 } 419 } 420 return n, err 421} 422 423// newRangeReaderWithGRPC creates a new Reader with the given range that uses 424// gRPC to read Object content. 425// 426// This is an experimental API and not intended for public use. 427func (o *ObjectHandle) newRangeReaderWithGRPC(ctx context.Context, offset, length int64) (r *Reader, err error) { 428 ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.newRangeReaderWithGRPC") 429 defer func() { trace.EndSpan(ctx, err) }() 430 431 if o.c.gc == nil { 432 err = fmt.Errorf("handle doesn't have a gRPC client initialized") 433 return 434 } 435 if err = o.validate(); err != nil { 436 return 437 } 438 439 // A negative length means "read to the end of the object", but the 440 // read_limit field it corresponds to uses zero to mean the same thing. Thus 441 // we coerce the length to 0 to read to the end of the object. 442 if length < 0 { 443 length = 0 444 } 445 446 // For now, there are only globally unique buckets, and "_" is the alias 447 // project ID for such buckets. 448 b := bucketResourceName("_", o.bucket) 449 req := &storagepb.ReadObjectRequest{ 450 Bucket: b, 451 Object: o.object, 452 } 453 // The default is a negative value, which means latest. 454 if o.gen >= 0 { 455 req.Generation = o.gen 456 } 457 458 // Define a function that initiates a Read with offset and length, assuming 459 // we have already read seen bytes. 460 reopen := func(seen int64) (*readStreamResponse, context.CancelFunc, error) { 461 // If the context has already expired, return immediately without making 462 // we call. 463 if err := ctx.Err(); err != nil { 464 return nil, nil, err 465 } 466 467 cc, cancel := context.WithCancel(ctx) 468 469 start := offset + seen 470 // Only set a ReadLimit if length is greater than zero, because zero 471 // means read it all. 472 if length > 0 { 473 req.ReadLimit = length - seen 474 } 475 req.ReadOffset = start 476 477 if err := applyCondsProto("reopenWithGRPC", o.gen, o.conds, req); err != nil { 478 cancel() 479 return nil, nil, err 480 } 481 482 var stream storagepb.Storage_ReadObjectClient 483 var msg *storagepb.ReadObjectResponse 484 var err error 485 486 err = runWithRetry(cc, func() error { 487 stream, err = o.c.gc.ReadObject(cc, req) 488 if err != nil { 489 return err 490 } 491 492 msg, err = stream.Recv() 493 494 return err 495 }) 496 if err != nil { 497 // Close the stream context we just created to ensure we don't leak 498 // resources. 499 cancel() 500 return nil, nil, err 501 } 502 503 return &readStreamResponse{stream, msg}, cancel, nil 504 } 505 506 res, cancel, err := reopen(0) 507 if err != nil { 508 return nil, err 509 } 510 511 r = &Reader{ 512 stream: res.stream, 513 reopenWithGRPC: reopen, 514 cancelStream: cancel, 515 } 516 517 // The first message was Recv'd on stream open, use it to populate the 518 // object metadata. 519 msg := res.response 520 obj := msg.GetMetadata() 521 // This is the size of the entire object, even if only a range was requested. 522 size := obj.GetSize() 523 524 r.Attrs = ReaderObjectAttrs{ 525 Size: size, 526 ContentType: obj.GetContentType(), 527 ContentEncoding: obj.GetContentEncoding(), 528 CacheControl: obj.GetCacheControl(), 529 LastModified: obj.GetUpdateTime().AsTime(), 530 Metageneration: obj.GetMetageneration(), 531 Generation: obj.GetGeneration(), 532 } 533 534 r.size = size 535 cr := msg.GetContentRange() 536 if cr != nil { 537 r.Attrs.StartOffset = cr.GetStart() 538 r.remain = cr.GetEnd() - cr.GetStart() + 1 539 } else { 540 r.remain = size 541 } 542 543 // Only support checksums when reading an entire object, not a range. 544 if msg.GetObjectChecksums().Crc32C != nil && offset == 0 && length == 0 { 545 r.wantCRC = msg.GetObjectChecksums().GetCrc32C() 546 r.checkCRC = true 547 } 548 549 // Store the content from the first Recv in the client buffer for reading 550 // later. 551 r.leftovers = msg.GetChecksummedData().GetContent() 552 553 return r, nil 554} 555 556func (r *Reader) readWithRetry(p []byte) (int, error) { 557 n := 0 558 for len(p[n:]) > 0 { 559 m, err := r.body.Read(p[n:]) 560 n += m 561 r.seen += int64(m) 562 if err == nil || err == io.EOF { 563 return n, err 564 } 565 // Read failed (likely due to connection issues), but we will try to reopen 566 // the pipe and continue. Send a ranged read request that takes into account 567 // the number of bytes we've already seen. 568 res, err := r.reopen(r.seen) 569 if err != nil { 570 // reopen already retries 571 return n, err 572 } 573 r.body.Close() 574 r.body = res.Body 575 } 576 return n, nil 577} 578 579// closeStream cancels a stream's context in order for it to be closed and 580// collected. 581// 582// This is an experimental API and not intended for public use. 583func (r *Reader) closeStream() { 584 if r.cancelStream != nil { 585 r.cancelStream() 586 } 587 r.stream = nil 588} 589 590// readWithGRPC reads bytes into the user's buffer from an open gRPC stream. 591// 592// This is an experimental API and not intended for public use. 593func (r *Reader) readWithGRPC(p []byte) (int, error) { 594 // No stream to read from, either never initiliazed or Close was called. 595 // Note: There is a potential concurrency issue if multiple routines are 596 // using the same reader. One encounters an error and the stream is closed 597 // and then reopened while the other routine attempts to read from it. 598 if r.stream == nil { 599 return 0, fmt.Errorf("reader has been closed") 600 } 601 602 // The entire object has been read by this reader, return EOF. 603 if r.size != 0 && r.size == r.seen { 604 return 0, io.EOF 605 } 606 607 var n int 608 // Read leftovers and return what was available to conform to the Reader 609 // interface: https://pkg.go.dev/io#Reader. 610 if len(r.leftovers) > 0 { 611 n = copy(p, r.leftovers) 612 r.seen += int64(n) 613 r.leftovers = r.leftovers[n:] 614 return n, nil 615 } 616 617 // Attempt to Recv the next message on the stream. 618 msg, err := r.recv() 619 if err != nil { 620 return 0, err 621 } 622 623 // TODO: Determine if we need to capture incremental CRC32C for this 624 // chunk. The Object CRC32C checksum is captured when directed to read 625 // the entire Object. If directed to read a range, we may need to 626 // calculate the range's checksum for verification if the checksum is 627 // present in the response here. 628 // TODO: Figure out if we need to support decompressive transcoding 629 // https://cloud.google.com/storage/docs/transcoding. 630 content := msg.GetChecksummedData().GetContent() 631 n = copy(p[n:], content) 632 leftover := len(content) - n 633 if leftover > 0 { 634 // Wasn't able to copy all of the data in the message, store for 635 // future Read calls. 636 r.leftovers = content[n:] 637 } 638 r.seen += int64(n) 639 640 return n, nil 641} 642 643// recv attempts to Recv the next message on the stream. In the event 644// that a retryable error is encountered, the stream will be closed, reopened, 645// and Recv again. This will attempt to Recv until one of the following is true: 646// 647// * Recv is successful 648// * A non-retryable error is encountered 649// * The Reader's context is canceled 650// 651// The last error received is the one that is returned, which could be from 652// an attempt to reopen the stream. 653// 654// This is an experimental API and not intended for public use. 655func (r *Reader) recv() (*storagepb.ReadObjectResponse, error) { 656 msg, err := r.stream.Recv() 657 if err != nil && shouldRetry(err) { 658 // This will "close" the existing stream and immediately attempt to 659 // reopen the stream, but will backoff if further attempts are necessary. 660 // Reopening the stream Recvs the first message, so if retrying is 661 // successful, the next logical chunk will be returned. 662 msg, err = r.reopenStream(r.seen) 663 } 664 665 return msg, err 666} 667 668// reopenStream "closes" the existing stream and attempts to reopen a stream and 669// sets the Reader's stream and cancelStream properties in the process. 670// 671// This is an experimental API and not intended for public use. 672func (r *Reader) reopenStream(seen int64) (*storagepb.ReadObjectResponse, error) { 673 // Close existing stream and initialize new stream with updated offset. 674 r.closeStream() 675 676 res, cancel, err := r.reopenWithGRPC(r.seen) 677 if err != nil { 678 return nil, err 679 } 680 r.stream = res.stream 681 r.cancelStream = cancel 682 return res.response, nil 683} 684 685// Size returns the size of the object in bytes. 686// The returned value is always the same and is not affected by 687// calls to Read or Close. 688// 689// Deprecated: use Reader.Attrs.Size. 690func (r *Reader) Size() int64 { 691 return r.Attrs.Size 692} 693 694// Remain returns the number of bytes left to read, or -1 if unknown. 695func (r *Reader) Remain() int64 { 696 return r.remain 697} 698 699// ContentType returns the content type of the object. 700// 701// Deprecated: use Reader.Attrs.ContentType. 702func (r *Reader) ContentType() string { 703 return r.Attrs.ContentType 704} 705 706// ContentEncoding returns the content encoding of the object. 707// 708// Deprecated: use Reader.Attrs.ContentEncoding. 709func (r *Reader) ContentEncoding() string { 710 return r.Attrs.ContentEncoding 711} 712 713// CacheControl returns the cache control of the object. 714// 715// Deprecated: use Reader.Attrs.CacheControl. 716func (r *Reader) CacheControl() string { 717 return r.Attrs.CacheControl 718} 719 720// LastModified returns the value of the Last-Modified header. 721// 722// Deprecated: use Reader.Attrs.LastModified. 723func (r *Reader) LastModified() (time.Time, error) { 724 return r.Attrs.LastModified, nil 725} 726