1package solver 2 3import ( 4 "context" 5 "sync" 6 7 "github.com/moby/buildkit/solver/internal/pipe" 8 digest "github.com/opencontainers/go-digest" 9 "github.com/pkg/errors" 10 "github.com/sirupsen/logrus" 11) 12 13type edgeStatusType int 14 15const ( 16 edgeStatusInitial edgeStatusType = iota 17 edgeStatusCacheFast 18 edgeStatusCacheSlow 19 edgeStatusComplete 20) 21 22func (t edgeStatusType) String() string { 23 return []string{"initial", "cache-fast", "cache-slow", "complete"}[t] 24} 25 26func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge { 27 e := &edge{ 28 edge: ed, 29 op: op, 30 depRequests: map[pipe.Receiver]*dep{}, 31 keyMap: map[string]*CacheKey{}, 32 cacheRecords: map[string]*CacheRecord{}, 33 index: index, 34 } 35 return e 36} 37 38type edge struct { 39 edge Edge 40 op activeOp 41 42 edgeState 43 depRequests map[pipe.Receiver]*dep 44 deps []*dep 45 46 cacheMapReq pipe.Receiver 47 cacheMapDone bool 48 cacheMapIndex int 49 cacheMapDigests []digest.Digest 50 execReq pipe.Receiver 51 err error 52 cacheRecords map[string]*CacheRecord 53 keyMap map[string]*CacheKey 54 55 noCacheMatchPossible bool 56 allDepsCompletedCacheFast bool 57 allDepsCompletedCacheSlow bool 58 allDepsStateCacheSlow bool 59 allDepsCompleted bool 60 hasActiveOutgoing bool 61 62 releaserCount int 63 keysDidChange bool 64 index *edgeIndex 65 66 secondaryExporters []expDep 67} 68 69// dep holds state for a dependant edge 70type dep struct { 71 req pipe.Receiver 72 edgeState 73 index Index 74 keyMap map[string]*CacheKey 75 desiredState edgeStatusType 76 e *edge 77 slowCacheReq pipe.Receiver 78 slowCacheComplete bool 79 slowCacheFoundKey bool 80 slowCacheKey *ExportableCacheKey 81 err error 82} 83 84// expDep holds secorndary exporter info for dependency 85type expDep struct { 86 index int 87 cacheKey CacheKeyWithSelector 88} 89 90func newDep(i Index) *dep { 91 return &dep{index: i, keyMap: map[string]*CacheKey{}} 92} 93 94// edgePipe is a pipe for requests between two edges 95type edgePipe struct { 96 *pipe.Pipe 97 From, Target *edge 98 mu sync.Mutex 99} 100 101// edgeState hold basic mutable state info for an edge 102type edgeState struct { 103 state edgeStatusType 104 result *SharedCachedResult 105 cacheMap *CacheMap 106 keys []ExportableCacheKey 107} 108 109type edgeRequest struct { 110 desiredState edgeStatusType 111 currentState edgeState 112 currentKeys int 113} 114 115// incrementReferenceCount increases the number of times release needs to be 116// called to release the edge. Called on merging edges. 117func (e *edge) incrementReferenceCount() { 118 e.releaserCount += 1 119} 120 121// release releases the edge resources 122func (e *edge) release() { 123 if e.releaserCount > 0 { 124 e.releaserCount-- 125 return 126 } 127 e.index.Release(e) 128 if e.result != nil { 129 go e.result.Release(context.TODO()) 130 } 131} 132 133// commitOptions returns parameters for the op execution 134func (e *edge) commitOptions() ([]*CacheKey, []CachedResult) { 135 k := NewCacheKey(e.cacheMap.Digest, e.edge.Index) 136 if len(e.deps) == 0 { 137 keys := make([]*CacheKey, 0, len(e.cacheMapDigests)) 138 for _, dgst := range e.cacheMapDigests { 139 keys = append(keys, NewCacheKey(dgst, e.edge.Index)) 140 } 141 return keys, nil 142 } 143 144 inputs := make([][]CacheKeyWithSelector, len(e.deps)) 145 results := make([]CachedResult, len(e.deps)) 146 for i, dep := range e.deps { 147 for _, k := range dep.result.CacheKeys() { 148 inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: k, Selector: e.cacheMap.Deps[i].Selector}) 149 } 150 if dep.slowCacheKey != nil { 151 inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}) 152 } 153 results[i] = dep.result 154 } 155 156 k.deps = inputs 157 return []*CacheKey{k}, results 158} 159 160// isComplete returns true if edge state is final and will never change 161func (e *edge) isComplete() bool { 162 return e.err != nil || e.result != nil 163} 164 165// finishIncoming finalizes the incoming pipe request 166func (e *edge) finishIncoming(req pipe.Sender) { 167 err := e.err 168 if req.Request().Canceled && err == nil { 169 err = context.Canceled 170 } 171 if debugScheduler { 172 logrus.Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState) 173 } 174 req.Finalize(&e.edgeState, err) 175} 176 177// updateIncoming updates the current value of incoming pipe request 178func (e *edge) updateIncoming(req pipe.Sender) { 179 req.Update(&e.edgeState) 180} 181 182// probeCache is called with unprocessed cache keys for dependency 183// if the key could match the edge, the cacheRecords for dependency are filled 184func (e *edge) probeCache(d *dep, depKeys []CacheKeyWithSelector) bool { 185 if len(depKeys) == 0 { 186 return false 187 } 188 if e.op.IgnoreCache() { 189 return false 190 } 191 keys, err := e.op.Cache().Query(depKeys, d.index, e.cacheMap.Digest, e.edge.Index) 192 if err != nil { 193 e.err = errors.Wrap(err, "error on cache query") 194 } 195 found := false 196 for _, k := range keys { 197 if _, ok := d.keyMap[k.ID]; !ok { 198 d.keyMap[k.ID] = k 199 found = true 200 } 201 } 202 return found 203} 204 205// checkDepMatchPossible checks if any cache matches are possible past this point 206func (e *edge) checkDepMatchPossible(dep *dep) { 207 depHasSlowCache := e.cacheMap.Deps[dep.index].ComputeDigestFunc != nil 208 if !e.noCacheMatchPossible && (((!dep.slowCacheFoundKey && dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state >= edgeStatusCacheSlow)) && len(dep.keys) == 0) { 209 e.noCacheMatchPossible = true 210 } 211} 212 213// slowCacheFunc returns the result based cache func for dependency if it exists 214func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc { 215 if e.cacheMap == nil { 216 return nil 217 } 218 return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc 219} 220 221// allDepsHaveKeys checks if all dependencies have at least one key. used for 222// determining if there is enough data for combining cache key for edge 223func (e *edge) allDepsHaveKeys() bool { 224 if e.cacheMap == nil { 225 return false 226 } 227 for _, d := range e.deps { 228 if len(d.keys) == 0 && d.slowCacheKey == nil && d.result == nil { 229 return false 230 } 231 } 232 return true 233} 234 235// depKeys returns all current dependency cache keys 236func (e *edge) currentIndexKey() *CacheKey { 237 if e.cacheMap == nil { 238 return nil 239 } 240 241 keys := make([][]CacheKeyWithSelector, len(e.deps)) 242 for i, d := range e.deps { 243 if len(d.keys) == 0 && d.result == nil { 244 return nil 245 } 246 for _, k := range d.keys { 247 keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k}) 248 } 249 if d.result != nil { 250 for _, rk := range d.result.CacheKeys() { 251 keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: rk}) 252 } 253 if d.slowCacheKey != nil { 254 keys[i] = append(keys[i], CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: d.slowCacheKey.CacheKey, Exporter: &exporter{k: d.slowCacheKey.CacheKey}}}) 255 } 256 } 257 } 258 259 k := NewCacheKey(e.cacheMap.Digest, e.edge.Index) 260 k.deps = keys 261 262 return k 263} 264 265// slow cache keys can be computed in 2 phases if there are multiple deps. 266// first evaluate ones that didn't match any definition based keys 267func (e *edge) skipPhase2SlowCache(dep *dep) bool { 268 isPhase1 := false 269 for _, dep := range e.deps { 270 if !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) == 0 { 271 isPhase1 = true 272 break 273 } 274 } 275 276 if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) > 0 { 277 return true 278 } 279 return false 280} 281 282func (e *edge) skipPhase2FastCache(dep *dep) bool { 283 isPhase1 := false 284 for _, dep := range e.deps { 285 if e.cacheMap == nil || len(dep.keyMap) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) { 286 isPhase1 = true 287 break 288 } 289 } 290 291 if isPhase1 && len(dep.keyMap) > 0 { 292 return true 293 } 294 return false 295} 296 297// unpark is called by the scheduler with incoming requests and updates for 298// previous calls. 299// To avoid deadlocks and resource leaks this function needs to follow 300// following rules: 301// 1) this function needs to return unclosed outgoing requests if some incoming 302// requests were not completed 303// 2) this function may not return outgoing requests if it has completed all 304// incoming requests 305func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) { 306 // process all incoming changes 307 depChanged := false 308 for _, upt := range updates { 309 if changed := e.processUpdate(upt); changed { 310 depChanged = true 311 } 312 } 313 314 if depChanged { 315 // the dep responses had changes. need to reevaluate edge state 316 e.recalcCurrentState() 317 } 318 319 desiredState, done := e.respondToIncoming(incoming, allPipes) 320 if done { 321 return 322 } 323 324 // set up new outgoing requests if needed 325 if e.cacheMapReq == nil && (e.cacheMap == nil || len(e.cacheRecords) == 0) { 326 index := e.cacheMapIndex 327 e.cacheMapReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) { 328 return e.op.CacheMap(ctx, index) 329 }) 330 } 331 332 // execute op 333 if e.execReq == nil && desiredState == edgeStatusComplete { 334 if ok := e.execIfPossible(f); ok { 335 return 336 } 337 } 338 339 if e.execReq == nil { 340 e.createInputRequests(desiredState, f) 341 } 342 343} 344 345func (e *edge) makeExportable(k *CacheKey, records []*CacheRecord) ExportableCacheKey { 346 return ExportableCacheKey{ 347 CacheKey: k, 348 Exporter: &exporter{k: k, records: records, override: e.edge.Vertex.Options().ExportCache}, 349 } 350} 351 352// processUpdate is called by unpark for every updated pipe request 353func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) { 354 // response for cachemap request 355 if upt == e.cacheMapReq && upt.Status().Completed { 356 if err := upt.Status().Err; err != nil { 357 e.cacheMapReq = nil 358 if !upt.Status().Canceled && e.err == nil { 359 e.err = err 360 } 361 } else { 362 resp := upt.Status().Value.(*cacheMapResp) 363 e.cacheMap = resp.CacheMap 364 e.cacheMapDone = resp.complete 365 e.cacheMapIndex++ 366 if len(e.deps) == 0 { 367 e.cacheMapDigests = append(e.cacheMapDigests, e.cacheMap.Digest) 368 if !e.op.IgnoreCache() { 369 keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index) 370 if err != nil { 371 logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error 372 } else { 373 for _, k := range keys { 374 records, err := e.op.Cache().Records(k) 375 if err != nil { 376 logrus.Errorf("error receiving cache records: %v", err) 377 continue 378 } 379 380 for _, r := range records { 381 e.cacheRecords[r.ID] = r 382 } 383 384 e.keys = append(e.keys, e.makeExportable(k, records)) 385 } 386 } 387 } 388 e.state = edgeStatusCacheSlow 389 } 390 if e.allDepsHaveKeys() { 391 e.keysDidChange = true 392 } 393 // probe keys that were loaded before cache map 394 for i, dep := range e.deps { 395 e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector)) 396 e.checkDepMatchPossible(dep) 397 } 398 if !e.cacheMapDone { 399 e.cacheMapReq = nil 400 } 401 } 402 return true 403 } 404 405 // response for exec request 406 if upt == e.execReq && upt.Status().Completed { 407 if err := upt.Status().Err; err != nil { 408 e.execReq = nil 409 if !upt.Status().Canceled && e.err == nil { 410 e.err = err 411 } 412 } else { 413 e.result = NewSharedCachedResult(upt.Status().Value.(CachedResult)) 414 e.state = edgeStatusComplete 415 } 416 return true 417 } 418 419 // response for requests to dependencies 420 if dep, ok := e.depRequests[upt]; ok { 421 if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil { 422 if e.err == nil { 423 e.err = err 424 } 425 dep.err = err 426 } 427 428 state := upt.Status().Value.(*edgeState) 429 430 if len(dep.keys) < len(state.keys) { 431 newKeys := state.keys[len(dep.keys):] 432 if e.cacheMap != nil { 433 e.probeCache(dep, withSelector(newKeys, e.cacheMap.Deps[dep.index].Selector)) 434 dep.edgeState.keys = state.keys 435 if e.allDepsHaveKeys() { 436 e.keysDidChange = true 437 } 438 } 439 depChanged = true 440 } 441 if dep.state != edgeStatusComplete && state.state == edgeStatusComplete { 442 e.keysDidChange = true 443 } 444 445 recheck := state.state != dep.state 446 447 dep.edgeState = *state 448 449 if recheck && e.cacheMap != nil { 450 e.checkDepMatchPossible(dep) 451 depChanged = true 452 } 453 454 return 455 } 456 457 // response for result based cache function 458 for i, dep := range e.deps { 459 if upt == dep.slowCacheReq && upt.Status().Completed { 460 if err := upt.Status().Err; err != nil { 461 dep.slowCacheReq = nil 462 if !upt.Status().Canceled && e.err == nil { 463 e.err = upt.Status().Err 464 } 465 } else if !dep.slowCacheComplete { 466 k := NewCacheKey(upt.Status().Value.(digest.Digest), -1) 467 dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}} 468 slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey} 469 defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys())) 470 for _, dk := range dep.result.CacheKeys() { 471 defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[i].Selector}) 472 } 473 dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp}) 474 475 // connect def key to slow key 476 e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index) 477 478 dep.slowCacheComplete = true 479 e.keysDidChange = true 480 e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true 481 } 482 return true 483 } 484 } 485 486 return 487} 488 489// recalcCurrentState is called by unpark to recompute internal state after 490// the state of dependencies has changed 491func (e *edge) recalcCurrentState() { 492 // TODO: fast pass to detect incomplete results 493 newKeys := map[string]*CacheKey{} 494 495 for i, dep := range e.deps { 496 if i == 0 { 497 for id, k := range dep.keyMap { 498 if _, ok := e.keyMap[id]; ok { 499 continue 500 } 501 newKeys[id] = k 502 } 503 } else { 504 for id := range newKeys { 505 if _, ok := dep.keyMap[id]; !ok { 506 delete(newKeys, id) 507 } 508 } 509 } 510 if len(newKeys) == 0 { 511 break 512 } 513 } 514 515 for _, r := range newKeys { 516 // TODO: add all deps automatically 517 mergedKey := r.clone() 518 mergedKey.deps = make([][]CacheKeyWithSelector, len(e.deps)) 519 for i, dep := range e.deps { 520 if dep.result != nil { 521 for _, dk := range dep.result.CacheKeys() { 522 mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: dk}) 523 } 524 if dep.slowCacheKey != nil { 525 mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}) 526 } 527 } else { 528 for _, k := range dep.keys { 529 mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k}) 530 } 531 } 532 } 533 534 records, err := e.op.Cache().Records(mergedKey) 535 if err != nil { 536 logrus.Errorf("error receiving cache records: %v", err) 537 continue 538 } 539 540 for _, r := range records { 541 e.cacheRecords[r.ID] = r 542 } 543 544 e.keys = append(e.keys, e.makeExportable(mergedKey, records)) 545 } 546 547 // detect lower/upper bound for current state 548 allDepsCompletedCacheFast := e.cacheMap != nil 549 allDepsCompletedCacheSlow := e.cacheMap != nil 550 allDepsStateCacheSlow := true 551 allDepsCompleted := true 552 stLow := edgeStatusInitial // minimal possible state 553 stHigh := edgeStatusCacheSlow // maximum possible state 554 if e.cacheMap != nil { 555 for _, dep := range e.deps { 556 isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete)) 557 558 if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete { 559 stLow = dep.state 560 if stLow > edgeStatusCacheSlow { 561 stLow = edgeStatusCacheSlow 562 } 563 } 564 effectiveState := dep.state 565 if dep.state == edgeStatusCacheSlow && isSlowIncomplete { 566 effectiveState = edgeStatusCacheFast 567 } 568 if dep.state == edgeStatusComplete && isSlowIncomplete { 569 effectiveState = edgeStatusCacheFast 570 } 571 if effectiveState < stHigh { 572 stHigh = effectiveState 573 } 574 if isSlowIncomplete || dep.state < edgeStatusComplete { 575 allDepsCompleted = false 576 } 577 if dep.state < edgeStatusCacheFast { 578 allDepsCompletedCacheFast = false 579 } 580 if isSlowIncomplete || dep.state < edgeStatusCacheSlow { 581 allDepsCompletedCacheSlow = false 582 } 583 if dep.state < edgeStatusCacheSlow && len(dep.keys) == 0 { 584 allDepsStateCacheSlow = false 585 } 586 } 587 if stLow > e.state { 588 e.state = stLow 589 } 590 if stHigh > e.state { 591 e.state = stHigh 592 } 593 if !e.cacheMapDone && len(e.keys) == 0 { 594 e.state = edgeStatusInitial 595 } 596 597 e.allDepsCompletedCacheFast = e.cacheMapDone && allDepsCompletedCacheFast 598 e.allDepsCompletedCacheSlow = e.cacheMapDone && allDepsCompletedCacheSlow 599 e.allDepsStateCacheSlow = e.cacheMapDone && allDepsStateCacheSlow 600 e.allDepsCompleted = e.cacheMapDone && allDepsCompleted 601 } 602} 603 604// respondToIncoming responds to all incoming requests. completing or 605// updating them when possible 606func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) { 607 // detect the result state for the requests 608 allIncomingCanComplete := true 609 desiredState := e.state 610 allCanceled := true 611 612 // check incoming requests 613 // check if all requests can be either answered or canceled 614 if !e.isComplete() { 615 for _, req := range incoming { 616 if !req.Request().Canceled { 617 allCanceled = false 618 if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState { 619 desiredState = r.desiredState 620 if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) { 621 allIncomingCanComplete = false 622 } 623 } 624 } 625 } 626 } 627 628 // do not set allIncomingCanComplete if active ongoing can modify the state 629 if !allCanceled && e.state < edgeStatusComplete && len(e.keys) == 0 && e.hasActiveOutgoing { 630 allIncomingCanComplete = false 631 } 632 633 if debugScheduler { 634 logrus.Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v keys=%d cacheRecords=%d", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast, len(e.keys), len(e.cacheRecords)) 635 } 636 637 if allIncomingCanComplete && e.hasActiveOutgoing { 638 // cancel all current requests 639 for _, p := range allPipes { 640 p.Cancel() 641 } 642 643 // can close all but one requests 644 var leaveOpen pipe.Sender 645 for _, req := range incoming { 646 if !req.Request().Canceled { 647 leaveOpen = req 648 break 649 } 650 } 651 for _, req := range incoming { 652 if leaveOpen == nil || leaveOpen == req { 653 leaveOpen = req 654 continue 655 } 656 e.finishIncoming(req) 657 } 658 return desiredState, true 659 } 660 661 // can complete, finish and return 662 if allIncomingCanComplete && !e.hasActiveOutgoing { 663 for _, req := range incoming { 664 e.finishIncoming(req) 665 } 666 return desiredState, true 667 } 668 669 // update incoming based on current state 670 for _, req := range incoming { 671 r := req.Request().Payload.(*edgeRequest) 672 if req.Request().Canceled { 673 e.finishIncoming(req) 674 } else if !e.hasActiveOutgoing && e.state >= r.desiredState { 675 e.finishIncoming(req) 676 } else if !isEqualState(r.currentState, e.edgeState) && !req.Request().Canceled { 677 e.updateIncoming(req) 678 } 679 } 680 return desiredState, false 681} 682 683// createInputRequests creates new requests for dependencies or async functions 684// that need to complete to continue processing the edge 685func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory) { 686 // initialize deps state 687 if e.deps == nil { 688 e.depRequests = make(map[pipe.Receiver]*dep) 689 e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs())) 690 for i := range e.edge.Vertex.Inputs() { 691 e.deps = append(e.deps, newDep(Index(i))) 692 } 693 } 694 695 // cycle all dependencies. set up outgoing requests if needed 696 for _, dep := range e.deps { 697 desiredStateDep := dep.state 698 699 if e.noCacheMatchPossible { 700 desiredStateDep = edgeStatusComplete 701 } else if dep.state == edgeStatusInitial && desiredState > dep.state { 702 desiredStateDep = edgeStatusCacheFast 703 } else if dep.state == edgeStatusCacheFast && desiredState > dep.state { 704 // wait all deps to complete cache fast before continuing with slow cache 705 if (e.allDepsCompletedCacheFast && len(e.keys) == 0) || len(dep.keys) == 0 || e.allDepsHaveKeys() { 706 if !e.skipPhase2FastCache(dep) { 707 desiredStateDep = edgeStatusCacheSlow 708 } 709 } 710 } else if dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete { 711 // if all deps have completed cache-slow or content based cache for input is available 712 if (len(dep.keys) == 0 || e.allDepsCompletedCacheSlow || (!e.skipPhase2FastCache(dep) && e.slowCacheFunc(dep) != nil)) && (len(e.cacheRecords) == 0) { 713 if len(dep.keys) == 0 || !e.skipPhase2SlowCache(dep) && e.allDepsStateCacheSlow { 714 desiredStateDep = edgeStatusComplete 715 } 716 } 717 } else if dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow { 718 if len(dep.keys) == 0 || !e.skipPhase2SlowCache(dep) && e.allDepsStateCacheSlow { 719 desiredStateDep = edgeStatusComplete 720 } 721 } 722 723 // outgoing request is needed 724 if dep.state < desiredStateDep { 725 addNew := true 726 if dep.req != nil && !dep.req.Status().Completed { 727 if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep { 728 dep.req.Cancel() 729 } else { 730 addNew = false 731 } 732 } 733 if addNew { 734 req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{ 735 currentState: dep.edgeState, 736 desiredState: desiredStateDep, 737 currentKeys: len(dep.keys), 738 }) 739 e.depRequests[req] = dep 740 dep.req = req 741 } 742 } 743 // initialize function to compute cache key based on dependency result 744 if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && e.slowCacheFunc(dep) != nil && e.cacheMap != nil { 745 fn := e.slowCacheFunc(dep) 746 res := dep.result 747 func(fn ResultBasedCacheFunc, res Result, index Index) { 748 dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) { 749 return e.op.CalcSlowCache(ctx, index, fn, res) 750 }) 751 }(fn, res, dep.index) 752 } 753 } 754} 755 756// execIfPossible creates a request for getting the edge result if there is 757// enough state 758func (e *edge) execIfPossible(f *pipeFactory) bool { 759 if len(e.cacheRecords) > 0 { 760 if e.keysDidChange { 761 e.postpone(f) 762 return true 763 } 764 e.execReq = f.NewFuncRequest(e.loadCache) 765 for req := range e.depRequests { 766 req.Cancel() 767 } 768 return true 769 } else if e.allDepsCompleted { 770 if e.keysDidChange { 771 e.postpone(f) 772 return true 773 } 774 e.execReq = f.NewFuncRequest(e.execOp) 775 return true 776 } 777 return false 778} 779 780// postpone delays exec to next unpark invocation if we have unprocessed keys 781func (e *edge) postpone(f *pipeFactory) { 782 f.NewFuncRequest(func(context.Context) (interface{}, error) { 783 return nil, nil 784 }) 785} 786 787// loadCache creates a request to load edge result from cache 788func (e *edge) loadCache(ctx context.Context) (interface{}, error) { 789 recs := make([]*CacheRecord, 0, len(e.cacheRecords)) 790 for _, r := range e.cacheRecords { 791 recs = append(recs, r) 792 } 793 794 rec := getBestResult(recs) 795 796 logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID) 797 res, err := e.op.LoadCache(ctx, rec) 798 if err != nil { 799 return nil, err 800 } 801 802 return NewCachedResult(res, []ExportableCacheKey{{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e}}}), nil 803} 804 805// execOp creates a request to execute the vertex operation 806func (e *edge) execOp(ctx context.Context) (interface{}, error) { 807 cacheKeys, inputs := e.commitOptions() 808 results, subExporters, err := e.op.Exec(ctx, toResultSlice(inputs)) 809 if err != nil { 810 return nil, err 811 } 812 813 index := e.edge.Index 814 if len(results) <= int(index) { 815 return nil, errors.Errorf("invalid response from exec need %d index but %d results received", index, len(results)) 816 } 817 818 res := results[int(index)] 819 820 for i := range results { 821 if i != int(index) { 822 go results[i].Release(context.TODO()) 823 } 824 } 825 826 var exporters []CacheExporter 827 828 for _, cacheKey := range cacheKeys { 829 ck, err := e.op.Cache().Save(cacheKey, res) 830 if err != nil { 831 return nil, err 832 } 833 834 if exp, ok := ck.Exporter.(*exporter); ok { 835 exp.edge = e 836 } 837 838 exps := make([]CacheExporter, 0, len(subExporters)) 839 for _, exp := range subExporters { 840 exps = append(exps, exp.Exporter) 841 } 842 843 exporters = append(exporters, ck.Exporter) 844 exporters = append(exporters, exps...) 845 } 846 847 ek := make([]ExportableCacheKey, 0, len(cacheKeys)) 848 for _, ck := range cacheKeys { 849 ek = append(ek, ExportableCacheKey{ 850 CacheKey: ck, 851 Exporter: &mergedExporter{exporters: exporters}, 852 }) 853 } 854 855 return NewCachedResult(res, ek), nil 856} 857 858func toResultSlice(cres []CachedResult) (out []Result) { 859 out = make([]Result, len(cres)) 860 for i := range cres { 861 out[i] = cres[i].(Result) 862 } 863 return out 864} 865 866func isEqualState(s1, s2 edgeState) bool { 867 if s1.state != s2.state || s1.result != s2.result || s1.cacheMap != s2.cacheMap || len(s1.keys) != len(s2.keys) { 868 return false 869 } 870 return true 871} 872 873func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyWithSelector { 874 out := make([]CacheKeyWithSelector, len(keys)) 875 for i, k := range keys { 876 out[i] = CacheKeyWithSelector{Selector: selector, CacheKey: k} 877 } 878 return out 879} 880