1// Copyright 2019 The Go Cloud Development Kit Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package httpvar provides a runtimevar implementation with variables 16// backed by http endpoint. Use OpenVariable to construct a *runtimevar.Variable. 17// 18// URLs 19// 20// For runtimevar.OpenVariable, httpvar registers for the schemes "http" and 21// "https". The default URL opener will use http.DefaultClient. 22// To customize the URL opener, or for more details on the URL format, 23// see URLOpener. 24// See https://gocloud.dev/concepts/urls/ for background information. 25// 26// As 27// 28// httpvar exposes the following types for As: 29// - Snapshot: *http.Response 30// - Error: httpvar.RequestError, url.Error 31package httpvar // import "gocloud.dev/runtimevar/httpvar" 32 33import ( 34 "bytes" 35 "context" 36 "fmt" 37 "io/ioutil" 38 "net/http" 39 "net/url" 40 "time" 41 42 "gocloud.dev/gcerrors" 43 "gocloud.dev/internal/gcerr" 44 "gocloud.dev/runtimevar" 45 "gocloud.dev/runtimevar/driver" 46) 47 48func init() { 49 o := &URLOpener{Client: http.DefaultClient} 50 for _, scheme := range Schemes { 51 runtimevar.DefaultURLMux().RegisterVariable(scheme, o) 52 } 53} 54 55// Schemes are the URL schemes httpvar registers its URLOpener under on runtimevar.DefaultMux. 56var Schemes = []string{"http", "https"} 57 58// URLOpener opens HTTP URLs like "http://myserver.com/foo.txt". 59// 60// The full URL, including scheme, is used as the endpoint, except that the 61// the following URL parameters are removed if present: 62// - decoder: The decoder to use. Defaults to runtimevar.BytesDecoder. 63// See runtimevar.DecoderByName for supported values. 64type URLOpener struct { 65 // The Client to use; required. 66 Client *http.Client 67 68 // Decoder specifies the decoder to use if one is not specified in the URL. 69 // Defaults to runtimevar.BytesDecoder. 70 Decoder *runtimevar.Decoder 71 72 // Options specifies the options to pass to OpenVariable. 73 Options Options 74} 75 76// OpenVariableURL opens a httpvar Variable for u. 77func (o *URLOpener) OpenVariableURL(ctx context.Context, u *url.URL) (*runtimevar.Variable, error) { 78 // Clone u because we may strip some query parameters. 79 u2 := *u 80 q := u2.Query() 81 82 decoderName := q.Get("decoder") 83 q.Del("decoder") 84 decoder, err := runtimevar.DecoderByName(ctx, decoderName, o.Decoder) 85 if err != nil { 86 return nil, fmt.Errorf("open variable %v: invalid decoder: %v", u, err) 87 } 88 // See if we changed the query parameters. 89 if rawq := q.Encode(); rawq != u.Query().Encode() { 90 u2.RawQuery = rawq 91 } 92 return OpenVariable(o.Client, u2.String(), decoder, &o.Options) 93} 94 95// Options sets options. 96type Options struct { 97 // WaitDuration controls the rate at which the HTTP endpoint is called to check for changes. 98 // Defaults to 30 seconds. 99 WaitDuration time.Duration 100} 101 102// RequestError represents an HTTP error that occurred during endpoint call. 103type RequestError struct { 104 Response *http.Response 105} 106 107func (e *RequestError) Error() string { 108 return fmt.Sprintf("httpvar: received status code %d", e.Response.StatusCode) 109} 110 111func newRequestError(response *http.Response) *RequestError { 112 return &RequestError{Response: response} 113} 114 115// OpenVariable constructs a *runtimevar.Variable that uses client 116// to retrieve the variable contents from the URL urlStr. 117func OpenVariable(client *http.Client, urlStr string, decoder *runtimevar.Decoder, opts *Options) (*runtimevar.Variable, error) { 118 endpointURL, err := url.Parse(urlStr) 119 if err != nil { 120 return nil, fmt.Errorf("httpvar: failed to parse url %q: %v", urlStr, err) 121 } 122 123 return runtimevar.New(newWatcher(client, endpointURL, decoder, opts)), nil 124} 125 126type state struct { 127 val interface{} 128 raw *http.Response 129 rawBytes []byte 130 updateTime time.Time 131 err error 132} 133 134// Value implements driver.State.Value. 135func (s *state) Value() (interface{}, error) { 136 return s.val, s.err 137} 138 139// UpdateTime implements driver.State.UpdateTime. 140func (s *state) UpdateTime() time.Time { 141 return s.updateTime 142} 143 144// As implements driver.State.As. 145func (s *state) As(i interface{}) bool { 146 if s.raw == nil { 147 return false 148 } 149 p, ok := i.(**http.Response) 150 if !ok { 151 return false 152 } 153 *p = s.raw 154 return true 155} 156 157// errorState returns a new State with err, unless prevS also represents 158// the same error, in which case it returns nil. 159func errorState(err error, prevS driver.State) driver.State { 160 s := &state{err: err} 161 if prevS == nil { 162 return s 163 } 164 prev := prevS.(*state) 165 if prev.err == nil { 166 // New error. 167 return s 168 } 169 if equivalentError(err, prev.err) { 170 // Same error, return nil to indicate no change. 171 return nil 172 } 173 return s 174} 175 176// equivalentError returns true if err1 and err2 represent an equivalent error; 177// i.e., we don't want to return it to the user as a different error. 178func equivalentError(err1, err2 error) bool { 179 if err1 == err2 || err1.Error() == err2.Error() { 180 return true 181 } 182 var code1, code2 int 183 if e, ok := err1.(*RequestError); ok { 184 code1 = e.Response.StatusCode 185 } 186 if e, ok := err2.(*RequestError); ok { 187 code2 = e.Response.StatusCode 188 } 189 return code1 != 0 && code1 == code2 190} 191 192// watcher implements driver.Watcher for configurations provided by the Runtime Configurator 193// service. 194type watcher struct { 195 client *http.Client 196 endpoint *url.URL 197 decoder *runtimevar.Decoder 198 wait time.Duration 199} 200 201// WatchVariable implements driver.WatchVariable. 202func (w *watcher) WatchVariable(ctx context.Context, prev driver.State) (driver.State, time.Duration) { 203 req, err := http.NewRequestWithContext(ctx, http.MethodGet, w.endpoint.String(), nil) 204 if err != nil { 205 return errorState(err, prev), w.wait 206 } 207 resp, err := w.client.Do(req) 208 if err != nil { 209 return errorState(err, prev), w.wait 210 } 211 defer resp.Body.Close() 212 213 if resp.StatusCode != http.StatusOK { 214 err := newRequestError(resp) 215 return errorState(err, prev), w.wait 216 } 217 218 respBodyBytes, err := ioutil.ReadAll(resp.Body) 219 if err != nil { 220 return errorState(err, prev), w.wait 221 } 222 223 // When endpoint returns the same response again, we return nil as state to not trigger variable update. 224 if prev != nil && bytes.Equal(respBodyBytes, prev.(*state).rawBytes) { 225 return nil, w.wait 226 } 227 228 val, err := w.decoder.Decode(ctx, respBodyBytes) 229 if err != nil { 230 return errorState(err, prev), w.wait 231 } 232 233 return &state{ 234 val: val, 235 raw: resp, 236 rawBytes: respBodyBytes, 237 updateTime: time.Now(), 238 }, w.wait 239} 240 241// Close implements driver.Close. 242func (w *watcher) Close() error { 243 return nil 244} 245 246// ErrorAs implements driver.ErrorAs. 247func (w *watcher) ErrorAs(err error, i interface{}) bool { 248 switch v := err.(type) { 249 case *url.Error: 250 if p, ok := i.(*url.Error); ok { 251 *p = *v 252 return true 253 } 254 case *RequestError: 255 if p, ok := i.(*RequestError); ok { 256 *p = *v 257 return true 258 } 259 } 260 return false 261} 262 263// ErrorCode implements driver.ErrorCode. 264func (*watcher) ErrorCode(err error) gcerrors.ErrorCode { 265 if requestErr, ok := err.(*RequestError); ok { 266 switch requestErr.Response.StatusCode { 267 case http.StatusBadRequest: 268 return gcerr.InvalidArgument 269 case http.StatusNotFound: 270 return gcerr.NotFound 271 case http.StatusUnauthorized: 272 return gcerr.PermissionDenied 273 case http.StatusGatewayTimeout, http.StatusRequestTimeout: 274 return gcerr.DeadlineExceeded 275 case http.StatusInternalServerError, http.StatusServiceUnavailable, http.StatusBadGateway: 276 return gcerr.Internal 277 } 278 } 279 return gcerr.Unknown 280} 281 282func newWatcher(client *http.Client, endpoint *url.URL, decoder *runtimevar.Decoder, opts *Options) driver.Watcher { 283 if opts == nil { 284 opts = &Options{} 285 } 286 return &watcher{ 287 client: client, 288 endpoint: endpoint, 289 decoder: decoder, 290 wait: driver.WaitDuration(opts.WaitDuration), 291 } 292} 293