1package scheduler 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 8 "code.cloudfoundry.org/lager" 9 "github.com/concourse/concourse/atc/db" 10 "github.com/concourse/concourse/tracing" 11) 12 13//go:generate counterfeiter . Algorithm 14 15type Algorithm interface { 16 Compute( 17 context.Context, 18 db.Job, 19 db.InputConfigs, 20 ) (db.InputMapping, bool, bool, error) 21} 22 23type Scheduler struct { 24 Algorithm Algorithm 25 BuildStarter BuildStarter 26} 27 28func (s *Scheduler) Schedule( 29 ctx context.Context, 30 logger lager.Logger, 31 job db.SchedulerJob, 32) (bool, error) { 33 jobInputs, err := job.AlgorithmInputs() 34 if err != nil { 35 return false, fmt.Errorf("inputs: %w", err) 36 } 37 38 inputMapping, resolved, runAgain, err := s.Algorithm.Compute(ctx, job, jobInputs) 39 if err != nil { 40 return false, fmt.Errorf("compute inputs: %w", err) 41 } 42 43 if runAgain { 44 err = job.RequestSchedule() 45 if err != nil { 46 return false, fmt.Errorf("request schedule: %w", err) 47 } 48 } 49 50 err = job.SaveNextInputMapping(inputMapping, resolved) 51 if err != nil { 52 return false, fmt.Errorf("save next input mapping: %w", err) 53 } 54 55 err = s.ensurePendingBuildExists(ctx, logger, job, jobInputs) 56 if err != nil { 57 return false, err 58 } 59 60 return s.BuildStarter.TryStartPendingBuildsForJob(logger, job, jobInputs) 61} 62 63func (s *Scheduler) ensurePendingBuildExists( 64 ctx context.Context, 65 logger lager.Logger, 66 job db.SchedulerJob, 67 jobInputs db.InputConfigs, 68) error { 69 buildInputs, satisfiableInputs, err := job.GetFullNextBuildInputs() 70 if err != nil { 71 return fmt.Errorf("get next build inputs: %w", err) 72 } 73 74 if !satisfiableInputs { 75 logger.Debug("next-build-inputs-not-determined") 76 return nil 77 } 78 79 inputMapping := map[string]db.BuildInput{} 80 for _, input := range buildInputs { 81 inputMapping[input.Name] = input 82 } 83 84 var hasNewInputs bool 85 for _, inputConfig := range jobInputs { 86 inputSource, ok := inputMapping[inputConfig.Name] 87 88 //trigger: true, and the version has not been used 89 if ok && inputSource.FirstOccurrence { 90 hasNewInputs = true 91 if inputConfig.Trigger { 92 version, _ := json.Marshal(inputSource.Version) 93 spanCtx, _ := tracing.StartSpanLinkedToFollowing( 94 ctx, 95 inputSource, 96 "job.EnsurePendingBuildExists", 97 tracing.Attrs{ 98 "team": job.TeamName(), 99 "pipeline": job.PipelineName(), 100 "job": job.Name(), 101 "input": inputSource.Name, 102 "version": string(version), 103 }, 104 ) 105 err := job.EnsurePendingBuildExists(spanCtx) 106 if err != nil { 107 return fmt.Errorf("ensure pending build exists: %w", err) 108 } 109 110 break 111 } 112 } 113 } 114 115 if hasNewInputs != job.HasNewInputs() { 116 if err := job.SetHasNewInputs(hasNewInputs); err != nil { 117 return fmt.Errorf("set has new inputs: %w", err) 118 } 119 } 120 121 return nil 122} 123