1package solver 2 3import ( 4 "context" 5 "sync" 6 "time" 7 8 "github.com/moby/buildkit/solver/internal/pipe" 9 digest "github.com/opencontainers/go-digest" 10 "github.com/pkg/errors" 11 "github.com/sirupsen/logrus" 12) 13 14type edgeStatusType int 15 16const ( 17 edgeStatusInitial edgeStatusType = iota 18 edgeStatusCacheFast 19 edgeStatusCacheSlow 20 edgeStatusComplete 21) 22 23func (t edgeStatusType) String() string { 24 return []string{"initial", "cache-fast", "cache-slow", "complete"}[t] 25} 26 27func newEdge(ed Edge, op activeOp, index *edgeIndex) *edge { 28 e := &edge{ 29 edge: ed, 30 op: op, 31 depRequests: map[pipe.Receiver]*dep{}, 32 keyMap: map[string]struct{}{}, 33 cacheRecords: map[string]*CacheRecord{}, 34 cacheRecordsLoaded: map[string]struct{}{}, 35 index: index, 36 } 37 return e 38} 39 40type edge struct { 41 edge Edge 42 op activeOp 43 44 edgeState 45 depRequests map[pipe.Receiver]*dep 46 deps []*dep 47 48 cacheMapReq pipe.Receiver 49 cacheMapDone bool 50 cacheMapIndex int 51 cacheMapDigests []digest.Digest 52 execReq pipe.Receiver 53 execCacheLoad bool 54 err error 55 cacheRecords map[string]*CacheRecord 56 cacheRecordsLoaded map[string]struct{} 57 keyMap map[string]struct{} 58 59 noCacheMatchPossible bool 60 allDepsCompletedCacheFast bool 61 allDepsCompletedCacheSlow bool 62 allDepsStateCacheSlow bool 63 allDepsCompleted bool 64 hasActiveOutgoing bool 65 66 releaserCount int 67 keysDidChange bool 68 index *edgeIndex 69 70 secondaryExporters []expDep 71} 72 73// dep holds state for a dependant edge 74type dep struct { 75 req pipe.Receiver 76 edgeState 77 index Index 78 keyMap map[string]*CacheKey 79 slowCacheReq pipe.Receiver 80 slowCacheComplete bool 81 slowCacheFoundKey bool 82 slowCacheKey *ExportableCacheKey 83 err error 84} 85 86// expDep holds secorndary exporter info for dependency 87type expDep struct { 88 index int 89 cacheKey CacheKeyWithSelector 90} 91 92func newDep(i Index) *dep { 93 return &dep{index: i, keyMap: map[string]*CacheKey{}} 94} 95 96// edgePipe is a pipe for requests between two edges 97type edgePipe struct { 98 *pipe.Pipe 99 From, Target *edge 100 mu sync.Mutex 101} 102 103// edgeState hold basic mutable state info for an edge 104type edgeState struct { 105 state edgeStatusType 106 result *SharedCachedResult 107 cacheMap *CacheMap 108 keys []ExportableCacheKey 109} 110 111type edgeRequest struct { 112 desiredState edgeStatusType 113 currentState edgeState 114 currentKeys int 115} 116 117// incrementReferenceCount increases the number of times release needs to be 118// called to release the edge. Called on merging edges. 119func (e *edge) incrementReferenceCount() { 120 e.releaserCount++ 121} 122 123// release releases the edge resources 124func (e *edge) release() { 125 if e.releaserCount > 0 { 126 e.releaserCount-- 127 return 128 } 129 e.index.Release(e) 130 if e.result != nil { 131 go e.result.Release(context.TODO()) 132 } 133} 134 135// commitOptions returns parameters for the op execution 136func (e *edge) commitOptions() ([]*CacheKey, []CachedResult) { 137 k := NewCacheKey(e.cacheMap.Digest, e.edge.Index) 138 if len(e.deps) == 0 { 139 keys := make([]*CacheKey, 0, len(e.cacheMapDigests)) 140 for _, dgst := range e.cacheMapDigests { 141 keys = append(keys, NewCacheKey(dgst, e.edge.Index)) 142 } 143 return keys, nil 144 } 145 146 inputs := make([][]CacheKeyWithSelector, len(e.deps)) 147 results := make([]CachedResult, len(e.deps)) 148 for i, dep := range e.deps { 149 for _, k := range dep.result.CacheKeys() { 150 inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: k, Selector: e.cacheMap.Deps[i].Selector}) 151 } 152 if dep.slowCacheKey != nil { 153 inputs[i] = append(inputs[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}) 154 } 155 results[i] = dep.result 156 } 157 158 k.deps = inputs 159 return []*CacheKey{k}, results 160} 161 162// isComplete returns true if edge state is final and will never change 163func (e *edge) isComplete() bool { 164 return e.err != nil || e.result != nil 165} 166 167// finishIncoming finalizes the incoming pipe request 168func (e *edge) finishIncoming(req pipe.Sender) { 169 err := e.err 170 if req.Request().Canceled && err == nil { 171 err = context.Canceled 172 } 173 if debugScheduler { 174 logrus.Debugf("finishIncoming %s %v %#v desired=%s", e.edge.Vertex.Name(), err, e.edgeState, req.Request().Payload.(*edgeRequest).desiredState) 175 } 176 req.Finalize(&e.edgeState, err) 177} 178 179// updateIncoming updates the current value of incoming pipe request 180func (e *edge) updateIncoming(req pipe.Sender) { 181 if debugScheduler { 182 logrus.Debugf("updateIncoming %s %#v desired=%s", e.edge.Vertex.Name(), e.edgeState, req.Request().Payload.(*edgeRequest).desiredState) 183 } 184 req.Update(&e.edgeState) 185} 186 187// probeCache is called with unprocessed cache keys for dependency 188// if the key could match the edge, the cacheRecords for dependency are filled 189func (e *edge) probeCache(d *dep, depKeys []CacheKeyWithSelector) bool { 190 if len(depKeys) == 0 { 191 return false 192 } 193 if e.op.IgnoreCache() { 194 return false 195 } 196 keys, err := e.op.Cache().Query(depKeys, d.index, e.cacheMap.Digest, e.edge.Index) 197 if err != nil { 198 e.err = errors.Wrap(err, "error on cache query") 199 } 200 found := false 201 for _, k := range keys { 202 if _, ok := d.keyMap[k.ID]; !ok { 203 d.keyMap[k.ID] = k 204 found = true 205 } 206 } 207 return found 208} 209 210// checkDepMatchPossible checks if any cache matches are possible past this point 211func (e *edge) checkDepMatchPossible(dep *dep) { 212 depHasSlowCache := e.cacheMap.Deps[dep.index].ComputeDigestFunc != nil 213 if !e.noCacheMatchPossible && (((!dep.slowCacheFoundKey && dep.slowCacheComplete && depHasSlowCache) || (!depHasSlowCache && dep.state >= edgeStatusCacheSlow)) && len(dep.keyMap) == 0) { 214 e.noCacheMatchPossible = true 215 } 216} 217 218// slowCacheFunc returns the result based cache func for dependency if it exists 219func (e *edge) slowCacheFunc(dep *dep) ResultBasedCacheFunc { 220 if e.cacheMap == nil { 221 return nil 222 } 223 return e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc 224} 225 226// preprocessFunc returns result based cache func 227func (e *edge) preprocessFunc(dep *dep) PreprocessFunc { 228 if e.cacheMap == nil { 229 return nil 230 } 231 return e.cacheMap.Deps[int(dep.index)].PreprocessFunc 232} 233 234// allDepsHaveKeys checks if all dependencies have at least one key. used for 235// determining if there is enough data for combining cache key for edge 236func (e *edge) allDepsHaveKeys(matching bool) bool { 237 if e.cacheMap == nil { 238 return false 239 } 240 for _, d := range e.deps { 241 cond := len(d.keys) == 0 242 if matching { 243 cond = len(d.keyMap) == 0 244 } 245 if cond && d.slowCacheKey == nil && d.result == nil { 246 return false 247 } 248 } 249 return true 250} 251 252// depKeys returns all current dependency cache keys 253func (e *edge) currentIndexKey() *CacheKey { 254 if e.cacheMap == nil { 255 return nil 256 } 257 258 keys := make([][]CacheKeyWithSelector, len(e.deps)) 259 for i, d := range e.deps { 260 if len(d.keys) == 0 && d.result == nil { 261 return nil 262 } 263 for _, k := range d.keys { 264 keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k}) 265 } 266 if d.result != nil { 267 for _, rk := range d.result.CacheKeys() { 268 keys[i] = append(keys[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: rk}) 269 } 270 if d.slowCacheKey != nil { 271 keys[i] = append(keys[i], CacheKeyWithSelector{CacheKey: ExportableCacheKey{CacheKey: d.slowCacheKey.CacheKey, Exporter: &exporter{k: d.slowCacheKey.CacheKey}}}) 272 } 273 } 274 } 275 276 k := NewCacheKey(e.cacheMap.Digest, e.edge.Index) 277 k.deps = keys 278 279 return k 280} 281 282// slow cache keys can be computed in 2 phases if there are multiple deps. 283// first evaluate ones that didn't match any definition based keys 284func (e *edge) skipPhase2SlowCache(dep *dep) bool { 285 isPhase1 := false 286 for _, dep := range e.deps { 287 if (!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil || dep.state < edgeStatusCacheSlow) && len(dep.keyMap) == 0 { 288 isPhase1 = true 289 break 290 } 291 } 292 293 if isPhase1 && !dep.slowCacheComplete && e.slowCacheFunc(dep) != nil && len(dep.keyMap) > 0 { 294 return true 295 } 296 return false 297} 298 299func (e *edge) skipPhase2FastCache(dep *dep) bool { 300 isPhase1 := false 301 for _, dep := range e.deps { 302 if e.cacheMap == nil || len(dep.keyMap) == 0 && ((!dep.slowCacheComplete && e.slowCacheFunc(dep) != nil) || (dep.state < edgeStatusComplete && e.slowCacheFunc(dep) == nil)) { 303 isPhase1 = true 304 break 305 } 306 } 307 308 if isPhase1 && len(dep.keyMap) > 0 { 309 return true 310 } 311 return false 312} 313 314// unpark is called by the scheduler with incoming requests and updates for 315// previous calls. 316// To avoid deadlocks and resource leaks this function needs to follow 317// following rules: 318// 1) this function needs to return unclosed outgoing requests if some incoming 319// requests were not completed 320// 2) this function may not return outgoing requests if it has completed all 321// incoming requests 322func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory) { 323 // process all incoming changes 324 depChanged := false 325 for _, upt := range updates { 326 if changed := e.processUpdate(upt); changed { 327 depChanged = true 328 } 329 } 330 331 if depChanged { 332 // the dep responses had changes. need to reevaluate edge state 333 e.recalcCurrentState() 334 } 335 336 desiredState, done := e.respondToIncoming(incoming, allPipes) 337 if done { 338 return 339 } 340 341 cacheMapReq := false 342 // set up new outgoing requests if needed 343 if e.cacheMapReq == nil && (e.cacheMap == nil || len(e.cacheRecords) == 0) { 344 index := e.cacheMapIndex 345 e.cacheMapReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) { 346 cm, err := e.op.CacheMap(ctx, index) 347 return cm, errors.Wrap(err, "failed to load cache key") 348 }) 349 cacheMapReq = true 350 } 351 352 // execute op 353 if e.execReq == nil && desiredState == edgeStatusComplete { 354 if ok := e.execIfPossible(f); ok { 355 return 356 } 357 } 358 359 if e.execReq == nil { 360 if added := e.createInputRequests(desiredState, f, false); !added && !e.hasActiveOutgoing && !cacheMapReq { 361 logrus.Errorf("buildkit scheluding error: leaving incoming open. forcing solve. Please report this with BUILDKIT_SCHEDULER_DEBUG=1") 362 debugSchedulerPreUnpark(e, incoming, updates, allPipes) 363 e.createInputRequests(desiredState, f, true) 364 } 365 } 366 367} 368 369func (e *edge) makeExportable(k *CacheKey, records []*CacheRecord) ExportableCacheKey { 370 return ExportableCacheKey{ 371 CacheKey: k, 372 Exporter: &exporter{k: k, records: records, override: e.edge.Vertex.Options().ExportCache}, 373 } 374} 375 376func (e *edge) markFailed(f *pipeFactory, err error) { 377 e.err = err 378 e.postpone(f) 379} 380 381// processUpdate is called by unpark for every updated pipe request 382func (e *edge) processUpdate(upt pipe.Receiver) (depChanged bool) { 383 // response for cachemap request 384 if upt == e.cacheMapReq && upt.Status().Completed { 385 if err := upt.Status().Err; err != nil { 386 e.cacheMapReq = nil 387 if !upt.Status().Canceled && e.err == nil { 388 e.err = err 389 } 390 } else { 391 resp := upt.Status().Value.(*cacheMapResp) 392 e.cacheMap = resp.CacheMap 393 e.cacheMapDone = resp.complete 394 e.cacheMapIndex++ 395 if len(e.deps) == 0 { 396 e.cacheMapDigests = append(e.cacheMapDigests, e.cacheMap.Digest) 397 if !e.op.IgnoreCache() { 398 keys, err := e.op.Cache().Query(nil, 0, e.cacheMap.Digest, e.edge.Index) 399 if err != nil { 400 logrus.Error(errors.Wrap(err, "invalid query response")) // make the build fail for this error 401 } else { 402 for _, k := range keys { 403 records, err := e.op.Cache().Records(k) 404 if err != nil { 405 logrus.Errorf("error receiving cache records: %v", err) 406 continue 407 } 408 409 for _, r := range records { 410 e.cacheRecords[r.ID] = r 411 } 412 413 e.keys = append(e.keys, e.makeExportable(k, records)) 414 } 415 } 416 } 417 e.state = edgeStatusCacheSlow 418 } 419 if e.allDepsHaveKeys(false) { 420 e.keysDidChange = true 421 } 422 // probe keys that were loaded before cache map 423 for i, dep := range e.deps { 424 e.probeCache(dep, withSelector(dep.keys, e.cacheMap.Deps[i].Selector)) 425 e.checkDepMatchPossible(dep) 426 } 427 if !e.cacheMapDone { 428 e.cacheMapReq = nil 429 } 430 } 431 return true 432 } 433 434 // response for exec request 435 if upt == e.execReq && upt.Status().Completed { 436 if err := upt.Status().Err; err != nil { 437 e.execReq = nil 438 if e.execCacheLoad { 439 for k := range e.cacheRecordsLoaded { 440 delete(e.cacheRecords, k) 441 } 442 } else if !upt.Status().Canceled && e.err == nil { 443 e.err = err 444 } 445 } else { 446 e.result = NewSharedCachedResult(upt.Status().Value.(CachedResult)) 447 e.state = edgeStatusComplete 448 } 449 return true 450 } 451 452 // response for requests to dependencies 453 if dep, ok := e.depRequests[upt]; ok { 454 if err := upt.Status().Err; !upt.Status().Canceled && upt.Status().Completed && err != nil { 455 if e.err == nil { 456 e.err = err 457 } 458 dep.err = err 459 } 460 461 state := upt.Status().Value.(*edgeState) 462 463 if len(dep.keys) < len(state.keys) { 464 newKeys := state.keys[len(dep.keys):] 465 if e.cacheMap != nil { 466 e.probeCache(dep, withSelector(newKeys, e.cacheMap.Deps[dep.index].Selector)) 467 dep.edgeState.keys = state.keys 468 if e.allDepsHaveKeys(false) { 469 e.keysDidChange = true 470 } 471 } 472 depChanged = true 473 } 474 if dep.state != edgeStatusComplete && state.state == edgeStatusComplete { 475 e.keysDidChange = true 476 } 477 478 recheck := state.state != dep.state 479 480 dep.edgeState = *state 481 482 if recheck && e.cacheMap != nil { 483 e.checkDepMatchPossible(dep) 484 depChanged = true 485 } 486 487 return 488 } 489 490 // response for result based cache function 491 for i, dep := range e.deps { 492 if upt == dep.slowCacheReq && upt.Status().Completed { 493 if err := upt.Status().Err; err != nil { 494 dep.slowCacheReq = nil 495 if !upt.Status().Canceled && e.err == nil { 496 e.err = upt.Status().Err 497 } 498 } else if !dep.slowCacheComplete { 499 dgst := upt.Status().Value.(digest.Digest) 500 if e.cacheMap.Deps[int(dep.index)].ComputeDigestFunc != nil && dgst != "" { 501 k := NewCacheKey(dgst, -1) 502 dep.slowCacheKey = &ExportableCacheKey{CacheKey: k, Exporter: &exporter{k: k}} 503 slowKeyExp := CacheKeyWithSelector{CacheKey: *dep.slowCacheKey} 504 defKeys := make([]CacheKeyWithSelector, 0, len(dep.result.CacheKeys())) 505 for _, dk := range dep.result.CacheKeys() { 506 defKeys = append(defKeys, CacheKeyWithSelector{CacheKey: dk, Selector: e.cacheMap.Deps[i].Selector}) 507 } 508 dep.slowCacheFoundKey = e.probeCache(dep, []CacheKeyWithSelector{slowKeyExp}) 509 510 // connect def key to slow key 511 e.op.Cache().Query(append(defKeys, slowKeyExp), dep.index, e.cacheMap.Digest, e.edge.Index) 512 } 513 514 dep.slowCacheComplete = true 515 e.keysDidChange = true 516 e.checkDepMatchPossible(dep) // not matching key here doesn't set nocachematch possible to true 517 } 518 return true 519 } 520 } 521 522 return 523} 524 525// recalcCurrentState is called by unpark to recompute internal state after 526// the state of dependencies has changed 527func (e *edge) recalcCurrentState() { 528 // TODO: fast pass to detect incomplete results 529 newKeys := map[string]*CacheKey{} 530 531 for i, dep := range e.deps { 532 if i == 0 { 533 for id, k := range dep.keyMap { 534 if _, ok := e.keyMap[id]; ok { 535 continue 536 } 537 newKeys[id] = k 538 } 539 } else { 540 for id := range newKeys { 541 if _, ok := dep.keyMap[id]; !ok { 542 delete(newKeys, id) 543 } 544 } 545 } 546 if len(newKeys) == 0 { 547 break 548 } 549 } 550 551 for key := range newKeys { 552 e.keyMap[key] = struct{}{} 553 } 554 555 for _, r := range newKeys { 556 // TODO: add all deps automatically 557 mergedKey := r.clone() 558 mergedKey.deps = make([][]CacheKeyWithSelector, len(e.deps)) 559 for i, dep := range e.deps { 560 if dep.result != nil { 561 for _, dk := range dep.result.CacheKeys() { 562 mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: dk}) 563 } 564 if dep.slowCacheKey != nil { 565 mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{CacheKey: *dep.slowCacheKey}) 566 } 567 } else { 568 for _, k := range dep.keys { 569 mergedKey.deps[i] = append(mergedKey.deps[i], CacheKeyWithSelector{Selector: e.cacheMap.Deps[i].Selector, CacheKey: k}) 570 } 571 } 572 } 573 574 records, err := e.op.Cache().Records(mergedKey) 575 if err != nil { 576 logrus.Errorf("error receiving cache records: %v", err) 577 continue 578 } 579 580 for _, r := range records { 581 if _, ok := e.cacheRecordsLoaded[r.ID]; !ok { 582 e.cacheRecords[r.ID] = r 583 } 584 } 585 586 e.keys = append(e.keys, e.makeExportable(mergedKey, records)) 587 } 588 589 // detect lower/upper bound for current state 590 allDepsCompletedCacheFast := e.cacheMap != nil 591 allDepsCompletedCacheSlow := e.cacheMap != nil 592 allDepsStateCacheSlow := true 593 allDepsCompleted := true 594 stLow := edgeStatusInitial // minimal possible state 595 stHigh := edgeStatusCacheSlow // maximum possible state 596 if e.cacheMap != nil { 597 for _, dep := range e.deps { 598 isSlowCacheIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete)) 599 isSlowIncomplete := (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete)) 600 601 if dep.state > stLow && len(dep.keyMap) == 0 && !isSlowIncomplete { 602 stLow = dep.state 603 if stLow > edgeStatusCacheSlow { 604 stLow = edgeStatusCacheSlow 605 } 606 } 607 effectiveState := dep.state 608 if dep.state == edgeStatusCacheSlow && isSlowCacheIncomplete { 609 effectiveState = edgeStatusCacheFast 610 } 611 if dep.state == edgeStatusComplete && isSlowCacheIncomplete { 612 effectiveState = edgeStatusCacheFast 613 } 614 if effectiveState < stHigh { 615 stHigh = effectiveState 616 } 617 if isSlowIncomplete || dep.state < edgeStatusComplete { 618 allDepsCompleted = false 619 } 620 if dep.state < edgeStatusCacheFast { 621 allDepsCompletedCacheFast = false 622 } 623 if isSlowCacheIncomplete || dep.state < edgeStatusCacheSlow { 624 allDepsCompletedCacheSlow = false 625 } 626 if dep.state < edgeStatusCacheSlow && len(dep.keyMap) == 0 { 627 allDepsStateCacheSlow = false 628 } 629 } 630 if stLow > e.state { 631 e.state = stLow 632 } 633 if stHigh > e.state { 634 e.state = stHigh 635 } 636 if !e.cacheMapDone && len(e.keys) == 0 { 637 e.state = edgeStatusInitial 638 } 639 640 e.allDepsCompletedCacheFast = e.cacheMapDone && allDepsCompletedCacheFast 641 e.allDepsCompletedCacheSlow = e.cacheMapDone && allDepsCompletedCacheSlow 642 e.allDepsStateCacheSlow = e.cacheMapDone && allDepsStateCacheSlow 643 e.allDepsCompleted = e.cacheMapDone && allDepsCompleted 644 645 if e.allDepsStateCacheSlow && len(e.cacheRecords) > 0 && e.state == edgeStatusCacheFast { 646 openKeys := map[string]struct{}{} 647 for _, dep := range e.deps { 648 isSlowIncomplete := e.slowCacheFunc(dep) != nil && (dep.state == edgeStatusCacheSlow || (dep.state == edgeStatusComplete && !dep.slowCacheComplete)) 649 if !isSlowIncomplete { 650 openDepKeys := map[string]struct{}{} 651 for key := range dep.keyMap { 652 if _, ok := e.keyMap[key]; !ok { 653 openDepKeys[key] = struct{}{} 654 } 655 } 656 if len(openKeys) != 0 { 657 for k := range openKeys { 658 if _, ok := openDepKeys[k]; !ok { 659 delete(openKeys, k) 660 } 661 } 662 } else { 663 openKeys = openDepKeys 664 } 665 if len(openKeys) == 0 { 666 e.state = edgeStatusCacheSlow 667 if debugScheduler { 668 logrus.Debugf("upgrade to cache-slow because no open keys") 669 } 670 } 671 } 672 } 673 } 674 } 675} 676 677// respondToIncoming responds to all incoming requests. completing or 678// updating them when possible 679func (e *edge) respondToIncoming(incoming []pipe.Sender, allPipes []pipe.Receiver) (edgeStatusType, bool) { 680 // detect the result state for the requests 681 allIncomingCanComplete := true 682 desiredState := e.state 683 allCanceled := true 684 685 // check incoming requests 686 // check if all requests can be either answered or canceled 687 if !e.isComplete() { 688 for _, req := range incoming { 689 if !req.Request().Canceled { 690 allCanceled = false 691 if r := req.Request().Payload.(*edgeRequest); desiredState < r.desiredState { 692 desiredState = r.desiredState 693 if e.hasActiveOutgoing || r.desiredState == edgeStatusComplete || r.currentKeys == len(e.keys) { 694 allIncomingCanComplete = false 695 } 696 } 697 } 698 } 699 } 700 701 // do not set allIncomingCanComplete if active ongoing can modify the state 702 if !allCanceled && e.state < edgeStatusComplete && len(e.keys) == 0 && e.hasActiveOutgoing { 703 allIncomingCanComplete = false 704 } 705 706 if debugScheduler { 707 logrus.Debugf("status state=%s cancomplete=%v hasouts=%v noPossibleCache=%v depsCacheFast=%v keys=%d cacheRecords=%d", e.state, allIncomingCanComplete, e.hasActiveOutgoing, e.noCacheMatchPossible, e.allDepsCompletedCacheFast, len(e.keys), len(e.cacheRecords)) 708 } 709 710 if allIncomingCanComplete && e.hasActiveOutgoing { 711 // cancel all current requests 712 for _, p := range allPipes { 713 p.Cancel() 714 } 715 716 // can close all but one requests 717 var leaveOpen pipe.Sender 718 for _, req := range incoming { 719 if !req.Request().Canceled { 720 leaveOpen = req 721 break 722 } 723 } 724 for _, req := range incoming { 725 if leaveOpen == nil || leaveOpen == req { 726 leaveOpen = req 727 continue 728 } 729 e.finishIncoming(req) 730 } 731 return desiredState, true 732 } 733 734 // can complete, finish and return 735 if allIncomingCanComplete && !e.hasActiveOutgoing { 736 for _, req := range incoming { 737 e.finishIncoming(req) 738 } 739 return desiredState, true 740 } 741 742 // update incoming based on current state 743 for _, req := range incoming { 744 r := req.Request().Payload.(*edgeRequest) 745 if req.Request().Canceled { 746 e.finishIncoming(req) 747 } else if !e.hasActiveOutgoing && e.state >= r.desiredState { 748 e.finishIncoming(req) 749 } else if !isEqualState(r.currentState, e.edgeState) && !req.Request().Canceled { 750 e.updateIncoming(req) 751 } 752 } 753 return desiredState, false 754} 755 756// createInputRequests creates new requests for dependencies or async functions 757// that need to complete to continue processing the edge 758func (e *edge) createInputRequests(desiredState edgeStatusType, f *pipeFactory, force bool) bool { 759 addedNew := false 760 761 // initialize deps state 762 if e.deps == nil { 763 e.depRequests = make(map[pipe.Receiver]*dep) 764 e.deps = make([]*dep, 0, len(e.edge.Vertex.Inputs())) 765 for i := range e.edge.Vertex.Inputs() { 766 e.deps = append(e.deps, newDep(Index(i))) 767 } 768 } 769 770 // cycle all dependencies. set up outgoing requests if needed 771 for _, dep := range e.deps { 772 desiredStateDep := dep.state 773 774 if e.noCacheMatchPossible || force { 775 desiredStateDep = edgeStatusComplete 776 } else if dep.state == edgeStatusInitial && desiredState > dep.state { 777 desiredStateDep = edgeStatusCacheFast 778 } else if dep.state == edgeStatusCacheFast && desiredState > dep.state { 779 // wait all deps to complete cache fast before continuing with slow cache 780 if (e.allDepsCompletedCacheFast && len(e.keys) == 0) || len(dep.keyMap) == 0 || e.allDepsHaveKeys(true) { 781 if !e.skipPhase2FastCache(dep) && e.cacheMap != nil { 782 desiredStateDep = edgeStatusCacheSlow 783 } 784 } 785 } else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && desiredState == edgeStatusComplete { 786 // if all deps have completed cache-slow or content based cache for input is available 787 if (len(dep.keyMap) == 0 || e.allDepsCompletedCacheSlow || (!e.skipPhase2FastCache(dep) && e.slowCacheFunc(dep) != nil)) && (len(e.cacheRecords) == 0) { 788 if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) { 789 desiredStateDep = edgeStatusComplete 790 } 791 } 792 } else if e.cacheMap != nil && dep.state == edgeStatusCacheSlow && e.slowCacheFunc(dep) != nil && desiredState == edgeStatusCacheSlow { 793 if len(dep.keyMap) == 0 || !e.skipPhase2SlowCache(dep) { 794 desiredStateDep = edgeStatusComplete 795 } 796 } 797 798 // outgoing request is needed 799 if dep.state < desiredStateDep { 800 addNew := true 801 if dep.req != nil && !dep.req.Status().Completed { 802 if dep.req.Request().(*edgeRequest).desiredState != desiredStateDep { 803 dep.req.Cancel() 804 } else { 805 addNew = false 806 } 807 } 808 if addNew { 809 req := f.NewInputRequest(e.edge.Vertex.Inputs()[int(dep.index)], &edgeRequest{ 810 currentState: dep.edgeState, 811 desiredState: desiredStateDep, 812 currentKeys: len(dep.keys), 813 }) 814 e.depRequests[req] = dep 815 dep.req = req 816 addedNew = true 817 } 818 } 819 // initialize function to compute cache key based on dependency result 820 if dep.state == edgeStatusComplete && dep.slowCacheReq == nil && (e.slowCacheFunc(dep) != nil || e.preprocessFunc(dep) != nil) && e.cacheMap != nil { 821 pfn := e.preprocessFunc(dep) 822 fn := e.slowCacheFunc(dep) 823 res := dep.result 824 func(pfn PreprocessFunc, fn ResultBasedCacheFunc, res Result, index Index) { 825 dep.slowCacheReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) { 826 v, err := e.op.CalcSlowCache(ctx, index, pfn, fn, res) 827 return v, errors.Wrap(err, "failed to compute cache key") 828 }) 829 }(pfn, fn, res, dep.index) 830 addedNew = true 831 } 832 } 833 return addedNew 834} 835 836// execIfPossible creates a request for getting the edge result if there is 837// enough state 838func (e *edge) execIfPossible(f *pipeFactory) bool { 839 if len(e.cacheRecords) > 0 { 840 if e.keysDidChange { 841 e.postpone(f) 842 return true 843 } 844 e.execReq = f.NewFuncRequest(e.loadCache) 845 e.execCacheLoad = true 846 for req := range e.depRequests { 847 req.Cancel() 848 } 849 return true 850 } else if e.allDepsCompleted { 851 if e.keysDidChange { 852 e.postpone(f) 853 return true 854 } 855 e.execReq = f.NewFuncRequest(e.execOp) 856 e.execCacheLoad = false 857 return true 858 } 859 return false 860} 861 862// postpone delays exec to next unpark invocation if we have unprocessed keys 863func (e *edge) postpone(f *pipeFactory) { 864 f.NewFuncRequest(func(context.Context) (interface{}, error) { 865 return nil, nil 866 }) 867} 868 869// loadCache creates a request to load edge result from cache 870func (e *edge) loadCache(ctx context.Context) (interface{}, error) { 871 recs := make([]*CacheRecord, 0, len(e.cacheRecords)) 872 for _, r := range e.cacheRecords { 873 recs = append(recs, r) 874 } 875 876 rec := getBestResult(recs) 877 e.cacheRecordsLoaded[rec.ID] = struct{}{} 878 879 logrus.Debugf("load cache for %s with %s", e.edge.Vertex.Name(), rec.ID) 880 res, err := e.op.LoadCache(ctx, rec) 881 if err != nil { 882 logrus.Debugf("load cache for %s err: %v", e.edge.Vertex.Name(), err) 883 return nil, errors.Wrap(err, "failed to load cache") 884 } 885 886 return NewCachedResult(res, []ExportableCacheKey{{CacheKey: rec.key, Exporter: &exporter{k: rec.key, record: rec, edge: e}}}), nil 887} 888 889// execOp creates a request to execute the vertex operation 890func (e *edge) execOp(ctx context.Context) (interface{}, error) { 891 cacheKeys, inputs := e.commitOptions() 892 results, subExporters, err := e.op.Exec(ctx, toResultSlice(inputs)) 893 if err != nil { 894 return nil, errors.WithStack(err) 895 } 896 897 index := e.edge.Index 898 if len(results) <= int(index) { 899 return nil, errors.Errorf("invalid response from exec need %d index but %d results received", index, len(results)) 900 } 901 902 res := results[int(index)] 903 904 for i := range results { 905 if i != int(index) { 906 go results[i].Release(context.TODO()) 907 } 908 } 909 910 var exporters []CacheExporter 911 912 for _, cacheKey := range cacheKeys { 913 ck, err := e.op.Cache().Save(cacheKey, res, time.Now()) 914 if err != nil { 915 return nil, err 916 } 917 918 if exp, ok := ck.Exporter.(*exporter); ok { 919 exp.edge = e 920 } 921 922 exps := make([]CacheExporter, 0, len(subExporters)) 923 for _, exp := range subExporters { 924 exps = append(exps, exp.Exporter) 925 } 926 927 exporters = append(exporters, ck.Exporter) 928 exporters = append(exporters, exps...) 929 } 930 931 ek := make([]ExportableCacheKey, 0, len(cacheKeys)) 932 for _, ck := range cacheKeys { 933 ek = append(ek, ExportableCacheKey{ 934 CacheKey: ck, 935 Exporter: &mergedExporter{exporters: exporters}, 936 }) 937 } 938 939 return NewCachedResult(res, ek), nil 940} 941 942func toResultSlice(cres []CachedResult) (out []Result) { 943 out = make([]Result, len(cres)) 944 for i := range cres { 945 out[i] = cres[i].(Result) 946 } 947 return out 948} 949 950func isEqualState(s1, s2 edgeState) bool { 951 if s1.state != s2.state || s1.result != s2.result || s1.cacheMap != s2.cacheMap || len(s1.keys) != len(s2.keys) { 952 return false 953 } 954 return true 955} 956 957func withSelector(keys []ExportableCacheKey, selector digest.Digest) []CacheKeyWithSelector { 958 out := make([]CacheKeyWithSelector, len(keys)) 959 for i, k := range keys { 960 out[i] = CacheKeyWithSelector{Selector: selector, CacheKey: k} 961 } 962 return out 963} 964