1// Copyright 2019 The Go Cloud Development Kit Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package httpvar provides a runtimevar implementation with variables
16// backed by http endpoint. Use OpenVariable to construct a *runtimevar.Variable.
17//
18// URLs
19//
20// For runtimevar.OpenVariable, httpvar registers for the schemes "http" and
21// "https". The default URL opener will use http.DefaultClient.
22// To customize the URL opener, or for more details on the URL format,
23// see URLOpener.
24// See https://gocloud.dev/concepts/urls/ for background information.
25//
26// As
27//
28// httpvar exposes the following types for As:
29//  - Snapshot: *http.Response
30//  - Error: httpvar.RequestError, url.Error
31package httpvar // import "gocloud.dev/runtimevar/httpvar"
32
33import (
34	"bytes"
35	"context"
36	"fmt"
37	"io/ioutil"
38	"net/http"
39	"net/url"
40	"time"
41
42	"gocloud.dev/gcerrors"
43	"gocloud.dev/internal/gcerr"
44	"gocloud.dev/runtimevar"
45	"gocloud.dev/runtimevar/driver"
46)
47
48func init() {
49	o := &URLOpener{Client: http.DefaultClient}
50	for _, scheme := range Schemes {
51		runtimevar.DefaultURLMux().RegisterVariable(scheme, o)
52	}
53}
54
55// Schemes are the URL schemes httpvar registers its URLOpener under on runtimevar.DefaultMux.
56var Schemes = []string{"http", "https"}
57
58// URLOpener opens HTTP URLs like "http://myserver.com/foo.txt".
59//
60// The full URL, including scheme, is used as the endpoint, except that the
61// the following URL parameters are removed if present:
62//   - decoder: The decoder to use. Defaults to runtimevar.BytesDecoder.
63//       See runtimevar.DecoderByName for supported values.
64type URLOpener struct {
65	// The Client to use; required.
66	Client *http.Client
67
68	// Decoder specifies the decoder to use if one is not specified in the URL.
69	// Defaults to runtimevar.BytesDecoder.
70	Decoder *runtimevar.Decoder
71
72	// Options specifies the options to pass to OpenVariable.
73	Options Options
74}
75
76// OpenVariableURL opens a httpvar Variable for u.
77func (o *URLOpener) OpenVariableURL(ctx context.Context, u *url.URL) (*runtimevar.Variable, error) {
78	// Clone u because we may strip some query parameters.
79	u2 := *u
80	q := u2.Query()
81
82	decoderName := q.Get("decoder")
83	q.Del("decoder")
84	decoder, err := runtimevar.DecoderByName(ctx, decoderName, o.Decoder)
85	if err != nil {
86		return nil, fmt.Errorf("open variable %v: invalid decoder: %v", u, err)
87	}
88	// See if we changed the query parameters.
89	if rawq := q.Encode(); rawq != u.Query().Encode() {
90		u2.RawQuery = rawq
91	}
92	return OpenVariable(o.Client, u2.String(), decoder, &o.Options)
93}
94
95// Options sets options.
96type Options struct {
97	// WaitDuration controls the rate at which the HTTP endpoint is called to check for changes.
98	// Defaults to 30 seconds.
99	WaitDuration time.Duration
100}
101
102// RequestError represents an HTTP error that occurred during endpoint call.
103type RequestError struct {
104	Response *http.Response
105}
106
107func (e *RequestError) Error() string {
108	return fmt.Sprintf("httpvar: received status code %d", e.Response.StatusCode)
109}
110
111func newRequestError(response *http.Response) *RequestError {
112	return &RequestError{Response: response}
113}
114
115// OpenVariable constructs a *runtimevar.Variable that uses client
116// to retrieve the variable contents from the URL urlStr.
117func OpenVariable(client *http.Client, urlStr string, decoder *runtimevar.Decoder, opts *Options) (*runtimevar.Variable, error) {
118	endpointURL, err := url.Parse(urlStr)
119	if err != nil {
120		return nil, fmt.Errorf("httpvar: failed to parse url %q: %v", urlStr, err)
121	}
122
123	return runtimevar.New(newWatcher(client, endpointURL, decoder, opts)), nil
124}
125
126type state struct {
127	val        interface{}
128	raw        *http.Response
129	rawBytes   []byte
130	updateTime time.Time
131	err        error
132}
133
134// Value implements driver.State.Value.
135func (s *state) Value() (interface{}, error) {
136	return s.val, s.err
137}
138
139// UpdateTime implements driver.State.UpdateTime.
140func (s *state) UpdateTime() time.Time {
141	return s.updateTime
142}
143
144// As implements driver.State.As.
145func (s *state) As(i interface{}) bool {
146	if s.raw == nil {
147		return false
148	}
149	p, ok := i.(**http.Response)
150	if !ok {
151		return false
152	}
153	*p = s.raw
154	return true
155}
156
157// errorState returns a new State with err, unless prevS also represents
158// the same error, in which case it returns nil.
159func errorState(err error, prevS driver.State) driver.State {
160	s := &state{err: err}
161	if prevS == nil {
162		return s
163	}
164	prev := prevS.(*state)
165	if prev.err == nil {
166		// New error.
167		return s
168	}
169	if equivalentError(err, prev.err) {
170		// Same error, return nil to indicate no change.
171		return nil
172	}
173	return s
174}
175
176// equivalentError returns true if err1 and err2 represent an equivalent error;
177// i.e., we don't want to return it to the user as a different error.
178func equivalentError(err1, err2 error) bool {
179	if err1 == err2 || err1.Error() == err2.Error() {
180		return true
181	}
182	var code1, code2 int
183	if e, ok := err1.(*RequestError); ok {
184		code1 = e.Response.StatusCode
185	}
186	if e, ok := err2.(*RequestError); ok {
187		code2 = e.Response.StatusCode
188	}
189	return code1 != 0 && code1 == code2
190}
191
192// watcher implements driver.Watcher for configurations provided by the Runtime Configurator
193// service.
194type watcher struct {
195	client   *http.Client
196	endpoint *url.URL
197	decoder  *runtimevar.Decoder
198	wait     time.Duration
199}
200
201// WatchVariable implements driver.WatchVariable.
202func (w *watcher) WatchVariable(ctx context.Context, prev driver.State) (driver.State, time.Duration) {
203	req, err := http.NewRequestWithContext(ctx, http.MethodGet, w.endpoint.String(), nil)
204	if err != nil {
205		return errorState(err, prev), w.wait
206	}
207	resp, err := w.client.Do(req)
208	if err != nil {
209		return errorState(err, prev), w.wait
210	}
211	defer resp.Body.Close()
212
213	if resp.StatusCode != http.StatusOK {
214		err := newRequestError(resp)
215		return errorState(err, prev), w.wait
216	}
217
218	respBodyBytes, err := ioutil.ReadAll(resp.Body)
219	if err != nil {
220		return errorState(err, prev), w.wait
221	}
222
223	// When endpoint returns the same response again, we return nil as state to not trigger variable update.
224	if prev != nil && bytes.Equal(respBodyBytes, prev.(*state).rawBytes) {
225		return nil, w.wait
226	}
227
228	val, err := w.decoder.Decode(ctx, respBodyBytes)
229	if err != nil {
230		return errorState(err, prev), w.wait
231	}
232
233	return &state{
234		val:        val,
235		raw:        resp,
236		rawBytes:   respBodyBytes,
237		updateTime: time.Now(),
238	}, w.wait
239}
240
241// Close implements driver.Close.
242func (w *watcher) Close() error {
243	return nil
244}
245
246// ErrorAs implements driver.ErrorAs.
247func (w *watcher) ErrorAs(err error, i interface{}) bool {
248	switch v := err.(type) {
249	case *url.Error:
250		if p, ok := i.(*url.Error); ok {
251			*p = *v
252			return true
253		}
254	case *RequestError:
255		if p, ok := i.(*RequestError); ok {
256			*p = *v
257			return true
258		}
259	}
260	return false
261}
262
263// ErrorCode implements driver.ErrorCode.
264func (*watcher) ErrorCode(err error) gcerrors.ErrorCode {
265	if requestErr, ok := err.(*RequestError); ok {
266		switch requestErr.Response.StatusCode {
267		case http.StatusBadRequest:
268			return gcerr.InvalidArgument
269		case http.StatusNotFound:
270			return gcerr.NotFound
271		case http.StatusUnauthorized:
272			return gcerr.PermissionDenied
273		case http.StatusGatewayTimeout, http.StatusRequestTimeout:
274			return gcerr.DeadlineExceeded
275		case http.StatusInternalServerError, http.StatusServiceUnavailable, http.StatusBadGateway:
276			return gcerr.Internal
277		}
278	}
279	return gcerr.Unknown
280}
281
282func newWatcher(client *http.Client, endpoint *url.URL, decoder *runtimevar.Decoder, opts *Options) driver.Watcher {
283	if opts == nil {
284		opts = &Options{}
285	}
286	return &watcher{
287		client:   client,
288		endpoint: endpoint,
289		decoder:  decoder,
290		wait:     driver.WaitDuration(opts.WaitDuration),
291	}
292}
293