1// Copyright 2016 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package gensupport
6
7import (
8	"context"
9	"errors"
10	"fmt"
11	"io"
12	"net/http"
13	"sync"
14	"time"
15)
16
17const (
18	// statusTooManyRequests is returned by the storage API if the
19	// per-project limits have been temporarily exceeded. The request
20	// should be retried.
21	// https://cloud.google.com/storage/docs/json_api/v1/status-codes#standardcodes
22	statusTooManyRequests = 429
23)
24
25// ResumableUpload is used by the generated APIs to provide resumable uploads.
26// It is not used by developers directly.
27type ResumableUpload struct {
28	Client *http.Client
29	// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
30	URI       string
31	UserAgent string // User-Agent for header of the request
32	// Media is the object being uploaded.
33	Media *MediaBuffer
34	// MediaType defines the media type, e.g. "image/jpeg".
35	MediaType string
36
37	mu       sync.Mutex // guards progress
38	progress int64      // number of bytes uploaded so far
39
40	// Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded.
41	Callback func(int64)
42
43	// If not specified, a default exponential backoff strategy will be used.
44	Backoff BackoffStrategy
45}
46
47// Progress returns the number of bytes uploaded at this point.
48func (rx *ResumableUpload) Progress() int64 {
49	rx.mu.Lock()
50	defer rx.mu.Unlock()
51	return rx.progress
52}
53
54// doUploadRequest performs a single HTTP request to upload data.
55// off specifies the offset in rx.Media from which data is drawn.
56// size is the number of bytes in data.
57// final specifies whether data is the final chunk to be uploaded.
58func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
59	req, err := http.NewRequest("POST", rx.URI, data)
60	if err != nil {
61		return nil, err
62	}
63
64	req.ContentLength = size
65	var contentRange string
66	if final {
67		if size == 0 {
68			contentRange = fmt.Sprintf("bytes */%v", off)
69		} else {
70			contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
71		}
72	} else {
73		contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
74	}
75	req.Header.Set("Content-Range", contentRange)
76	req.Header.Set("Content-Type", rx.MediaType)
77	req.Header.Set("User-Agent", rx.UserAgent)
78
79	// Google's upload endpoint uses status code 308 for a
80	// different purpose than the "308 Permanent Redirect"
81	// since-standardized in RFC 7238. Because of the conflict in
82	// semantics, Google added this new request header which
83	// causes it to not use "308" and instead reply with 200 OK
84	// and sets the upload-specific "X-HTTP-Status-Code-Override:
85	// 308" response header.
86	req.Header.Set("X-GUploader-No-308", "yes")
87
88	return SendRequest(ctx, rx.Client, req)
89}
90
91func statusResumeIncomplete(resp *http.Response) bool {
92	// This is how the server signals "status resume incomplete"
93	// when X-GUploader-No-308 is set to "yes":
94	return resp != nil && resp.Header.Get("X-Http-Status-Code-Override") == "308"
95}
96
97// reportProgress calls a user-supplied callback to report upload progress.
98// If old==updated, the callback is not called.
99func (rx *ResumableUpload) reportProgress(old, updated int64) {
100	if updated-old == 0 {
101		return
102	}
103	rx.mu.Lock()
104	rx.progress = updated
105	rx.mu.Unlock()
106	if rx.Callback != nil {
107		rx.Callback(updated)
108	}
109}
110
111// transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
112func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
113	chunk, off, size, err := rx.Media.Chunk()
114
115	done := err == io.EOF
116	if !done && err != nil {
117		return nil, err
118	}
119
120	res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
121	if err != nil {
122		return res, err
123	}
124
125	// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
126	// this file), so we don't expect to get a 308.
127	if res.StatusCode == 308 {
128		return nil, errors.New("unexpected 308 response status code")
129	}
130
131	if res.StatusCode == http.StatusOK {
132		rx.reportProgress(off, off+int64(size))
133	}
134
135	if statusResumeIncomplete(res) {
136		rx.Media.Next()
137	}
138	return res, nil
139}
140
141func contextDone(ctx context.Context) bool {
142	select {
143	case <-ctx.Done():
144		return true
145	default:
146		return false
147	}
148}
149
150// Upload starts the process of a resumable upload with a cancellable context.
151// It retries using the provided back off strategy until cancelled or the
152// strategy indicates to stop retrying.
153// It is called from the auto-generated API code and is not visible to the user.
154// Before sending an HTTP request, Upload calls any registered hook functions,
155// and calls the returned functions after the request returns (see send.go).
156// rx is private to the auto-generated API code.
157// Exactly one of resp or err will be nil.  If resp is non-nil, the caller must call resp.Body.Close.
158func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
159	var pause time.Duration
160	backoff := rx.Backoff
161	if backoff == nil {
162		backoff = DefaultBackoffStrategy()
163	}
164
165	for {
166		// Ensure that we return in the case of cancelled context, even if pause is 0.
167		if contextDone(ctx) {
168			return nil, ctx.Err()
169		}
170		select {
171		case <-ctx.Done():
172			return nil, ctx.Err()
173		case <-time.After(pause):
174		}
175
176		resp, err = rx.transferChunk(ctx)
177
178		var status int
179		if resp != nil {
180			status = resp.StatusCode
181		}
182
183		// Check if we should retry the request.
184		if shouldRetry(status, err) {
185			var retry bool
186			pause, retry = backoff.Pause()
187			if retry {
188				if resp != nil && resp.Body != nil {
189					resp.Body.Close()
190				}
191				continue
192			}
193		}
194
195		// If the chunk was uploaded successfully, but there's still
196		// more to go, upload the next chunk without any delay.
197		if statusResumeIncomplete(resp) {
198			pause = 0
199			backoff.Reset()
200			resp.Body.Close()
201			continue
202		}
203
204		// It's possible for err and resp to both be non-nil here, but we expose a simpler
205		// contract to our callers: exactly one of resp and err will be non-nil.  This means
206		// that any response body must be closed here before returning a non-nil error.
207		if err != nil {
208			if resp != nil && resp.Body != nil {
209				resp.Body.Close()
210			}
211			return nil, err
212		}
213
214		return resp, nil
215	}
216}
217