1// Copyright 2016 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package gensupport 6 7import ( 8 "context" 9 "errors" 10 "fmt" 11 "io" 12 "net/http" 13 "sync" 14 "time" 15) 16 17const ( 18 // statusTooManyRequests is returned by the storage API if the 19 // per-project limits have been temporarily exceeded. The request 20 // should be retried. 21 // https://cloud.google.com/storage/docs/json_api/v1/status-codes#standardcodes 22 statusTooManyRequests = 429 23) 24 25// ResumableUpload is used by the generated APIs to provide resumable uploads. 26// It is not used by developers directly. 27type ResumableUpload struct { 28 Client *http.Client 29 // URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable". 30 URI string 31 UserAgent string // User-Agent for header of the request 32 // Media is the object being uploaded. 33 Media *MediaBuffer 34 // MediaType defines the media type, e.g. "image/jpeg". 35 MediaType string 36 37 mu sync.Mutex // guards progress 38 progress int64 // number of bytes uploaded so far 39 40 // Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded. 41 Callback func(int64) 42 43 // If not specified, a default exponential backoff strategy will be used. 44 Backoff BackoffStrategy 45} 46 47// Progress returns the number of bytes uploaded at this point. 48func (rx *ResumableUpload) Progress() int64 { 49 rx.mu.Lock() 50 defer rx.mu.Unlock() 51 return rx.progress 52} 53 54// doUploadRequest performs a single HTTP request to upload data. 55// off specifies the offset in rx.Media from which data is drawn. 56// size is the number of bytes in data. 57// final specifies whether data is the final chunk to be uploaded. 58func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) { 59 req, err := http.NewRequest("POST", rx.URI, data) 60 if err != nil { 61 return nil, err 62 } 63 64 req.ContentLength = size 65 var contentRange string 66 if final { 67 if size == 0 { 68 contentRange = fmt.Sprintf("bytes */%v", off) 69 } else { 70 contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size) 71 } 72 } else { 73 contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1) 74 } 75 req.Header.Set("Content-Range", contentRange) 76 req.Header.Set("Content-Type", rx.MediaType) 77 req.Header.Set("User-Agent", rx.UserAgent) 78 79 // Google's upload endpoint uses status code 308 for a 80 // different purpose than the "308 Permanent Redirect" 81 // since-standardized in RFC 7238. Because of the conflict in 82 // semantics, Google added this new request header which 83 // causes it to not use "308" and instead reply with 200 OK 84 // and sets the upload-specific "X-HTTP-Status-Code-Override: 85 // 308" response header. 86 req.Header.Set("X-GUploader-No-308", "yes") 87 88 return SendRequest(ctx, rx.Client, req) 89} 90 91func statusResumeIncomplete(resp *http.Response) bool { 92 // This is how the server signals "status resume incomplete" 93 // when X-GUploader-No-308 is set to "yes": 94 return resp != nil && resp.Header.Get("X-Http-Status-Code-Override") == "308" 95} 96 97// reportProgress calls a user-supplied callback to report upload progress. 98// If old==updated, the callback is not called. 99func (rx *ResumableUpload) reportProgress(old, updated int64) { 100 if updated-old == 0 { 101 return 102 } 103 rx.mu.Lock() 104 rx.progress = updated 105 rx.mu.Unlock() 106 if rx.Callback != nil { 107 rx.Callback(updated) 108 } 109} 110 111// transferChunk performs a single HTTP request to upload a single chunk from rx.Media. 112func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) { 113 chunk, off, size, err := rx.Media.Chunk() 114 115 done := err == io.EOF 116 if !done && err != nil { 117 return nil, err 118 } 119 120 res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done) 121 if err != nil { 122 return res, err 123 } 124 125 // We sent "X-GUploader-No-308: yes" (see comment elsewhere in 126 // this file), so we don't expect to get a 308. 127 if res.StatusCode == 308 { 128 return nil, errors.New("unexpected 308 response status code") 129 } 130 131 if res.StatusCode == http.StatusOK { 132 rx.reportProgress(off, off+int64(size)) 133 } 134 135 if statusResumeIncomplete(res) { 136 rx.Media.Next() 137 } 138 return res, nil 139} 140 141func contextDone(ctx context.Context) bool { 142 select { 143 case <-ctx.Done(): 144 return true 145 default: 146 return false 147 } 148} 149 150// Upload starts the process of a resumable upload with a cancellable context. 151// It retries using the provided back off strategy until cancelled or the 152// strategy indicates to stop retrying. 153// It is called from the auto-generated API code and is not visible to the user. 154// Before sending an HTTP request, Upload calls any registered hook functions, 155// and calls the returned functions after the request returns (see send.go). 156// rx is private to the auto-generated API code. 157// Exactly one of resp or err will be nil. If resp is non-nil, the caller must call resp.Body.Close. 158func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) { 159 var pause time.Duration 160 backoff := rx.Backoff 161 if backoff == nil { 162 backoff = DefaultBackoffStrategy() 163 } 164 165 for { 166 // Ensure that we return in the case of cancelled context, even if pause is 0. 167 if contextDone(ctx) { 168 return nil, ctx.Err() 169 } 170 select { 171 case <-ctx.Done(): 172 return nil, ctx.Err() 173 case <-time.After(pause): 174 } 175 176 resp, err = rx.transferChunk(ctx) 177 178 var status int 179 if resp != nil { 180 status = resp.StatusCode 181 } 182 183 // Check if we should retry the request. 184 if shouldRetry(status, err) { 185 var retry bool 186 pause, retry = backoff.Pause() 187 if retry { 188 if resp != nil && resp.Body != nil { 189 resp.Body.Close() 190 } 191 continue 192 } 193 } 194 195 // If the chunk was uploaded successfully, but there's still 196 // more to go, upload the next chunk without any delay. 197 if statusResumeIncomplete(resp) { 198 pause = 0 199 backoff.Reset() 200 resp.Body.Close() 201 continue 202 } 203 204 // It's possible for err and resp to both be non-nil here, but we expose a simpler 205 // contract to our callers: exactly one of resp and err will be non-nil. This means 206 // that any response body must be closed here before returning a non-nil error. 207 if err != nil { 208 if resp != nil && resp.Body != nil { 209 resp.Body.Close() 210 } 211 return nil, err 212 } 213 214 return resp, nil 215 } 216} 217