1package solver
2
3import (
4	"context"
5	"sync"
6	"time"
7
8	"github.com/moby/buildkit/solver/internal/pipe"
9	digest "github.com/opencontainers/go-digest"
10	"github.com/pkg/errors"
11	"github.com/sirupsen/logrus"
12)
13
// edgeStatusType tracks how far an edge has progressed through cache
// lookup and execution: initial -> cache-fast -> cache-slow -> complete.
type edgeStatusType int

const (
	edgeStatusInitial edgeStatusType = iota
	edgeStatusCacheFast
	edgeStatusCacheSlow
	edgeStatusComplete
)

// String returns a human-readable name for the status, used in
// scheduler debug logs. Undefined values return "unknown" instead of
// panicking with an out-of-range slice index.
func (t edgeStatusType) String() string {
	switch t {
	case edgeStatusInitial:
		return "initial"
	case edgeStatusCacheFast:
		return "cache-fast"
	case edgeStatusCacheSlow:
		return "cache-slow"
	case edgeStatusComplete:
		return "complete"
	default:
		return "unknown"
	}
}
26
27func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
28	e := &edge{
29		edge:               ed,
30		op:                 op,
31		depRequests:        map[pipe.Receiver]*dep{},
32		keyMap:             map[string]struct{}{},
33		cacheRecords:       map[string]*CacheRecord{},
34		cacheRecordsLoaded: map[string]struct{}{},
35		index:              index,
36	}
37	return e
38}
39
// edge holds the mutable scheduling state for one Edge (a single output
// index of a vertex). It is mutated only from the scheduler loop (via
// unpark/processUpdate), so its fields need no locking of their own.
type edge struct {
	edge Edge
	op   activeOp

	// embedded shared state (status, result, cacheMap, keys) reported
	// back to incoming requesters
	edgeState
	depRequests map[pipe.Receiver]*dep
	deps        []*dep

	// cache map lookup state; cacheMapReq is the in-flight request and
	// cacheMapIndex the next part of a multi-part cache map
	cacheMapReq        pipe.Receiver
	cacheMapDone       bool
	cacheMapIndex      int
	cacheMapDigests    []digest.Digest
	// execReq is the in-flight execution or cache-load request;
	// execCacheLoad distinguishes the two so a failed load can retry
	execReq            pipe.Receiver
	execCacheLoad      bool
	err                error
	cacheRecords       map[string]*CacheRecord
	cacheRecordsLoaded map[string]struct{}
	keyMap             map[string]struct{}

	// flags derived from dependency state in recalcCurrentState
	noCacheMatchPossible      bool
	allDepsCompletedCacheFast bool
	allDepsCompletedCacheSlow bool
	allDepsStateCacheSlow     bool
	allDepsCompleted          bool
	hasActiveOutgoing         bool

	// releaserCount counts extra references added by merged edges
	releaserCount int
	keysDidChange bool
	index         *edgeIndex

	secondaryExporters []expDep
}
72
// dep holds state for a dependent edge, i.e. one input of the edge that
// owns it.
type dep struct {
	req pipe.Receiver
	edgeState
	index             Index // input index on the owning vertex
	keyMap            map[string]*CacheKey // cache keys (by ID) that matched a query for this input
	slowCacheReq      pipe.Receiver // in-flight result-based cache key computation
	slowCacheComplete bool
	slowCacheFoundKey bool
	slowCacheKey      *ExportableCacheKey
	err               error
}
85
// expDep holds secondary exporter info for a dependency
type expDep struct {
	index    int
	cacheKey CacheKeyWithSelector
}
91
92func newDep(i Index) *dep {
93	return &dep{index: i, keyMap: map[string]*CacheKey{}}
94}
95
// edgePipe is a pipe for requests between two edges
type edgePipe struct {
	*pipe.Pipe
	From, Target *edge
	mu           sync.Mutex
}
102
// edgeState holds basic mutable state info for an edge; it is the
// payload sent back to incoming requesters on updates and completion.
type edgeState struct {
	state    edgeStatusType
	result   *SharedCachedResult
	cacheMap *CacheMap
	keys     []ExportableCacheKey // append-only; callers compare by length
}
110
// edgeRequest is the payload of a request from one edge to another,
// recording the requester's view of the target at request time.
type edgeRequest struct {
	desiredState edgeStatusType
	currentState edgeState
	currentKeys  int
}
116
// incrementReferenceCount increases the number of times release needs to be
// called to release the edge. Called on merging edges.
// Only invoked from the scheduler loop, so no locking is needed.
func (e *edge) incrementReferenceCount() {
	e.releaserCount++
}
122
123// release releases the edge resources
124func (e *edge) release() {
125	if e.releaserCount > 0 {
126		e.releaserCount--
127		return
128	}
129	e.index.Release(e)
130	if e.result != nil {
131		go e.result.Release(context.TODO())
132	}
133}
134
135// commitOptions returns parameters for the op execution
136func (e *edge) commitOptions() ([]*CacheKey, []CachedResult) {
137	k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
138	if len(e.deps) == 0 {
139		keys := make([]*CacheKey, 0, len(e.cacheMapDigests))
140		for _, dgst := range e.cacheMapDigests {
141			keys = append(keys, NewCacheKey(dgst, e.edge.Index))
142		}
143		return keys, nil
144	}
145
146	inputs := make([][]CacheKeyWithSelector, len(e.deps))
147	results := make([]CachedResult, len(e.deps))
148	for i, dep := range e.deps {
149		for _, k := range dep.result.CacheKeys() {
150			inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: k, Selector: e.cacheMap.Deps[i].Selector})
151		}
152		if dep.slowCacheKey != nil {
153			inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
154		}
155		results[i] = dep.result
156	}
157
158	k.deps = inputs
159	return []*CacheKey{k}, results
160}
161
162// isComplete returns true if edge state is final and will never change
163func (e *edge) isComplete() bool {
164	return e.err != nil || e.result != nil
165}
166
167// finishIncoming finalizes the incoming pipe request
168func (e *edge) finishIncoming(req pipe.Sender) {
169	err := e.err
170	if req.Request().Canceled && err == nil {
171		err = context.Canceled
172	}
173	if debugScheduler {
174		logrus.Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
175	}
176	req.Finalize(&e.edgeState, err)
177}
178
179// updateIncoming updates the current value of incoming pipe request
180func (e *edge) updateIncoming(req pipe.Sender) {
181	if debugScheduler {
182		logrus.Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
183	}
184	req.Update(&e.edgeState)
185}
186
187// probeCache is called with unprocessed cache keys for dependency
188// if the key could match the edge, the cacheRecords for dependency are filled
189func (e *edge) probeCache(d *dep, depKeys []CacheKeyWithSelector) bool {
190	if len(depKeys) == 0 {
191		return false
192	}
193	if e.op.IgnoreCache() {
194		return false
195	}
196	keys, err := e.op.Cache().Query(depKeys, d.index, e.cacheMap.Digest, e.edge.Index)
197	if err != nil {
198		e.err = errors.Wrap(err, "error on cache query")
199	}
200	found := false
201	for _, k := range keys {
202		if _, ok := d.keyMap[k.ID]; !ok {
203			d.keyMap[k.ID] = k
204			found = true
205		}
206	}
207	return found
208}
209
210// checkDepMatchPossible checks if any cache matches are possible past this point
211func (e *edge) checkDepMatchPossible(dep *dep) {
212	depHasSlowCache := e.cacheMap.Deps[dep.index].ComputeDigestFunc != nil
213	if !e.noCacheMatchPossible && (((!dep.slowCacheFoundKey && dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state >= edgeStatusCacheSlow)) && len(dep.keyMap) == 0) {
214		e.noCacheMatchPossible = true
215	}
216}
217
218// slowCacheFunc returns the result based cache func for dependency if it exists
219func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc {
220	if e.cacheMap == nil {
221		return nil
222	}
223	return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc
224}
225
226// preprocessFunc returns result based cache func
227func (e *edge) preprocessFunc(dep *dep) PreprocessFunc {
228	if e.cacheMap == nil {
229		return nil
230	}
231	return e.cacheMap.Deps[int(dep.index)].PreprocessFunc
232}
233
234// allDepsHaveKeys checks if all dependencies have at least one key. used for
235// determining if there is enough data for combining cache key for edge
236func (e *edge) allDepsHaveKeys(matching bool) bool {
237	if e.cacheMap == nil {
238		return false
239	}
240	for _, d := range e.deps {
241		cond := len(d.keys) == 0
242		if matching {
243			cond = len(d.keyMap) == 0
244		}
245		if cond && d.slowCacheKey == nil && d.result == nil {
246			return false
247		}
248	}
249	return true
250}
251
// currentIndexKey returns a cache key built from all current dependency
// cache keys, or nil if the cache map is missing or some dependency has
// neither keys nor a result yet.
func (e *edge) currentIndexKey() *CacheKey {
	if e.cacheMap == nil {
		return nil
	}

	keys := make([][]CacheKeyWithSelector, len(e.deps))
	for i, d := range e.deps {
		if len(d.keys) == 0 && d.result == nil {
			// not enough information on this input yet to form a key
			return nil
		}
		for _, k := range d.keys {
			keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
		}
		if d.result != nil {
			// result-derived keys are added in addition to reported keys
			for _, rk := range d.result.CacheKeys() {
				keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: rk})
			}
			// NOTE(review): the slow cache key is only attached when a
			// result exists — presumably because it is computed from the
			// result; confirm this nesting is intentional
			if d.slowCacheKey != nil {
				keys[i] = append(keys[i], CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: d.slowCacheKey.CacheKey, Exporter: &exporter{k: d.slowCacheKey.CacheKey}}})
			}
		}
	}

	k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
	k.deps = keys

	return k
}
281
282// slow cache keys can be computed in 2 phases if there are multiple deps.
283// first evaluate ones that didn't match any definition based keys
284func (e *edge) skipPhase2SlowCache(dep *dep) bool {
285	isPhase1 := false
286	for _, dep := range e.deps {
287		if (!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil || dep.state < edgeStatusCacheSlow) && len(dep.keyMap) == 0 {
288			isPhase1 = true
289			break
290		}
291	}
292
293	if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) > 0 {
294		return true
295	}
296	return false
297}
298
299func (e *edge) skipPhase2FastCache(dep *dep) bool {
300	isPhase1 := false
301	for _, dep := range e.deps {
302		if e.cacheMap == nil || len(dep.keyMap) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) {
303			isPhase1 = true
304			break
305		}
306	}
307
308	if isPhase1 && len(dep.keyMap) > 0 {
309		return true
310	}
311	return false
312}
313
314// unpark is called by the scheduler with incoming requests and updates for
315// previous calls.
316// To avoid deadlocks and resource leaks this function needs to follow
317// following rules:
318// 1) this function needs to return unclosed outgoing requests if some incoming
319//    requests were not completed
320// 2) this function may not return outgoing requests if it has completed all
321//    incoming requests
322func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) {
323	// process all incoming changes
324	depChanged := false
325	for _, upt := range updates {
326		if changed := e.processUpdate(upt); changed {
327			depChanged = true
328		}
329	}
330
331	if depChanged {
332		// the dep responses had changes. need to reevaluate edge state
333		e.recalcCurrentState()
334	}
335
336	desiredState, done := e.respondToIncoming(incoming, allPipes)
337	if done {
338		return
339	}
340
341	cacheMapReq := false
342	// set up new outgoing requests if needed
343	if e.cacheMapReq == nil && (e.cacheMap == nil || len(e.cacheRecords) == 0) {
344		index := e.cacheMapIndex
345		e.cacheMapReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
346			cm, err := e.op.CacheMap(ctx, index)
347			return cm, errors.Wrap(err, "failed to load cache key")
348		})
349		cacheMapReq = true
350	}
351
352	// execute op
353	if e.execReq == nil && desiredState == edgeStatusComplete {
354		if ok := e.execIfPossible(f); ok {
355			return
356		}
357	}
358
359	if e.execReq == nil {
360		if added := e.createInputRequests(desiredState, f, false); !added && !e.hasActiveOutgoing && !cacheMapReq {
361			logrus.Errorf("buildkit scheluding error: leaving incoming open. forcing solve. Please report this with BUILDKIT_SCHEDULER_DEBUG=1")
362			debugSchedulerPreUnpark(e, incoming, updates, allPipes)
363			e.createInputRequests(desiredState, f, true)
364		}
365	}
366
367}
368
369func (e *edge) makeExportable(k *CacheKey, records []*CacheRecord) ExportableCacheKey {
370	return ExportableCacheKey{
371		CacheKey: k,
372		Exporter: &exporter{k: k, records: records, override: e.edge.Vertex.Options().ExportCache},
373	}
374}
375
// markFailed records err on the edge and schedules another unpark so
// the failure is delivered to the incoming requests.
func (e *edge) markFailed(f *pipeFactory, err error) {
	e.err = err
	e.postpone(f)
}
380
// processUpdate is called by unpark for every updated pipe request. It
// folds the update into the edge's state and reports whether dependency
// state changed in a way that requires recalcCurrentState.
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
	// response for cachemap request
	if upt == e.cacheMapReq && upt.Status().Completed {
		if err := upt.Status().Err; err != nil {
			e.cacheMapReq = nil
			// cancellation is not an edge failure; only keep real errors
			if !upt.Status().Canceled && e.err == nil {
				e.err = err
			}
		} else {
			resp := upt.Status().Value.(*cacheMapResp)
			e.cacheMap = resp.CacheMap
			e.cacheMapDone = resp.complete
			e.cacheMapIndex++
			if len(e.deps) == 0 {
				// edge without inputs: query the cache directly by digest
				e.cacheMapDigests = append(e.cacheMapDigests, e.cacheMap.Digest)
				if !e.op.IgnoreCache() {
					keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
					if err != nil {
						logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
					} else {
						for _, k := range keys {
							records, err := e.op.Cache().Records(k)
							if err != nil {
								logrus.Errorf("error receiving cache records: %v", err)
								continue
							}

							for _, r := range records {
								e.cacheRecords[r.ID] = r
							}

							e.keys = append(e.keys, e.makeExportable(k, records))
						}
					}
				}
				// with no deps there is no slow cache phase to wait for
				e.state = edgeStatusCacheSlow
			}
			if e.allDepsHaveKeys(false) {
				e.keysDidChange = true
			}
			// probe keys that were loaded before cache map
			for i, dep := range e.deps {
				e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
				e.checkDepMatchPossible(dep)
			}
			// a multi-part cache map keeps the request slot open for the
			// next part by leaving cacheMapReq set only when done
			if !e.cacheMapDone {
				e.cacheMapReq = nil
			}
		}
		return true
	}

	// response for exec request
	if upt == e.execReq && upt.Status().Completed {
		if err := upt.Status().Err; err != nil {
			e.execReq = nil
			if e.execCacheLoad {
				// a failed cache load is retried without the records that
				// were already attempted; it does not fail the edge
				for k := range e.cacheRecordsLoaded {
					delete(e.cacheRecords, k)
				}
			} else if !upt.Status().Canceled && e.err == nil {
				e.err = err
			}
		} else {
			e.result = NewSharedCachedResult(upt.Status().Value.(CachedResult))
			e.state = edgeStatusComplete
		}
		return true
	}

	// response for requests to dependencies
	if dep, ok := e.depRequests[upt]; ok {
		if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil {
			if e.err == nil {
				e.err = err
			}
			dep.err = err
		}

		state := upt.Status().Value.(*edgeState)

		if len(dep.keys) < len(state.keys) {
			// the dependency reported new keys; probe only the newly
			// appended ones (keys are append-only)
			newKeys := state.keys[len(dep.keys):]
			if e.cacheMap != nil {
				e.probeCache(dep, withSelector(newKeys, e.cacheMap.Deps[dep.index].Selector))
				dep.edgeState.keys = state.keys
				if e.allDepsHaveKeys(false) {
					e.keysDidChange = true
				}
			}
			depChanged = true
		}
		if dep.state != edgeStatusComplete && state.state == edgeStatusComplete {
			e.keysDidChange = true
		}

		recheck := state.state != dep.state

		dep.edgeState = *state

		if recheck && e.cacheMap != nil {
			e.checkDepMatchPossible(dep)
			depChanged = true
		}

		return
	}

	// response for result based cache function
	for i, dep := range e.deps {
		if upt == dep.slowCacheReq && upt.Status().Completed {
			if err := upt.Status().Err; err != nil {
				dep.slowCacheReq = nil
				if !upt.Status().Canceled && e.err == nil {
					e.err = upt.Status().Err
				}
			} else if !dep.slowCacheComplete {
				dgst := upt.Status().Value.(digest.Digest)
				if e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc != nil && dgst != "" {
					k := NewCacheKey(dgst, -1)
					dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
					slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
					defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys()))
					for _, dk := range dep.result.CacheKeys() {
						defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[i].Selector})
					}
					dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp})

					// connect def key to slow key
					// NOTE(review): the Query error here is intentionally(?)
					// ignored — only the link side effect is wanted; confirm
					e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index)
				}

				dep.slowCacheComplete = true
				e.keysDidChange = true
				e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true
			}
			return true
		}
	}

	return
}
524
// recalcCurrentState is called by unpark to recompute internal state after
// the state of dependencies has changed. It merges newly matched cache
// keys, loads matching cache records, and derives the edge's state
// bounds and all-deps flags.
func (e *edge) recalcCurrentState() {
	// TODO: fast pass to detect incomplete results
	newKeys := map[string]*CacheKey{}

	// intersect per-dependency key sets: a key is new for the edge only
	// if every dependency matched it and the edge hasn't recorded it yet
	for i, dep := range e.deps {
		if i == 0 {
			for id, k := range dep.keyMap {
				if _, ok := e.keyMap[id]; ok {
					continue
				}
				newKeys[id] = k
			}
		} else {
			for id := range newKeys {
				if _, ok := dep.keyMap[id]; !ok {
					delete(newKeys, id)
				}
			}
		}
		if len(newKeys) == 0 {
			break
		}
	}

	for key := range newKeys {
		e.keyMap[key] = struct{}{}
	}

	for _, r := range newKeys {
		// TODO: add all deps automatically
		mergedKey := r.clone()
		mergedKey.deps = make([][]CacheKeyWithSelector, len(e.deps))
		for i, dep := range e.deps {
			if dep.result != nil {
				for _, dk := range dep.result.CacheKeys() {
					mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: dk})
				}
				if dep.slowCacheKey != nil {
					mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
				}
			} else {
				for _, k := range dep.keys {
					mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
				}
			}
		}

		records, err := e.op.Cache().Records(mergedKey)
		if err != nil {
			logrus.Errorf("error receiving cache records: %v", err)
			continue
		}

		for _, r := range records {
			// skip records that a failed cache load already attempted
			if _, ok := e.cacheRecordsLoaded[r.ID]; !ok {
				e.cacheRecords[r.ID] = r
			}
		}

		e.keys = append(e.keys, e.makeExportable(mergedKey, records))
	}

	// detect lower/upper bound for current state
	allDepsCompletedCacheFast := e.cacheMap != nil
	allDepsCompletedCacheSlow := e.cacheMap != nil
	allDepsStateCacheSlow := true
	allDepsCompleted := true
	stLow := edgeStatusInitial    // minimal possible state
	stHigh := edgeStatusCacheSlow // maximum possible state
	if e.cacheMap != nil {
		for _, dep := range e.deps {
			// a dep with a pending result-based cache key is not as far
			// along as its nominal state suggests
			isSlowCacheIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
			isSlowIncomplete := (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))

			if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete {
				stLow = dep.state
				if stLow > edgeStatusCacheSlow {
					stLow = edgeStatusCacheSlow
				}
			}
			effectiveState := dep.state
			if dep.state == edgeStatusCacheSlow && isSlowCacheIncomplete {
				effectiveState = edgeStatusCacheFast
			}
			if dep.state == edgeStatusComplete && isSlowCacheIncomplete {
				effectiveState = edgeStatusCacheFast
			}
			if effectiveState < stHigh {
				stHigh = effectiveState
			}
			if isSlowIncomplete || dep.state < edgeStatusComplete {
				allDepsCompleted = false
			}
			if dep.state < edgeStatusCacheFast {
				allDepsCompletedCacheFast = false
			}
			if isSlowCacheIncomplete || dep.state < edgeStatusCacheSlow {
				allDepsCompletedCacheSlow = false
			}
			if dep.state < edgeStatusCacheSlow && len(dep.keyMap) == 0 {
				allDepsStateCacheSlow = false
			}
		}
		if stLow > e.state {
			e.state = stLow
		}
		if stHigh > e.state {
			e.state = stHigh
		}
		if !e.cacheMapDone && len(e.keys) == 0 {
			// an unfinished multi-part cache map without any keys keeps
			// the edge in its initial state
			e.state = edgeStatusInitial
		}

		e.allDepsCompletedCacheFast = e.cacheMapDone && allDepsCompletedCacheFast
		e.allDepsCompletedCacheSlow = e.cacheMapDone && allDepsCompletedCacheSlow
		e.allDepsStateCacheSlow = e.cacheMapDone && allDepsStateCacheSlow
		e.allDepsCompleted = e.cacheMapDone && allDepsCompleted

		// upgrade to cache-slow when every key matched by some dependency
		// is already accounted for on the edge (no "open" keys remain)
		if e.allDepsStateCacheSlow && len(e.cacheRecords) > 0 && e.state == edgeStatusCacheFast {
			openKeys := map[string]struct{}{}
			for _, dep := range e.deps {
				isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))
				if !isSlowIncomplete {
					openDepKeys := map[string]struct{}{}
					for key := range dep.keyMap {
						if _, ok := e.keyMap[key]; !ok {
							openDepKeys[key] = struct{}{}
						}
					}
					if len(openKeys) != 0 {
						for k := range openKeys {
							if _, ok := openDepKeys[k]; !ok {
								delete(openKeys, k)
							}
						}
					} else {
						openKeys = openDepKeys
					}
					if len(openKeys) == 0 {
						e.state = edgeStatusCacheSlow
						if debugScheduler {
							logrus.Debugf("upgrade to cache-slow because no open keys")
						}
					}
				}
			}
		}
	}
}
676
// respondToIncoming responds to all incoming requests. completing or
// updating them when possible. It returns the maximum desired state
// over all incoming requests and whether all of them were finished.
func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) {
	// detect the result state for the requests
	allIncomingCanComplete := true
	desiredState := e.state
	allCanceled := true

	// check incoming requests
	// check if all requests can be either answered or canceled
	if !e.isComplete() {
		for _, req := range incoming {
			if !req.Request().Canceled {
				allCanceled = false
				if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
					desiredState = r.desiredState
					// a request asking for more than the current state can
					// only be completed once outgoing work has settled
					if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
						allIncomingCanComplete = false
					}
				}
			}
		}
	}

	// do not set allIncomingCanComplete if active ongoing can modify the state
	if !allCanceled && e.state < edgeStatusComplete && len(e.keys) == 0 && e.hasActiveOutgoing {
		allIncomingCanComplete = false
	}

	if debugScheduler {
		logrus.Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v keys=%d cacheRecords=%d", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast, len(e.keys), len(e.cacheRecords))
	}

	if allIncomingCanComplete && e.hasActiveOutgoing {
		// cancel all current requests
		for _, p := range allPipes {
			p.Cancel()
		}

		// can close all but one requests
		var leaveOpen pipe.Sender
		for _, req := range incoming {
			if !req.Request().Canceled {
				leaveOpen = req
				break
			}
		}
		for _, req := range incoming {
			if leaveOpen == nil || leaveOpen == req {
				leaveOpen = req
				continue
			}
			e.finishIncoming(req)
		}
		// the one request left open keeps the edge scheduled until the
		// outgoing cancellations have been processed
		return desiredState, true
	}

	// can complete, finish and return
	if allIncomingCanComplete && !e.hasActiveOutgoing {
		for _, req := range incoming {
			e.finishIncoming(req)
		}
		return desiredState, true
	}

	// update incoming based on current state
	for _, req := range incoming {
		r := req.Request().Payload.(*edgeRequest)
		if req.Request().Canceled {
			e.finishIncoming(req)
		} else if !e.hasActiveOutgoing && e.state >= r.desiredState {
			e.finishIncoming(req)
		} else if !isEqualState(r.currentState, e.edgeState) && !req.Request().Canceled {
			e.updateIncoming(req)
		}
	}
	return desiredState, false
}
755
// createInputRequests creates new requests for dependencies or async functions
// that need to complete to continue processing the edge. With force set,
// every dependency is pushed to the complete state. Reports whether any
// new outgoing request was added.
func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, force bool) bool {
	addedNew := false

	// initialize deps state
	if e.deps == nil {
		e.depRequests = make(map[pipe.Receiver]*dep)
		e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
		for i := range e.edge.Vertex.Inputs() {
			e.deps = append(e.deps, newDep(Index(i)))
		}
	}

	// cycle all dependencies. set up outgoing requests if needed
	for _, dep := range e.deps {
		desiredStateDep := dep.state

		if e.noCacheMatchPossible || force {
			desiredStateDep = edgeStatusComplete
		} else if dep.state == edgeStatusInitial && desiredState > dep.state {
			desiredStateDep = edgeStatusCacheFast
		} else if dep.state == edgeStatusCacheFast && desiredState > dep.state {
			// wait all deps to complete cache fast before continuing with slow cache
			if (e.allDepsCompletedCacheFast && len(e.keys) == 0) || len(dep.keyMap) == 0 || e.allDepsHaveKeys(true) {
				if !e.skipPhase2FastCache(dep) && e.cacheMap != nil {
					desiredStateDep = edgeStatusCacheSlow
				}
			}
		} else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete {
			// if all deps have completed cache-slow or content based cache for input is available
			if (len(dep.keyMap) == 0 || e.allDepsCompletedCacheSlow || (!e.skipPhase2FastCache(dep) && e.slowCacheFunc(dep) != nil)) && (len(e.cacheRecords) == 0) {
				if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
					desiredStateDep = edgeStatusComplete
				}
			}
		} else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow {
			// the result is needed to compute the slow cache key
			if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) {
				desiredStateDep = edgeStatusComplete
			}
		}

		// outgoing request is needed
		if dep.state < desiredStateDep {
			addNew := true
			if dep.req != nil && !dep.req.Status().Completed {
				// an older request is still in flight; replace it only when
				// its desired state no longer matches
				if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
					dep.req.Cancel()
				} else {
					addNew = false
				}
			}
			if addNew {
				req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{
					currentState: dep.edgeState,
					desiredState: desiredStateDep,
					currentKeys:  len(dep.keys),
				})
				e.depRequests[req] = dep
				dep.req = req
				addedNew = true
			}
		}
		// initialize function to compute cache key based on dependency result
		if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && e.cacheMap != nil {
			pfn := e.preprocessFunc(dep)
			fn := e.slowCacheFunc(dep)
			res := dep.result
			// capture current loop values explicitly for the async closure
			func(pfn PreprocessFunc, fn ResultBasedCacheFunc, res Result, index Index) {
				dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
					v, err := e.op.CalcSlowCache(ctx, index, pfn, fn, res)
					return v, errors.Wrap(err, "failed to compute cache key")
				})
			}(pfn, fn, res, dep.index)
			addedNew = true
		}
	}
	return addedNew
}
835
836// execIfPossible creates a request for getting the edge result if there is
837// enough state
838func (e *edge) execIfPossible(f *pipeFactory) bool {
839	if len(e.cacheRecords) > 0 {
840		if e.keysDidChange {
841			e.postpone(f)
842			return true
843		}
844		e.execReq = f.NewFuncRequest(e.loadCache)
845		e.execCacheLoad = true
846		for req := range e.depRequests {
847			req.Cancel()
848		}
849		return true
850	} else if e.allDepsCompleted {
851		if e.keysDidChange {
852			e.postpone(f)
853			return true
854		}
855		e.execReq = f.NewFuncRequest(e.execOp)
856		e.execCacheLoad = false
857		return true
858	}
859	return false
860}
861
862// postpone delays exec to next unpark invocation if we have unprocessed keys
863func (e *edge) postpone(f *pipeFactory) {
864	f.NewFuncRequest(func(context.Context) (interface{}, error) {
865		return nil, nil
866	})
867}
868
869// loadCache creates a request to load edge result from cache
870func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
871	recs := make([]*CacheRecord, 0, len(e.cacheRecords))
872	for _, r := range e.cacheRecords {
873		recs = append(recs, r)
874	}
875
876	rec := getBestResult(recs)
877	e.cacheRecordsLoaded[rec.ID] = struct{}{}
878
879	logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
880	res, err := e.op.LoadCache(ctx, rec)
881	if err != nil {
882		logrus.Debugf("load cache for %s err: %v", e.edge.Vertex.Name(), err)
883		return nil, errors.Wrap(err, "failed to load cache")
884	}
885
886	return NewCachedResult(res, []ExportableCacheKey{{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e}}}), nil
887}
888
// execOp creates a request to execute the vertex operation, saves the
// result under all commit cache keys, and returns it as a CachedResult.
func (e *edge) execOp(ctx context.Context) (interface{}, error) {
	cacheKeys, inputs := e.commitOptions()
	results, subExporters, err := e.op.Exec(ctx, toResultSlice(inputs))
	if err != nil {
		return nil, errors.WithStack(err)
	}

	index := e.edge.Index
	if len(results) <= int(index) {
		return nil, errors.Errorf("invalid response from exec need %d index but %d results received", index, len(results))
	}

	res := results[int(index)]

	// release the outputs this edge does not represent
	for i := range results {
		if i != int(index) {
			go results[i].Release(context.TODO())
		}
	}

	var exporters []CacheExporter

	for _, cacheKey := range cacheKeys {
		ck, err := e.op.Cache().Save(cacheKey, res, time.Now())
		if err != nil {
			return nil, err
		}

		if exp, ok := ck.Exporter.(*exporter); ok {
			exp.edge = e
		}

		exps := make([]CacheExporter, 0, len(subExporters))
		for _, exp := range subExporters {
			exps = append(exps, exp.Exporter)
		}

		// NOTE(review): subExporters are appended once per cache key, so
		// with multiple keys they appear multiple times in exporters —
		// confirm this duplication is intended
		exporters = append(exporters, ck.Exporter)
		exporters = append(exporters, exps...)
	}

	// every returned key shares the same merged exporter set
	ek := make([]ExportableCacheKey, 0, len(cacheKeys))
	for _, ck := range cacheKeys {
		ek = append(ek, ExportableCacheKey{
			CacheKey: ck,
			Exporter: &mergedExporter{exporters: exporters},
		})
	}

	return NewCachedResult(res, ek), nil
}
941
942func toResultSlice(cres []CachedResult) (out []Result) {
943	out = make([]Result, len(cres))
944	for i := range cres {
945		out[i] = cres[i].(Result)
946	}
947	return out
948}
949
950func isEqualState(s1, s2 edgeState) bool {
951	if s1.state != s2.state || s1.result != s2.result || s1.cacheMap != s2.cacheMap || len(s1.keys) != len(s2.keys) {
952		return false
953	}
954	return true
955}
956
957func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyWithSelector {
958	out := make([]CacheKeyWithSelector, len(keys))
959	for i, k := range keys {
960		out[i] = CacheKeyWithSelector{Selector: selector, CacheKey: k}
961	}
962	return out
963}
964