1// Copyright 2011 Google Inc. All rights reserved.
2// Use of this source code is governed by the Apache 2.0
3// license that can be found in the LICENSE file.
4
5/*
6Package delay provides a way to execute code outside the scope of a
7user request by using the taskqueue API.
8
9To declare a function that may be executed later, call Func
10in a top-level assignment context, passing it an arbitrary string key
11and a function whose first argument is of type context.Context.
12The key is used to look up the function so it can be called later.
13	var laterFunc = delay.Func("key", myFunc)
14It is also possible to use a function literal.
15	var laterFunc = delay.Func("key", func(c context.Context, x string) {
16		// ...
17	})
18
19To call a function, invoke its Call method.
20	laterFunc.Call(c, "something")
21A function may be called any number of times. If the function has any
22return arguments, and the last one is of type error, the function may
23return a non-nil error to signal that the function should be retried.
24
25The arguments to functions may be of any type that is encodable by the gob
26package. If an argument is of interface type, it is the client's responsibility
27to register with the gob package whatever concrete type may be passed for that
argument; see https://pkg.go.dev/encoding/gob#Register for details.
29
30Any errors during initialization or execution of a function will be
31logged to the application logs. Error logs that occur during initialization will
32be associated with the request that invoked the Call method.
33
34The state of a function invocation that has not yet successfully
35executed is preserved by combining the file name in which it is declared
36with the string key that was passed to the Func function. Updating an app
with pending function invocations should be safe as long as the relevant
38functions have the (filename, key) combination preserved. The filename is
39parsed according to these rules:
40  * Paths in package main are shortened to just the file name (github.com/foo/foo.go -> foo.go)
41  * Paths are stripped to just package paths (/go/src/github.com/foo/bar.go -> github.com/foo/bar.go)
42  * Module versions are stripped (/go/pkg/mod/github.com/foo/bar@v0.0.0-20181026220418-f595d03440dc/baz.go -> github.com/foo/bar/baz.go)
43
44There is some inherent risk of pending function invocations being lost during
45an update that contains large changes. For example, switching from using GOPATH
46to go.mod is a large change that may inadvertently cause file paths to change.
47
48The delay package uses the Task Queue API to create tasks that call the
49reserved application path "/_ah/queue/go/delay".
50This path must not be marked as "login: required" in app.yaml;
51it must be marked as "login: admin" or have no access restriction.
52*/
53package delay // import "google.golang.org/appengine/delay"
54
55import (
56	"bytes"
57	stdctx "context"
58	"encoding/gob"
59	"errors"
60	"fmt"
61	"go/build"
62	stdlog "log"
63	"net/http"
64	"path/filepath"
65	"reflect"
66	"regexp"
67	"runtime"
68	"strings"
69
70	"golang.org/x/net/context"
71
72	"google.golang.org/appengine"
73	"google.golang.org/appengine/internal"
74	"google.golang.org/appengine/log"
75	"google.golang.org/appengine/taskqueue"
76)
77
// Function represents a function that may have a delayed invocation.
// Values are created by Func, which fills in every field; Task reads them
// when building the task payload.
type Function struct {
	fv  reflect.Value // Kind() == reflect.Func
	// key is the registry key: the fileKey of the declaring file, a ":" and
	// the key string passed to Func.
	key string
	err error // any error during initialization
}
84
const (
	// The HTTP path for invocations. The package registers its handler on
	// this path and every Task it builds targets it.
	path = "/_ah/queue/go/delay"
	// Use the default queue. This is the queue name Call passes when it
	// enqueues a task.
	queue = ""
)
91
// contextKey is the unexported type of the keys this package uses to store
// values in a context.Context.
type contextKey int
93
var (
	// registry of all delayed functions, keyed by Function.key.
	// Func adds entries; runFunc looks them up by the key found in the
	// decoded task payload.
	funcs = make(map[string]*Function)

	// precomputed types
	// errorType is the reflect.Type of the error interface; runFunc compares
	// a function's last result type against it.
	errorType = reflect.TypeOf((*error)(nil)).Elem()

	// errors
	// errFirstArg is recorded by Func when the function's first parameter is
	// not a context type; errOutsideDelayFunc is returned by RequestHeaders
	// when the context carries no task-queue headers.
	errFirstArg         = errors.New("first argument must be context.Context")
	errOutsideDelayFunc = errors.New("request headers are only available inside a delay.Func")

	// context keys
	// headersContextKey is the key under which runFunc stores the parsed
	// task-queue request headers. stdContextType and netContextType are the
	// interface types of the standard library's context.Context and of
	// golang.org/x/net/context.Context; isContext accepts either.
	headersContextKey contextKey = 0
	stdContextType               = reflect.TypeOf((*stdctx.Context)(nil)).Elem()
	netContextType               = reflect.TypeOf((*context.Context)(nil)).Elem()
)
110
111func isContext(t reflect.Type) bool {
112	return t == stdContextType || t == netContextType
113}
114
// modVersionPat matches a module version suffix: "@v" followed by every
// character up to (not including) the next "/". fileKey uses it to strip
// versions from module cache paths.
var modVersionPat = regexp.MustCompile("@v[^/]+")
116
117// fileKey finds a stable representation of the caller's file path.
118// For calls from package main: strip all leading path entries, leaving just the filename.
119// For calls from anywhere else, strip $GOPATH/src, leaving just the package path and file path.
120func fileKey(file string) (string, error) {
121	if !internal.IsSecondGen() {
122		return file, nil
123	}
124	// If the caller is in the same Dir as mainPath, then strip everything but the file name.
125	if filepath.Dir(file) == internal.MainPath {
126		return filepath.Base(file), nil
127	}
128	// If the path contains "gopath/src/", which is what the builder uses for
129	// apps which don't use go modules, strip everything up to and including src.
130	// Or, if the path starts with /tmp/staging, then we're importing a package
131	// from the app's module (and we must be using go modules), and we have a
132	// path like /tmp/staging1234/srv/... so strip everything up to and
133	// including the first /srv/.
134	// And be sure to look at the GOPATH, for local development.
135	s := string(filepath.Separator)
136	for _, s := range []string{filepath.Join("gopath", "src") + s, s + "srv" + s, filepath.Join(build.Default.GOPATH, "src") + s} {
137		if idx := strings.Index(file, s); idx > 0 {
138			return file[idx+len(s):], nil
139		}
140	}
141
142	// Finally, if that all fails then we must be using go modules, and the file is a module,
143	// so the path looks like /go/pkg/mod/github.com/foo/bar@v0.0.0-20181026220418-f595d03440dc/baz.go
144	// So... remove everything up to and including mod, plus the @.... version string.
145	m := "/mod/"
146	if idx := strings.Index(file, m); idx > 0 {
147		file = file[idx+len(m):]
148	} else {
149		return file, fmt.Errorf("fileKey: unknown file path format for %q", file)
150	}
151	return modVersionPat.ReplaceAllString(file, ""), nil
152}
153
154// Func declares a new Function. The second argument must be a function with a
155// first argument of type context.Context.
156// This function must be called at program initialization time. That means it
157// must be called in a global variable declaration or from an init function.
158// This restriction is necessary because the instance that delays a function
159// call may not be the one that executes it. Only the code executed at program
160// initialization time is guaranteed to have been run by an instance before it
161// receives a request.
162func Func(key string, i interface{}) *Function {
163	f := &Function{fv: reflect.ValueOf(i)}
164
165	// Derive unique, somewhat stable key for this func.
166	_, file, _, _ := runtime.Caller(1)
167	fk, err := fileKey(file)
168	if err != nil {
169		// Not fatal, but log the error
170		stdlog.Printf("delay: %v", err)
171	}
172	f.key = fk + ":" + key
173
174	t := f.fv.Type()
175	if t.Kind() != reflect.Func {
176		f.err = errors.New("not a function")
177		return f
178	}
179	if t.NumIn() == 0 || !isContext(t.In(0)) {
180		f.err = errFirstArg
181		return f
182	}
183
184	// Register the function's arguments with the gob package.
185	// This is required because they are marshaled inside a []interface{}.
186	// gob.Register only expects to be called during initialization;
187	// that's fine because this function expects the same.
188	for i := 0; i < t.NumIn(); i++ {
189		// Only concrete types may be registered. If the argument has
190		// interface type, the client is resposible for registering the
191		// concrete types it will hold.
192		if t.In(i).Kind() == reflect.Interface {
193			continue
194		}
195		gob.Register(reflect.Zero(t.In(i)).Interface())
196	}
197
198	if old := funcs[f.key]; old != nil {
199		old.err = fmt.Errorf("multiple functions registered for %s in %s", key, file)
200	}
201	funcs[f.key] = f
202	return f
203}
204
// invocation is the gob-encoded payload of a delayed call: Task encodes one
// into the task body and runFunc decodes it from the request body.
type invocation struct {
	// Key is the registry key of the Function to run.
	Key  string
	// Args holds the arguments that follow the context argument. A nil
	// entry stands for the zero value of the corresponding parameter.
	Args []interface{}
}
209
210// Call invokes a delayed function.
211//   err := f.Call(c, ...)
212// is equivalent to
213//   t, _ := f.Task(...)
214//   _, err := taskqueue.Add(c, t, "")
215func (f *Function) Call(c context.Context, args ...interface{}) error {
216	t, err := f.Task(args...)
217	if err != nil {
218		return err
219	}
220	_, err = taskqueueAdder(c, t, queue)
221	return err
222}
223
224// Task creates a Task that will invoke the function.
225// Its parameters may be tweaked before adding it to a queue.
226// Users should not modify the Path or Payload fields of the returned Task.
227func (f *Function) Task(args ...interface{}) (*taskqueue.Task, error) {
228	if f.err != nil {
229		return nil, fmt.Errorf("delay: func is invalid: %v", f.err)
230	}
231
232	nArgs := len(args) + 1 // +1 for the context.Context
233	ft := f.fv.Type()
234	minArgs := ft.NumIn()
235	if ft.IsVariadic() {
236		minArgs--
237	}
238	if nArgs < minArgs {
239		return nil, fmt.Errorf("delay: too few arguments to func: %d < %d", nArgs, minArgs)
240	}
241	if !ft.IsVariadic() && nArgs > minArgs {
242		return nil, fmt.Errorf("delay: too many arguments to func: %d > %d", nArgs, minArgs)
243	}
244
245	// Check arg types.
246	for i := 1; i < nArgs; i++ {
247		at := reflect.TypeOf(args[i-1])
248		var dt reflect.Type
249		if i < minArgs {
250			// not a variadic arg
251			dt = ft.In(i)
252		} else {
253			// a variadic arg
254			dt = ft.In(minArgs).Elem()
255		}
256		// nil arguments won't have a type, so they need special handling.
257		if at == nil {
258			// nil interface
259			switch dt.Kind() {
260			case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
261				continue // may be nil
262			}
263			return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not nilable", i, dt)
264		}
265		switch at.Kind() {
266		case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
267			av := reflect.ValueOf(args[i-1])
268			if av.IsNil() {
269				// nil value in interface; not supported by gob, so we replace it
270				// with a nil interface value
271				args[i-1] = nil
272			}
273		}
274		if !at.AssignableTo(dt) {
275			return nil, fmt.Errorf("delay: argument %d has wrong type: %v is not assignable to %v", i, at, dt)
276		}
277	}
278
279	inv := invocation{
280		Key:  f.key,
281		Args: args,
282	}
283
284	buf := new(bytes.Buffer)
285	if err := gob.NewEncoder(buf).Encode(inv); err != nil {
286		return nil, fmt.Errorf("delay: gob encoding failed: %v", err)
287	}
288
289	return &taskqueue.Task{
290		Path:    path,
291		Payload: buf.Bytes(),
292	}, nil
293}
294
295// Request returns the special task-queue HTTP request headers for the current
296// task queue handler. Returns an error if called from outside a delay.Func.
297func RequestHeaders(c context.Context) (*taskqueue.RequestHeaders, error) {
298	if ret, ok := c.Value(headersContextKey).(*taskqueue.RequestHeaders); ok {
299		return ret, nil
300	}
301	return nil, errOutsideDelayFunc
302}
303
// taskqueueAdder is the function Call uses to enqueue a task. It is a
// variable rather than a direct call to taskqueue.Add so that it can be
// replaced.
var taskqueueAdder = taskqueue.Add // for testing
305
306func init() {
307	http.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
308		runFunc(appengine.NewContext(req), w, req)
309	})
310}
311
312func runFunc(c context.Context, w http.ResponseWriter, req *http.Request) {
313	defer req.Body.Close()
314
315	c = context.WithValue(c, headersContextKey, taskqueue.ParseRequestHeaders(req.Header))
316
317	var inv invocation
318	if err := gob.NewDecoder(req.Body).Decode(&inv); err != nil {
319		log.Errorf(c, "delay: failed decoding task payload: %v", err)
320		log.Warningf(c, "delay: dropping task")
321		return
322	}
323
324	f := funcs[inv.Key]
325	if f == nil {
326		log.Errorf(c, "delay: no func with key %q found", inv.Key)
327		log.Warningf(c, "delay: dropping task")
328		return
329	}
330
331	ft := f.fv.Type()
332	in := []reflect.Value{reflect.ValueOf(c)}
333	for _, arg := range inv.Args {
334		var v reflect.Value
335		if arg != nil {
336			v = reflect.ValueOf(arg)
337		} else {
338			// Task was passed a nil argument, so we must construct
339			// the zero value for the argument here.
340			n := len(in) // we're constructing the nth argument
341			var at reflect.Type
342			if !ft.IsVariadic() || n < ft.NumIn()-1 {
343				at = ft.In(n)
344			} else {
345				at = ft.In(ft.NumIn() - 1).Elem()
346			}
347			v = reflect.Zero(at)
348		}
349		in = append(in, v)
350	}
351	out := f.fv.Call(in)
352
353	if n := ft.NumOut(); n > 0 && ft.Out(n-1) == errorType {
354		if errv := out[n-1]; !errv.IsNil() {
355			log.Errorf(c, "delay: func failed (will retry): %v", errv.Interface())
356			w.WriteHeader(http.StatusInternalServerError)
357			return
358		}
359	}
360}
361