1package solver
2
3import (
4	"context"
5	"sync"
6
7	"github.com/moby/buildkit/solver/internal/pipe"
8	digest "github.com/opencontainers/go-digest"
9	"github.com/pkg/errors"
10	"github.com/sirupsen/logrus"
11)
12
// edgeStatusType describes how far an edge has progressed through the
// cache-resolution pipeline: initial -> cache-fast -> cache-slow -> complete.
type edgeStatusType int

const (
	edgeStatusInitial edgeStatusType = iota
	edgeStatusCacheFast
	edgeStatusCacheSlow
	edgeStatusComplete
)

// String returns a human-readable name for the status, used in debug logs.
func (t edgeStatusType) String() string {
	// A switch with a default avoids the index-out-of-range panic the
	// previous slice-indexing implementation had for values outside the
	// declared constants.
	switch t {
	case edgeStatusInitial:
		return "initial"
	case edgeStatusCacheFast:
		return "cache-fast"
	case edgeStatusCacheSlow:
		return "cache-slow"
	case edgeStatusComplete:
		return "complete"
	default:
		return "unknown"
	}
}
25
26func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge {
27	e := &edge{
28		edge:         ed,
29		op:           op,
30		depRequests:  map[pipe.Receiver]*dep{},
31		keyMap:       map[string]*CacheKey{},
32		cacheRecords: map[string]*CacheRecord{},
33		index:        index,
34	}
35	return e
36}
37
// edge tracks the mutable scheduling state for one (vertex, output index)
// pair. NOTE(review): fields appear to be mutated only from unpark and its
// callees — confirm the scheduler serializes these calls.
type edge struct {
	edge Edge
	op   activeOp

	edgeState
	depRequests map[pipe.Receiver]*dep // in-flight requests to dependency edges
	deps        []*dep                 // per-input dependency state, indexed by input

	cacheMapReq     pipe.Receiver // in-flight CacheMap request, nil when none
	cacheMapDone    bool          // true once all cache maps have been received
	cacheMapIndex   int           // index passed to the next op.CacheMap call
	cacheMapDigests []digest.Digest
	execReq         pipe.Receiver // in-flight exec or load-cache request
	err             error
	cacheRecords    map[string]*CacheRecord
	keyMap          map[string]*CacheKey

	noCacheMatchPossible      bool // no cache match can ever be assembled anymore
	allDepsCompletedCacheFast bool
	allDepsCompletedCacheSlow bool
	allDepsStateCacheSlow     bool
	allDepsCompleted          bool
	hasActiveOutgoing         bool

	releaserCount int // extra release() calls required after edge merges
	keysDidChange bool
	index         *edgeIndex

	secondaryExporters []expDep
}
68
// dep holds state for a dependency (input) edge
type dep struct {
	req pipe.Receiver // outgoing request to the dependency edge
	edgeState
	index             Index
	keyMap            map[string]*CacheKey // keys confirmed by cache queries for this dep
	desiredState      edgeStatusType
	e                 *edge
	slowCacheReq      pipe.Receiver // in-flight result-based (slow) cache key computation
	slowCacheComplete bool
	slowCacheFoundKey bool
	slowCacheKey      *ExportableCacheKey
	err               error
}
83
// expDep holds secondary exporter info for a dependency
type expDep struct {
	index    int
	cacheKey CacheKeyWithSelector
}
89
90func newDep(i Index) *dep {
91	return &dep{index: i, keyMap: map[string]*CacheKey{}}
92}
93
// edgePipe is a pipe for requests between two edges
type edgePipe struct {
	*pipe.Pipe
	From, Target *edge
	// mu guards concurrent access; NOTE(review): usage not visible in this
	// chunk — confirm against callers.
	mu sync.Mutex
}
100
// edgeState holds basic mutable state info for an edge
type edgeState struct {
	state    edgeStatusType       // current progress of the edge
	result   *SharedCachedResult  // final result; set together with state == edgeStatusComplete
	cacheMap *CacheMap
	keys     []ExportableCacheKey // cache keys discovered so far
}
108
// edgeRequest is the payload sent between edges: the requester's last-seen
// view of the target and the state it wants the target to reach.
type edgeRequest struct {
	desiredState edgeStatusType
	currentState edgeState
	currentKeys  int // len(keys) the requester had observed when sending
}
114
115// incrementReferenceCount increases the number of times release needs to be
116// called to release the edge. Called on merging edges.
117func (e *edge) incrementReferenceCount() {
118	e.releaserCount += 1
119}
120
121// release releases the edge resources
122func (e *edge) release() {
123	if e.releaserCount > 0 {
124		e.releaserCount--
125		return
126	}
127	e.index.Release(e)
128	if e.result != nil {
129		go e.result.Release(context.TODO())
130	}
131}
132
// commitOptions returns parameters for the op execution: the cache keys the
// result should be saved under and the cached results of all dependencies
// (used as op inputs).
func (e *edge) commitOptions() ([]*CacheKey, []CachedResult) {
	k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
	if len(e.deps) == 0 {
		// no dependencies: one key per received cache map digest, no inputs
		keys := make([]*CacheKey, 0, len(e.cacheMapDigests))
		for _, dgst := range e.cacheMapDigests {
			keys = append(keys, NewCacheKey(dgst, e.edge.Index))
		}
		return keys, nil
	}

	inputs := make([][]CacheKeyWithSelector, len(e.deps))
	results := make([]CachedResult, len(e.deps))
	for i, dep := range e.deps {
		// definition-based keys carry the dep's selector...
		for _, k := range dep.result.CacheKeys() {
			inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: k, Selector: e.cacheMap.Deps[i].Selector})
		}
		// ...the content-based (slow) key, if computed, is added without one
		if dep.slowCacheKey != nil {
			inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
		}
		results[i] = dep.result
	}

	k.deps = inputs
	return []*CacheKey{k}, results
}
159
160// isComplete returns true if edge state is final and will never change
161func (e *edge) isComplete() bool {
162	return e.err != nil || e.result != nil
163}
164
165// finishIncoming finalizes the incoming pipe request
166func (e *edge) finishIncoming(req pipe.Sender) {
167	err := e.err
168	if req.Request().Canceled && err == nil {
169		err = context.Canceled
170	}
171	if debugScheduler {
172		logrus.Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState)
173	}
174	req.Finalize(&e.edgeState, err)
175}
176
// updateIncoming updates the current value of incoming pipe request with the
// edge's latest state, without completing the request.
func (e *edge) updateIncoming(req pipe.Sender) {
	req.Update(&e.edgeState)
}
181
182// probeCache is called with unprocessed cache keys for dependency
183// if the key could match the edge, the cacheRecords for dependency are filled
184func (e *edge) probeCache(d *dep, depKeys []CacheKeyWithSelector) bool {
185	if len(depKeys) == 0 {
186		return false
187	}
188	if e.op.IgnoreCache() {
189		return false
190	}
191	keys, err := e.op.Cache().Query(depKeys, d.index, e.cacheMap.Digest, e.edge.Index)
192	if err != nil {
193		e.err = errors.Wrap(err, "error on cache query")
194	}
195	found := false
196	for _, k := range keys {
197		if _, ok := d.keyMap[k.ID]; !ok {
198			d.keyMap[k.ID] = k
199			found = true
200		}
201	}
202	return found
203}
204
// checkDepMatchPossible checks if any cache matches are possible past this
// point for the given dependency; once ruled out, noCacheMatchPossible is
// latched and never reset here.
func (e *edge) checkDepMatchPossible(dep *dep) {
	depHasSlowCache := e.cacheMap.Deps[dep.index].ComputeDigestFunc != nil
	// a dep rules out matches when it has reported no keys and either its
	// slow cache key was computed without finding a match, or it has no slow
	// cache func and already reached the cache-slow state
	if !e.noCacheMatchPossible && (((!dep.slowCacheFoundKey && dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state >= edgeStatusCacheSlow)) && len(dep.keys) == 0) {
		e.noCacheMatchPossible = true
	}
}
212
213// slowCacheFunc returns the result based cache func for dependency if it exists
214func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc {
215	if e.cacheMap == nil {
216		return nil
217	}
218	return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc
219}
220
221// allDepsHaveKeys checks if all dependencies have at least one key. used for
222// determining if there is enough data for combining cache key for edge
223func (e *edge) allDepsHaveKeys() bool {
224	if e.cacheMap == nil {
225		return false
226	}
227	for _, d := range e.deps {
228		if len(d.keys) == 0 && d.slowCacheKey == nil && d.result == nil {
229			return false
230		}
231	}
232	return true
233}
234
// currentIndexKey returns a cache key for this edge assembled from all
// currently known dependency cache keys, or nil when the cache map hasn't
// arrived or some dependency has produced nothing usable yet.
func (e *edge) currentIndexKey() *CacheKey {
	if e.cacheMap == nil {
		return nil
	}

	keys := make([][]CacheKeyWithSelector, len(e.deps))
	for i, d := range e.deps {
		if len(d.keys) == 0 && d.result == nil {
			// this dep can't contribute a key yet
			return nil
		}
		for _, k := range d.keys {
			keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
		}
		if d.result != nil {
			// keys attached to the dep's finished result
			for _, rk := range d.result.CacheKeys() {
				keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: rk})
			}
			// content-based (slow) key is added without a selector
			if d.slowCacheKey != nil {
				keys[i] = append(keys[i], CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: d.slowCacheKey.CacheKey, Exporter: &exporter{k: d.slowCacheKey.CacheKey}}})
			}
		}
	}

	k := NewCacheKey(e.cacheMap.Digest, e.edge.Index)
	k.deps = keys

	return k
}
264
265// slow cache keys can be computed in 2 phases if there are multiple deps.
266// first evaluate ones that didn't match any definition based keys
267func (e *edge) skipPhase2SlowCache(dep *dep) bool {
268	isPhase1 := false
269	for _, dep := range e.deps {
270		if !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) == 0 {
271			isPhase1 = true
272			break
273		}
274	}
275
276	if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) > 0 {
277		return true
278	}
279	return false
280}
281
// skipPhase2FastCache reports whether fast-cache progression for dep should
// be deferred: while some dependency is still in phase 1 (no matched keys but
// still able to produce them), deps that already have matches wait.
func (e *edge) skipPhase2FastCache(dep *dep) bool {
	isPhase1 := false
	for _, dep := range e.deps {
		// note: && binds tighter than || here — phase 1 also holds whenever
		// the cache map hasn't arrived yet
		if e.cacheMap == nil || len(dep.keyMap) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) {
			isPhase1 = true
			break
		}
	}

	if isPhase1 && len(dep.keyMap) > 0 {
		return true
	}
	return false
}
296
// unpark is called by the scheduler with incoming requests and updates for
// previous calls.
// To avoid deadlocks and resource leaks this function needs to follow
// following rules:
// 1) this function needs to return unclosed outgoing requests if some incoming
//    requests were not completed
// 2) this function may not return outgoing requests if it has completed all
//    incoming requests
func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) {
	// process all incoming changes
	depChanged := false
	for _, upt := range updates {
		if changed := e.processUpdate(upt); changed {
			depChanged = true
		}
	}

	if depChanged {
		// the dep responses had changes. need to reevaluate edge state
		e.recalcCurrentState()
	}

	// answer whatever incoming requests can be answered now; if all could be
	// completed (or canceled), there is nothing left to schedule
	desiredState, done := e.respondToIncoming(incoming, allPipes)
	if done {
		return
	}

	// set up new outgoing requests if needed
	if e.cacheMapReq == nil && (e.cacheMap == nil || len(e.cacheRecords) == 0) {
		// capture the index so the closure stays stable if cacheMapIndex
		// advances before the request runs
		index := e.cacheMapIndex
		e.cacheMapReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
			return e.op.CacheMap(ctx, index)
		})
	}

	// execute op
	if e.execReq == nil && desiredState == edgeStatusComplete {
		if ok := e.execIfPossible(f); ok {
			return
		}
	}

	// not executing yet: (re)issue requests to dependencies
	if e.execReq == nil {
		e.createInputRequests(desiredState, f)
	}

}
344
345func (e *edge) makeExportable(k *CacheKey, records []*CacheRecord) ExportableCacheKey {
346	return ExportableCacheKey{
347		CacheKey: k,
348		Exporter: &exporter{k: k, records: records, override: e.edge.Vertex.Options().ExportCache},
349	}
350}
351
// processUpdate is called by unpark for every updated pipe request; it folds
// the response into edge/dep state. The returned flag tells unpark whether
// recalcCurrentState needs to run.
func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) {
	// response for cachemap request
	if upt == e.cacheMapReq && upt.Status().Completed {
		if err := upt.Status().Err; err != nil {
			e.cacheMapReq = nil
			// a cancellation is not an edge failure; any other error is
			// recorded once (first error wins)
			if !upt.Status().Canceled && e.err == nil {
				e.err = err
			}
		} else {
			resp := upt.Status().Value.(*cacheMapResp)
			e.cacheMap = resp.CacheMap
			e.cacheMapDone = resp.complete
			e.cacheMapIndex++
			if len(e.deps) == 0 {
				// edges without inputs can query the cache immediately
				e.cacheMapDigests = append(e.cacheMapDigests, e.cacheMap.Digest)
				if !e.op.IgnoreCache() {
					keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index)
					if err != nil {
						logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error
					} else {
						for _, k := range keys {
							records, err := e.op.Cache().Records(k)
							if err != nil {
								logrus.Errorf("error receiving cache records: %v", err)
								continue
							}

							for _, r := range records {
								e.cacheRecords[r.ID] = r
							}

							e.keys = append(e.keys, e.makeExportable(k, records))
						}
					}
				}
				e.state = edgeStatusCacheSlow
			}
			if e.allDepsHaveKeys() {
				e.keysDidChange = true
			}
			// probe keys that were loaded before cache map
			for i, dep := range e.deps {
				e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector))
				e.checkDepMatchPossible(dep)
			}
			if !e.cacheMapDone {
				// more cache maps are expected: clear the request slot so
				// unpark issues the next one
				e.cacheMapReq = nil
			}
		}
		return true
	}

	// response for exec request
	if upt == e.execReq && upt.Status().Completed {
		if err := upt.Status().Err; err != nil {
			e.execReq = nil
			if !upt.Status().Canceled && e.err == nil {
				e.err = err
			}
		} else {
			// execution finished: the edge reaches its final state
			e.result = NewSharedCachedResult(upt.Status().Value.(CachedResult))
			e.state = edgeStatusComplete
		}
		return true
	}

	// response for requests to dependencies
	if dep, ok := e.depRequests[upt]; ok {
		if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil {
			if e.err == nil {
				e.err = err
			}
			dep.err = err
		}

		state := upt.Status().Value.(*edgeState)

		// probe only the keys reported since the last update
		if len(dep.keys) < len(state.keys) {
			newKeys := state.keys[len(dep.keys):]
			if e.cacheMap != nil {
				e.probeCache(dep, withSelector(newKeys, e.cacheMap.Deps[dep.index].Selector))
				dep.edgeState.keys = state.keys
				if e.allDepsHaveKeys() {
					e.keysDidChange = true
				}
			}
			depChanged = true
		}
		if dep.state != edgeStatusComplete && state.state == edgeStatusComplete {
			e.keysDidChange = true
		}

		recheck := state.state != dep.state

		dep.edgeState = *state

		if recheck && e.cacheMap != nil {
			e.checkDepMatchPossible(dep)
			depChanged = true
		}

		// named return: depChanged was set above as needed
		return
	}

	// response for result based cache function
	for i, dep := range e.deps {
		if upt == dep.slowCacheReq && upt.Status().Completed {
			if err := upt.Status().Err; err != nil {
				dep.slowCacheReq = nil
				if !upt.Status().Canceled && e.err == nil {
					e.err = upt.Status().Err
				}
			} else if !dep.slowCacheComplete {
				// wrap the computed digest as a standalone (-1 index) key
				k := NewCacheKey(upt.Status().Value.(digest.Digest), -1)
				dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}}
				slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}
				defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys()))
				for _, dk := range dep.result.CacheKeys() {
					defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[i].Selector})
				}
				dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp})

				// connect def key to slow key
				e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index)

				dep.slowCacheComplete = true
				e.keysDidChange = true
				e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true
			}
			return true
		}
	}

	return
}
488
// recalcCurrentState is called by unpark to recompute internal state after
// the state of dependencies has changed: it merges cache keys matched by all
// deps and derives the edge's status bounds from the deps' statuses.
func (e *edge) recalcCurrentState() {
	// TODO: fast pass to detect incomplete results

	// newKeys collects key IDs matched by every dependency that haven't been
	// merged into this edge yet
	newKeys := map[string]*CacheKey{}

	for i, dep := range e.deps {
		if i == 0 {
			// seed with the first dep's keys that aren't already known
			for id, k := range dep.keyMap {
				if _, ok := e.keyMap[id]; ok {
					continue
				}
				newKeys[id] = k
			}
		} else {
			// intersect: drop any candidate the next dep hasn't matched
			for id := range newKeys {
				if _, ok := dep.keyMap[id]; !ok {
					delete(newKeys, id)
				}
			}
		}
		if len(newKeys) == 0 {
			break
		}
	}

	for _, r := range newKeys {
		// TODO: add all deps automatically
		mergedKey := r.clone()
		mergedKey.deps = make([][]CacheKeyWithSelector, len(e.deps))
		for i, dep := range e.deps {
			if dep.result != nil {
				// prefer keys from the dep's completed result
				for _, dk := range dep.result.CacheKeys() {
					mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: dk})
				}
				if dep.slowCacheKey != nil {
					mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey})
				}
			} else {
				for _, k := range dep.keys {
					mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k})
				}
			}
		}

		records, err := e.op.Cache().Records(mergedKey)
		if err != nil {
			logrus.Errorf("error receiving cache records: %v", err)
			continue
		}

		for _, r := range records {
			e.cacheRecords[r.ID] = r
		}

		e.keys = append(e.keys, e.makeExportable(mergedKey, records))
	}

	// detect lower/upper bound for current state
	allDepsCompletedCacheFast := e.cacheMap != nil
	allDepsCompletedCacheSlow := e.cacheMap != nil
	allDepsStateCacheSlow := true
	allDepsCompleted := true
	stLow := edgeStatusInitial    // minimal possible state
	stHigh := edgeStatusCacheSlow // maximum possible state
	if e.cacheMap != nil {
		for _, dep := range e.deps {
			// a dep is "slow incomplete" when it has a slow cache func whose
			// key hasn't been computed yet
			isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete))

			if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete {
				stLow = dep.state
				if stLow > edgeStatusCacheSlow {
					// lower bound is capped at cache-slow
					stLow = edgeStatusCacheSlow
				}
			}
			effectiveState := dep.state
			if dep.state == edgeStatusCacheSlow && isSlowIncomplete {
				effectiveState = edgeStatusCacheFast
			}
			if dep.state == edgeStatusComplete && isSlowIncomplete {
				effectiveState = edgeStatusCacheFast
			}
			if effectiveState < stHigh {
				stHigh = effectiveState
			}
			if isSlowIncomplete || dep.state < edgeStatusComplete {
				allDepsCompleted = false
			}
			if dep.state < edgeStatusCacheFast {
				allDepsCompletedCacheFast = false
			}
			if isSlowIncomplete || dep.state < edgeStatusCacheSlow {
				allDepsCompletedCacheSlow = false
			}
			if dep.state < edgeStatusCacheSlow && len(dep.keys) == 0 {
				allDepsStateCacheSlow = false
			}
		}
		if stLow > e.state {
			e.state = stLow
		}
		if stHigh > e.state {
			e.state = stHigh
		}
		if !e.cacheMapDone && len(e.keys) == 0 {
			// no keys and more cache maps pending: edge hasn't really started
			e.state = edgeStatusInitial
		}

		// none of the aggregate flags can be set before all cache maps arrive
		e.allDepsCompletedCacheFast = e.cacheMapDone && allDepsCompletedCacheFast
		e.allDepsCompletedCacheSlow = e.cacheMapDone && allDepsCompletedCacheSlow
		e.allDepsStateCacheSlow = e.cacheMapDone && allDepsStateCacheSlow
		e.allDepsCompleted = e.cacheMapDone && allDepsCompleted
	}
}
603
// respondToIncoming responds to all incoming requests, completing or
// updating them when possible. It returns the highest desired state across
// requesters and whether all requests were answered (so unpark can return).
func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) {
	// detect the result state for the requests
	allIncomingCanComplete := true
	desiredState := e.state
	allCanceled := true

	// check incoming requests
	// check if all requests can be either answered or canceled
	if !e.isComplete() {
		for _, req := range incoming {
			if !req.Request().Canceled {
				allCanceled = false
				if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState {
					desiredState = r.desiredState
					// a request wanting a higher state can only be completed
					// now if there is new information (keys) to report and no
					// outgoing work is pending
					if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) {
						allIncomingCanComplete = false
					}
				}
			}
		}
	}

	// do not set allIncomingCanComplete if active ongoing can modify the state
	if !allCanceled && e.state < edgeStatusComplete && len(e.keys) == 0 && e.hasActiveOutgoing {
		allIncomingCanComplete = false
	}

	if debugScheduler {
		logrus.Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v keys=%d cacheRecords=%d", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast, len(e.keys), len(e.cacheRecords))
	}

	if allIncomingCanComplete && e.hasActiveOutgoing {
		// cancel all current requests
		for _, p := range allPipes {
			p.Cancel()
		}

		// can close all but one requests
		// (one request must stay open so the edge keeps getting unparked
		// until the canceled pipes drain)
		var leaveOpen pipe.Sender
		for _, req := range incoming {
			if !req.Request().Canceled {
				leaveOpen = req
				break
			}
		}
		for _, req := range incoming {
			if leaveOpen == nil || leaveOpen == req {
				leaveOpen = req
				continue
			}
			e.finishIncoming(req)
		}
		return desiredState, true
	}

	// can complete, finish and return
	if allIncomingCanComplete && !e.hasActiveOutgoing {
		for _, req := range incoming {
			e.finishIncoming(req)
		}
		return desiredState, true
	}

	// update incoming based on current state
	for _, req := range incoming {
		r := req.Request().Payload.(*edgeRequest)
		if req.Request().Canceled {
			e.finishIncoming(req)
		} else if !e.hasActiveOutgoing && e.state >= r.desiredState {
			// requester's goal already reached
			e.finishIncoming(req)
		} else if !isEqualState(r.currentState, e.edgeState) && !req.Request().Canceled {
			// state moved since the requester last saw it: push an update
			e.updateIncoming(req)
		}
	}
	return desiredState, false
}
682
// createInputRequests creates new requests for dependencies or async functions
// that need to complete to continue processing the edge
func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory) {
	// initialize deps state
	if e.deps == nil {
		e.depRequests = make(map[pipe.Receiver]*dep)
		e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs()))
		for i := range e.edge.Vertex.Inputs() {
			e.deps = append(e.deps, newDep(Index(i)))
		}
	}

	// cycle all dependencies. set up outgoing requests if needed
	for _, dep := range e.deps {
		desiredStateDep := dep.state

		if e.noCacheMatchPossible {
			// no cache hit can ever happen: deps must fully complete
			desiredStateDep = edgeStatusComplete
		} else if dep.state == edgeStatusInitial && desiredState > dep.state {
			desiredStateDep = edgeStatusCacheFast
		} else if dep.state == edgeStatusCacheFast && desiredState > dep.state {
			// wait all deps to complete cache fast before continuing with slow cache
			if (e.allDepsCompletedCacheFast && len(e.keys) == 0) || len(dep.keys) == 0 || e.allDepsHaveKeys() {
				if !e.skipPhase2FastCache(dep) {
					desiredStateDep = edgeStatusCacheSlow
				}
			}
		} else if dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete {
			// if all deps have completed cache-slow or content based cache for input is available
			if (len(dep.keys) == 0 || e.allDepsCompletedCacheSlow || (!e.skipPhase2FastCache(dep) && e.slowCacheFunc(dep) != nil)) && (len(e.cacheRecords) == 0) {
				if len(dep.keys) == 0 || !e.skipPhase2SlowCache(dep) && e.allDepsStateCacheSlow {
					desiredStateDep = edgeStatusComplete
				}
			}
		} else if dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow {
			if len(dep.keys) == 0 || !e.skipPhase2SlowCache(dep) && e.allDepsStateCacheSlow {
				desiredStateDep = edgeStatusComplete
			}
		}

		// outgoing request is needed
		if dep.state < desiredStateDep {
			addNew := true
			if dep.req != nil && !dep.req.Status().Completed {
				// an older request is still in flight: cancel it only when it
				// asks for the wrong state, otherwise reuse it
				if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep {
					dep.req.Cancel()
				} else {
					addNew = false
				}
			}
			if addNew {
				req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{
					currentState: dep.edgeState,
					desiredState: desiredStateDep,
					currentKeys:  len(dep.keys),
				})
				e.depRequests[req] = dep
				dep.req = req
			}
		}
		// initialize function to compute cache key based on dependency result
		if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && e.slowCacheFunc(dep) != nil && e.cacheMap != nil {
			fn := e.slowCacheFunc(dep)
			res := dep.result
			// capture fn/res/index by value so the closure stays stable
			func(fn ResultBasedCacheFunc, res Result, index Index) {
				dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
					return e.op.CalcSlowCache(ctx, index, fn, res)
				})
			}(fn, res, dep.index)
		}
	}
}
755
756// execIfPossible creates a request for getting the edge result if there is
757// enough state
758func (e *edge) execIfPossible(f *pipeFactory) bool {
759	if len(e.cacheRecords) > 0 {
760		if e.keysDidChange {
761			e.postpone(f)
762			return true
763		}
764		e.execReq = f.NewFuncRequest(e.loadCache)
765		for req := range e.depRequests {
766			req.Cancel()
767		}
768		return true
769	} else if e.allDepsCompleted {
770		if e.keysDidChange {
771			e.postpone(f)
772			return true
773		}
774		e.execReq = f.NewFuncRequest(e.execOp)
775		return true
776	}
777	return false
778}
779
// postpone delays exec to next unpark invocation if we have unprocessed keys;
// the no-op function request guarantees the scheduler calls unpark again.
func (e *edge) postpone(f *pipeFactory) {
	f.NewFuncRequest(func(context.Context) (interface{}, error) {
		return nil, nil
	})
}
786
// loadCache creates a request to load edge result from cache
func (e *edge) loadCache(ctx context.Context) (interface{}, error) {
	recs := make([]*CacheRecord, 0, len(e.cacheRecords))
	for _, r := range e.cacheRecords {
		recs = append(recs, r)
	}

	// pick the preferred record; callers only invoke this when
	// len(e.cacheRecords) > 0 (see execIfPossible)
	rec := getBestResult(recs)

	logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID)
	res, err := e.op.LoadCache(ctx, rec)
	if err != nil {
		return nil, err
	}

	return NewCachedResult(res, []ExportableCacheKey{{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e}}}), nil
}
804
// execOp creates a request to execute the vertex operation
func (e *edge) execOp(ctx context.Context) (interface{}, error) {
	cacheKeys, inputs := e.commitOptions()
	results, subExporters, err := e.op.Exec(ctx, toResultSlice(inputs))
	if err != nil {
		return nil, err
	}

	index := e.edge.Index
	if len(results) <= int(index) {
		return nil, errors.Errorf("invalid response from exec need %d index but %d results received", index, len(results))
	}

	res := results[int(index)]

	// release all outputs except the one for this edge's index
	for i := range results {
		if i != int(index) {
			go results[i].Release(context.TODO())
		}
	}

	var exporters []CacheExporter

	for _, cacheKey := range cacheKeys {
		ck, err := e.op.Cache().Save(cacheKey, res)
		if err != nil {
			return nil, err
		}

		if exp, ok := ck.Exporter.(*exporter); ok {
			exp.edge = e
		}

		// NOTE(review): subExporters are appended again on every cacheKey
		// iteration, so with multiple cache keys they are duplicated in the
		// merged exporter — confirm whether this is intended
		exps := make([]CacheExporter, 0, len(subExporters))
		for _, exp := range subExporters {
			exps = append(exps, exp.Exporter)
		}

		exporters = append(exporters, ck.Exporter)
		exporters = append(exporters, exps...)
	}

	// every returned key shares the same merged exporter set
	ek := make([]ExportableCacheKey, 0, len(cacheKeys))
	for _, ck := range cacheKeys {
		ek = append(ek, ExportableCacheKey{
			CacheKey: ck,
			Exporter: &mergedExporter{exporters: exporters},
		})
	}

	return NewCachedResult(res, ek), nil
}
857
858func toResultSlice(cres []CachedResult) (out []Result) {
859	out = make([]Result, len(cres))
860	for i := range cres {
861		out[i] = cres[i].(Result)
862	}
863	return out
864}
865
866func isEqualState(s1, s2 edgeState) bool {
867	if s1.state != s2.state || s1.result != s2.result || s1.cacheMap != s2.cacheMap || len(s1.keys) != len(s2.keys) {
868		return false
869	}
870	return true
871}
872
873func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyWithSelector {
874	out := make([]CacheKeyWithSelector, len(keys))
875	for i, k := range keys {
876		out[i] = CacheKeyWithSelector{Selector: selector, CacheKey: k}
877	}
878	return out
879}
880