1// Copyright 2016 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package storage 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "hash/crc32" 22 "io" 23 "io/ioutil" 24 "net/http" 25 "net/url" 26 "reflect" 27 "strconv" 28 "strings" 29 "time" 30 31 "cloud.google.com/go/internal/trace" 32 "google.golang.org/api/googleapi" 33) 34 35var crc32cTable = crc32.MakeTable(crc32.Castagnoli) 36 37// ReaderObjectAttrs are attributes about the object being read. These are populated 38// during the New call. This struct only holds a subset of object attributes: to 39// get the full set of attributes, use ObjectHandle.Attrs. 40// 41// Each field is read-only. 42type ReaderObjectAttrs struct { 43 // Size is the length of the object's content. 44 Size int64 45 46 // StartOffset is the byte offset within the object 47 // from which reading begins. 48 // This value is only non-zero for range requests. 49 StartOffset int64 50 51 // ContentType is the MIME type of the object's content. 52 ContentType string 53 54 // ContentEncoding is the encoding of the object's content. 55 ContentEncoding string 56 57 // CacheControl specifies whether and for how long browser and Internet 58 // caches are allowed to cache your objects. 59 CacheControl string 60 61 // LastModified is the time that the object was last modified. 62 LastModified time.Time 63 64 // Generation is the generation number of the object's content. 65 Generation int64 66 67 // Metageneration is the version of the metadata for this object at 68 // this generation. This field is used for preconditions and for 69 // detecting changes in metadata. A metageneration number is only 70 // meaningful in the context of a particular generation of a 71 // particular object. 72 Metageneration int64 73} 74 75// NewReader creates a new Reader to read the contents of the 76// object. 77// ErrObjectNotExist will be returned if the object is not found. 78// 79// The caller must call Close on the returned Reader when done reading. 80func (o *ObjectHandle) NewReader(ctx context.Context) (*Reader, error) { 81 return o.NewRangeReader(ctx, 0, -1) 82} 83 84// NewRangeReader reads part of an object, reading at most length bytes 85// starting at the given offset. If length is negative, the object is read 86// until the end. If offset is negative, the object is read abs(offset) bytes 87// from the end, and length must also be negative to indicate all remaining 88// bytes will be read. 89func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) (r *Reader, err error) { 90 ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader") 91 defer func() { trace.EndSpan(ctx, err) }() 92 93 if err := o.validate(); err != nil { 94 return nil, err 95 } 96 if offset < 0 && length >= 0 { 97 return nil, fmt.Errorf("storage: invalid offset %d < 0 requires negative length", offset) 98 } 99 if o.conds != nil { 100 if err := o.conds.validate("NewRangeReader"); err != nil { 101 return nil, err 102 } 103 } 104 u := &url.URL{ 105 Scheme: o.c.scheme, 106 Host: o.c.readHost, 107 Path: fmt.Sprintf("/%s/%s", o.bucket, o.object), 108 } 109 verb := "GET" 110 if length == 0 { 111 verb = "HEAD" 112 } 113 req, err := http.NewRequest(verb, u.String(), nil) 114 if err != nil { 115 return nil, err 116 } 117 req = req.WithContext(ctx) 118 if o.userProject != "" { 119 req.Header.Set("X-Goog-User-Project", o.userProject) 120 } 121 if o.readCompressed { 122 req.Header.Set("Accept-Encoding", "gzip") 123 } 124 if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil { 125 return nil, err 126 } 127 128 gen := o.gen 129 130 // Define a function that initiates a Read with offset and length, assuming we 131 // have already read seen bytes. 132 reopen := func(seen int64) (*http.Response, error) { 133 start := offset + seen 134 if length < 0 && start < 0 { 135 req.Header.Set("Range", fmt.Sprintf("bytes=%d", start)) 136 } else if length < 0 && start > 0 { 137 req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start)) 138 } else if length > 0 { 139 // The end character isn't affected by how many bytes we've seen. 140 req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1)) 141 } 142 // We wait to assign conditions here because the generation number can change in between reopen() runs. 143 req.URL.RawQuery = conditionsQuery(gen, o.conds) 144 var res *http.Response 145 err = runWithRetry(ctx, func() error { 146 res, err = o.c.hc.Do(req) 147 if err != nil { 148 return err 149 } 150 if res.StatusCode == http.StatusNotFound { 151 res.Body.Close() 152 return ErrObjectNotExist 153 } 154 if res.StatusCode < 200 || res.StatusCode > 299 { 155 body, _ := ioutil.ReadAll(res.Body) 156 res.Body.Close() 157 return &googleapi.Error{ 158 Code: res.StatusCode, 159 Header: res.Header, 160 Body: string(body), 161 } 162 } 163 if start > 0 && length != 0 && res.StatusCode != http.StatusPartialContent { 164 res.Body.Close() 165 return errors.New("storage: partial request not satisfied") 166 } 167 // If a generation hasn't been specified, and this is the first response we get, let's record the 168 // generation. In future requests we'll use this generation as a precondition to avoid data races. 169 if gen < 0 && res.Header.Get("X-Goog-Generation") != "" { 170 gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64) 171 if err != nil { 172 return err 173 } 174 gen = gen64 175 } 176 return nil 177 }) 178 if err != nil { 179 return nil, err 180 } 181 return res, nil 182 } 183 184 res, err := reopen(0) 185 if err != nil { 186 return nil, err 187 } 188 var ( 189 size int64 // total size of object, even if a range was requested. 190 checkCRC bool 191 crc uint32 192 startOffset int64 // non-zero if range request. 193 ) 194 if res.StatusCode == http.StatusPartialContent { 195 cr := strings.TrimSpace(res.Header.Get("Content-Range")) 196 if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") { 197 return nil, fmt.Errorf("storage: invalid Content-Range %q", cr) 198 } 199 size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64) 200 if err != nil { 201 return nil, fmt.Errorf("storage: invalid Content-Range %q", cr) 202 } 203 204 dashIndex := strings.Index(cr, "-") 205 if dashIndex >= 0 { 206 startOffset, err = strconv.ParseInt(cr[len("bytes="):dashIndex], 10, 64) 207 if err != nil { 208 return nil, fmt.Errorf("storage: invalid Content-Range %q: %v", cr, err) 209 } 210 } 211 } else { 212 size = res.ContentLength 213 // Check the CRC iff all of the following hold: 214 // - We asked for content (length != 0). 215 // - We got all the content (status != PartialContent). 216 // - The server sent a CRC header. 217 // - The Go http stack did not uncompress the file. 218 // - We were not served compressed data that was uncompressed on download. 219 // The problem with the last two cases is that the CRC will not match -- GCS 220 // computes it on the compressed contents, but we compute it on the 221 // uncompressed contents. 222 if length != 0 && !res.Uncompressed && !uncompressedByServer(res) { 223 crc, checkCRC = parseCRC32c(res) 224 } 225 } 226 227 remain := res.ContentLength 228 body := res.Body 229 if length == 0 { 230 remain = 0 231 body.Close() 232 body = emptyBody 233 } 234 var metaGen int64 235 if res.Header.Get("X-Goog-Metageneration") != "" { 236 metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64) 237 if err != nil { 238 return nil, err 239 } 240 } 241 242 var lm time.Time 243 if res.Header.Get("Last-Modified") != "" { 244 lm, err = http.ParseTime(res.Header.Get("Last-Modified")) 245 if err != nil { 246 return nil, err 247 } 248 } 249 250 attrs := ReaderObjectAttrs{ 251 Size: size, 252 ContentType: res.Header.Get("Content-Type"), 253 ContentEncoding: res.Header.Get("Content-Encoding"), 254 CacheControl: res.Header.Get("Cache-Control"), 255 LastModified: lm, 256 StartOffset: startOffset, 257 Generation: gen, 258 Metageneration: metaGen, 259 } 260 return &Reader{ 261 Attrs: attrs, 262 body: body, 263 size: size, 264 remain: remain, 265 wantCRC: crc, 266 checkCRC: checkCRC, 267 reopen: reopen, 268 }, nil 269} 270 271func uncompressedByServer(res *http.Response) bool { 272 // If the data is stored as gzip but is not encoded as gzip, then it 273 // was uncompressed by the server. 274 return res.Header.Get("X-Goog-Stored-Content-Encoding") == "gzip" && 275 res.Header.Get("Content-Encoding") != "gzip" 276} 277 278func parseCRC32c(res *http.Response) (uint32, bool) { 279 const prefix = "crc32c=" 280 for _, spec := range res.Header["X-Goog-Hash"] { 281 if strings.HasPrefix(spec, prefix) { 282 c, err := decodeUint32(spec[len(prefix):]) 283 if err == nil { 284 return c, true 285 } 286 } 287 } 288 return 0, false 289} 290 291var emptyBody = ioutil.NopCloser(strings.NewReader("")) 292 293// Reader reads a Cloud Storage object. 294// It implements io.Reader. 295// 296// Typically, a Reader computes the CRC of the downloaded content and compares it to 297// the stored CRC, returning an error from Read if there is a mismatch. This integrity check 298// is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding. 299type Reader struct { 300 Attrs ReaderObjectAttrs 301 body io.ReadCloser 302 seen, remain, size int64 303 checkCRC bool // should we check the CRC? 304 wantCRC uint32 // the CRC32c value the server sent in the header 305 gotCRC uint32 // running crc 306 reopen func(seen int64) (*http.Response, error) 307} 308 309// Close closes the Reader. It must be called when done reading. 310func (r *Reader) Close() error { 311 return r.body.Close() 312} 313 314func (r *Reader) Read(p []byte) (int, error) { 315 n, err := r.readWithRetry(p) 316 if r.remain != -1 { 317 r.remain -= int64(n) 318 } 319 if r.checkCRC { 320 r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n]) 321 // Check CRC here. It would be natural to check it in Close, but 322 // everybody defers Close on the assumption that it doesn't return 323 // anything worth looking at. 324 if err == io.EOF { 325 if r.gotCRC != r.wantCRC { 326 return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d", 327 r.gotCRC, r.wantCRC) 328 } 329 } 330 } 331 return n, err 332} 333 334func (r *Reader) readWithRetry(p []byte) (int, error) { 335 n := 0 336 for len(p[n:]) > 0 { 337 m, err := r.body.Read(p[n:]) 338 n += m 339 r.seen += int64(m) 340 if !shouldRetryRead(err) { 341 return n, err 342 } 343 // Read failed, but we will try again. Send a ranged read request that takes 344 // into account the number of bytes we've already seen. 345 res, err := r.reopen(r.seen) 346 if err != nil { 347 // reopen already retries 348 return n, err 349 } 350 r.body.Close() 351 r.body = res.Body 352 } 353 return n, nil 354} 355 356func shouldRetryRead(err error) bool { 357 if err == nil { 358 return false 359 } 360 return strings.HasSuffix(err.Error(), "INTERNAL_ERROR") && strings.Contains(reflect.TypeOf(err).String(), "http2") 361} 362 363// Size returns the size of the object in bytes. 364// The returned value is always the same and is not affected by 365// calls to Read or Close. 366// 367// Deprecated: use Reader.Attrs.Size. 368func (r *Reader) Size() int64 { 369 return r.Attrs.Size 370} 371 372// Remain returns the number of bytes left to read, or -1 if unknown. 373func (r *Reader) Remain() int64 { 374 return r.remain 375} 376 377// ContentType returns the content type of the object. 378// 379// Deprecated: use Reader.Attrs.ContentType. 380func (r *Reader) ContentType() string { 381 return r.Attrs.ContentType 382} 383 384// ContentEncoding returns the content encoding of the object. 385// 386// Deprecated: use Reader.Attrs.ContentEncoding. 387func (r *Reader) ContentEncoding() string { 388 return r.Attrs.ContentEncoding 389} 390 391// CacheControl returns the cache control of the object. 392// 393// Deprecated: use Reader.Attrs.CacheControl. 394func (r *Reader) CacheControl() string { 395 return r.Attrs.CacheControl 396} 397 398// LastModified returns the value of the Last-Modified header. 399// 400// Deprecated: use Reader.Attrs.LastModified. 401func (r *Reader) LastModified() (time.Time, error) { 402 return r.Attrs.LastModified, nil 403} 404