1// Copyright 2011 Google Inc. All rights reserved. 2// Use of this source code is governed by the Apache 2.0 3// license that can be found in the LICENSE file. 4 5/* 6Package delay provides a way to execute code outside the scope of a 7user request by using the taskqueue API. 8 9To declare a function that may be executed later, call Func 10in a top-level assignment context, passing it an arbitrary string key 11and a function whose first argument is of type context.Context. 12The key is used to look up the function so it can be called later. 13 var laterFunc = delay.Func("key", myFunc) 14It is also possible to use a function literal. 15 var laterFunc = delay.Func("key", func(c context.Context, x string) { 16 // ... 17 }) 18 19To call a function, invoke its Call method. 20 laterFunc.Call(c, "something") 21A function may be called any number of times. If the function has any 22return arguments, and the last one is of type error, the function may 23return a non-nil error to signal that the function should be retried. 24 25The arguments to functions may be of any type that is encodable by the gob 26package. If an argument is of interface type, it is the client's responsibility 27to register with the gob package whatever concrete type may be passed for that 28argument; see http://golang.org/pkg/gob/#Register for details. 29 30Any errors during initialization or execution of a function will be 31logged to the application logs. Error logs that occur during initialization will 32be associated with the request that invoked the Call method. 33 34The state of a function invocation that has not yet successfully 35executed is preserved by combining the file name in which it is declared 36with the string key that was passed to the Func function. Updating an app 37with pending function invocations should safe as long as the relevant 38functions have the (filename, key) combination preserved. The filename is 39parsed according to these rules: 40 * Paths in package main are shortened to just the file name (github.com/foo/foo.go -> foo.go) 41 * Paths are stripped to just package paths (/go/src/github.com/foo/bar.go -> github.com/foo/bar.go) 42 * Module versions are stripped (/go/pkg/mod/github.com/foo/bar@v0.0.0-20181026220418-f595d03440dc/baz.go -> github.com/foo/bar/baz.go) 43 44There is some inherent risk of pending function invocations being lost during 45an update that contains large changes. For example, switching from using GOPATH 46to go.mod is a large change that may inadvertently cause file paths to change. 47 48The delay package uses the Task Queue API to create tasks that call the 49reserved application path "/_ah/queue/go/delay". 50This path must not be marked as "login: required" in app.yaml; 51it must be marked as "login: admin" or have no access restriction. 52*/ 53package delay // import "google.golang.org/appengine/delay" 54 55import ( 56 "bytes" 57 stdctx "context" 58 "encoding/gob" 59 "errors" 60 "fmt" 61 "go/build" 62 stdlog "log" 63 "net/http" 64 "path/filepath" 65 "reflect" 66 "regexp" 67 "runtime" 68 "strings" 69 70 "golang.org/x/net/context" 71 72 "google.golang.org/appengine" 73 "google.golang.org/appengine/internal" 74 "google.golang.org/appengine/log" 75 "google.golang.org/appengine/taskqueue" 76) 77 78// Function represents a function that may have a delayed invocation. 79type Function struct { 80 fv reflect.Value // Kind() == reflect.Func 81 key string 82 err error // any error during initialization 83} 84 85const ( 86 // The HTTP path for invocations. 87 path = "/_ah/queue/go/delay" 88 // Use the default queue. 89 queue = "" 90) 91 92type contextKey int 93 94var ( 95 // registry of all delayed functions 96 funcs = make(map[string]*Function) 97 98 // precomputed types 99 errorType = reflect.TypeOf((*error)(nil)).Elem() 100 101 // errors 102 errFirstArg = errors.New("first argument must be context.Context") 103 errOutsideDelayFunc = errors.New("request headers are only available inside a delay.Func") 104 105 // context keys 106 headersContextKey contextKey = 0 107 stdContextType = reflect.TypeOf((*stdctx.Context)(nil)).Elem() 108 netContextType = reflect.TypeOf((*context.Context)(nil)).Elem() 109) 110 111func isContext(t reflect.Type) bool { 112 return t == stdContextType || t == netContextType 113} 114 115var modVersionPat = regexp.MustCompile("@v[^/]+") 116 117// fileKey finds a stable representation of the caller's file path. 118// For calls from package main: strip all leading path entries, leaving just the filename. 119// For calls from anywhere else, strip $GOPATH/src, leaving just the package path and file path. 120func fileKey(file string) (string, error) { 121 if !internal.IsSecondGen() { 122 return file, nil 123 } 124 // If the caller is in the same Dir as mainPath, then strip everything but the file name. 125 if filepath.Dir(file) == internal.MainPath { 126 return filepath.Base(file), nil 127 } 128 // If the path contains "gopath/src/", which is what the builder uses for 129 // apps which don't use go modules, strip everything up to and including src. 130 // Or, if the path starts with /tmp/staging, then we're importing a package 131 // from the app's module (and we must be using go modules), and we have a 132 // path like /tmp/staging1234/srv/... so strip everything up to and 133 // including the first /srv/. 134 // And be sure to look at the GOPATH, for local development. 135 s := string(filepath.Separator) 136 for _, s := range []string{filepath.Join("gopath", "src") + s, s + "srv" + s, filepath.Join(build.Default.GOPATH, "src") + s} { 137 if idx := strings.Index(file, s); idx > 0 { 138 return file[idx+len(s):], nil 139 } 140 } 141 142 // Finally, if that all fails then we must be using go modules, and the file is a module, 143 // so the path looks like /go/pkg/mod/github.com/foo/bar@v0.0.0-20181026220418-f595d03440dc/baz.go 144 // So... remove everything up to and including mod, plus the @.... version string. 145 m := "/mod/" 146 if idx := strings.Index(file, m); idx > 0 { 147 file = file[idx+len(m):] 148 } else { 149 return file, fmt.Errorf("fileKey: unknown file path format for %q", file) 150 } 151 return modVersionPat.ReplaceAllString(file, ""), nil 152} 153 154// Func declares a new Function. The second argument must be a function with a 155// first argument of type context.Context. 156// This function must be called at program initialization time. That means it 157// must be called in a global variable declaration or from an init function. 158// This restriction is necessary because the instance that delays a function 159// call may not be the one that executes it. Only the code executed at program 160// initialization time is guaranteed to have been run by an instance before it 161// receives a request. 162func Func(key string, i interface{}) *Function { 163 f := &Function{fv: reflect.ValueOf(i)} 164 165 // Derive unique, somewhat stable key for this func. 166 _, file, _, _ := runtime.Caller(1) 167 fk, err := fileKey(file) 168 if err != nil { 169 // Not fatal, but log the error 170 stdlog.Printf("delay: %v", err) 171 } 172 f.key = fk + ":" + key 173 174 t := f.fv.Type() 175 if t.Kind() != reflect.Func { 176 f.err = errors.New("not a function") 177 return f 178 } 179 if t.NumIn() == 0 || !isContext(t.In(0)) { 180 f.err = errFirstArg 181 return f 182 } 183 184 // Register the function's arguments with the gob package. 185 // This is required because they are marshaled inside a []interface{}. 186 // gob.Register only expects to be called during initialization; 187 // that's fine because this function expects the same. 188 for i := 0; i < t.NumIn(); i++ { 189 // Only concrete types may be registered. If the argument has 190 // interface type, the client is resposible for registering the 191 // concrete types it will hold. 192 if t.In(i).Kind() == reflect.Interface { 193 continue 194 } 195 gob.Register(reflect.Zero(t.In(i)).Interface()) 196 } 197 198 if old := funcs[f.key]; old != nil { 199 old.err = fmt.Errorf("multiple functions registered for %s in %s", key, file) 200 } 201 funcs[f.key] = f 202 return f 203} 204 205type invocation struct { 206 Key string 207 Args []interface{} 208} 209 210// Call invokes a delayed function. 211// err := f.Call(c, ...) 212// is equivalent to 213// t, _ := f.Task(...) 214// _, err := taskqueue.Add(c, t, "") 215func (f *Function) Call(c context.Context, args ...interface{}) error { 216 t, err := f.Task(args...) 217 if err != nil { 218 return err 219 } 220 _, err = taskqueueAdder(c, t, queue) 221 return err 222} 223 224// Task creates a Task that will invoke the function. 225// Its parameters may be tweaked before adding it to a queue. 226// Users should not modify the Path or Payload fields of the returned Task. 227func (f *Function) Task(args ...interface{}) (*taskqueue.Task, error) { 228 if f.err != nil { 229 return nil, fmt.Errorf("delay: func is invalid: %v", f.err) 230 } 231 232 nArgs := len(args) + 1 // +1 for the context.Context 233 ft := f.fv.Type() 234 minArgs := ft.NumIn() 235 if ft.IsVariadic() { 236 minArgs-- 237 } 238 if nArgs < minArgs { 239 return nil, fmt.Errorf("delay: too few arguments to func: %d < %d", nArgs, minArgs) 240 } 241 if !ft.IsVariadic() && nArgs > minArgs { 242 return nil, fmt.Errorf("delay: too many arguments to func: %d > %d", nArgs, minArgs) 243 } 244 245 // Check arg types. 246 for i := 1; i < nArgs; i++ { 247 at := reflect.TypeOf(args[i-1]) 248 var dt reflect.Type 249 if i < minArgs { 250 // not a variadic arg 251 dt = ft.In(i) 252 } else { 253 // a variadic arg 254 dt = ft.In(minArgs).Elem() 255 } 256 // nil arguments won't have a type, so they need special handling. 257 if at == nil { 258 // nil interface 259 switch dt.Kind() { 260 case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice: 261 continue // may be nil 262 } 263 return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not nilable", i, dt) 264 } 265 switch at.Kind() { 266 case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice: 267 av := reflect.ValueOf(args[i-1]) 268 if av.IsNil() { 269 // nil value in interface; not supported by gob, so we replace it 270 // with a nil interface value 271 args[i-1] = nil 272 } 273 } 274 if !at.AssignableTo(dt) { 275 return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not assignable to %v", i, at, dt) 276 } 277 } 278 279 inv := invocation{ 280 Key: f.key, 281 Args: args, 282 } 283 284 buf := new(bytes.Buffer) 285 if err := gob.NewEncoder(buf).Encode(inv); err != nil { 286 return nil, fmt.Errorf("delay: gob encoding failed: %v", err) 287 } 288 289 return &taskqueue.Task{ 290 Path: path, 291 Payload: buf.Bytes(), 292 }, nil 293} 294 295// Request returns the special task-queue HTTP request headers for the current 296// task queue handler. Returns an error if called from outside a delay.Func. 297func RequestHeaders(c context.Context) (*taskqueue.RequestHeaders, error) { 298 if ret, ok := c.Value(headersContextKey).(*taskqueue.RequestHeaders); ok { 299 return ret, nil 300 } 301 return nil, errOutsideDelayFunc 302} 303 304var taskqueueAdder = taskqueue.Add // for testing 305 306func init() { 307 http.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) { 308 runFunc(appengine.NewContext(req), w, req) 309 }) 310} 311 312func runFunc(c context.Context, w http.ResponseWriter, req *http.Request) { 313 defer req.Body.Close() 314 315 c = context.WithValue(c, headersContextKey, taskqueue.ParseRequestHeaders(req.Header)) 316 317 var inv invocation 318 if err := gob.NewDecoder(req.Body).Decode(&inv); err != nil { 319 log.Errorf(c, "delay: failed decoding task payload: %v", err) 320 log.Warningf(c, "delay: dropping task") 321 return 322 } 323 324 f := funcs[inv.Key] 325 if f == nil { 326 log.Errorf(c, "delay: no func with key %q found", inv.Key) 327 log.Warningf(c, "delay: dropping task") 328 return 329 } 330 331 ft := f.fv.Type() 332 in := []reflect.Value{reflect.ValueOf(c)} 333 for _, arg := range inv.Args { 334 var v reflect.Value 335 if arg != nil { 336 v = reflect.ValueOf(arg) 337 } else { 338 // Task was passed a nil argument, so we must construct 339 // the zero value for the argument here. 340 n := len(in) // we're constructing the nth argument 341 var at reflect.Type 342 if !ft.IsVariadic() || n < ft.NumIn()-1 { 343 at = ft.In(n) 344 } else { 345 at = ft.In(ft.NumIn() - 1).Elem() 346 } 347 v = reflect.Zero(at) 348 } 349 in = append(in, v) 350 } 351 out := f.fv.Call(in) 352 353 if n := ft.NumOut(); n > 0 && ft.Out(n-1) == errorType { 354 if errv := out[n-1]; !errv.IsNil() { 355 log.Errorf(c, "delay: func failed (will retry): %v", errv.Interface()) 356 w.WriteHeader(http.StatusInternalServerError) 357 return 358 } 359 } 360} 361