1package scheduler 2 3import ( 4 "context" 5 "fmt" 6 "os" 7 "runtime/debug" 8 "strconv" 9 "sync" 10 "time" 11 12 "code.cloudfoundry.org/lager" 13 "github.com/concourse/concourse/atc/db" 14 "github.com/concourse/concourse/atc/metric" 15 "github.com/concourse/concourse/tracing" 16 "go.opentelemetry.io/otel/api/key" 17) 18 19//go:generate counterfeiter . BuildScheduler 20 21type BuildScheduler interface { 22 Schedule( 23 ctx context.Context, 24 logger lager.Logger, 25 job db.SchedulerJob, 26 ) (bool, error) 27} 28 29type Runner struct { 30 logger lager.Logger 31 jobFactory db.JobFactory 32 scheduler BuildScheduler 33 34 guardJobScheduling chan struct{} 35 running *sync.Map 36} 37 38func NewRunner(logger lager.Logger, jobFactory db.JobFactory, scheduler BuildScheduler, maxJobs uint64) *Runner { 39 return &Runner{ 40 logger: logger, 41 jobFactory: jobFactory, 42 scheduler: scheduler, 43 44 guardJobScheduling: make(chan struct{}, maxJobs), 45 running: &sync.Map{}, 46 } 47} 48 49func (s *Runner) Run(ctx context.Context) error { 50 sLog := s.logger.Session("run") 51 52 sLog.Debug("start") 53 defer sLog.Debug("done") 54 spanCtx, span := tracing.StartSpan(ctx, "scheduler.Run", nil) 55 defer span.End() 56 57 jobs, err := s.jobFactory.JobsToSchedule() 58 if err != nil { 59 return fmt.Errorf("find jobs to schedule: %w", err) 60 } 61 62 for _, j := range jobs { 63 if _, exists := s.running.LoadOrStore(j.ID(), true); exists { 64 // already scheduling this job 65 continue 66 } 67 68 s.guardJobScheduling <- struct{}{} 69 70 jLog := sLog.Session("job", lager.Data{"job": j.Name()}) 71 72 go func(job db.SchedulerJob) { 73 loggerData := lager.Data{ 74 "job_id": strconv.Itoa(job.ID()), 75 "job_name": job.Name(), 76 "pipeline_name": job.PipelineName(), 77 "team_name": job.TeamName(), 78 } 79 defer func() { 80 if r := recover(); r != nil { 81 err = fmt.Errorf("panic in scheduler run %s: %v", loggerData, r) 82 83 fmt.Fprintf(os.Stderr, "%s\n %s\n", err.Error(), string(debug.Stack())) 84 jLog.Error("panic-in-scheduler-run", err) 85 } 86 }() 87 defer func() { 88 <-s.guardJobScheduling 89 s.running.Delete(job.ID()) 90 }() 91 92 schedulingLock, acquired, err := job.AcquireSchedulingLock(sLog) 93 if err != nil { 94 jLog.Error("failed-to-acquire-lock", err) 95 return 96 } 97 98 if !acquired { 99 return 100 } 101 102 defer schedulingLock.Release() 103 104 err = s.scheduleJob(spanCtx, sLog, job) 105 if err != nil { 106 jLog.Error("failed-to-schedule-job", err) 107 } 108 }(j) 109 } 110 111 return nil 112} 113 114func (s *Runner) scheduleJob(ctx context.Context, logger lager.Logger, job db.SchedulerJob) error { 115 metric.Metrics.JobsScheduling.Inc() 116 defer metric.Metrics.JobsScheduling.Dec() 117 defer metric.Metrics.JobsScheduled.Inc() 118 119 logger = logger.Session("schedule-job", lager.Data{"job": job.Name()}) 120 spanCtx, span := tracing.StartSpan(ctx, "schedule-job", tracing.Attrs{ 121 "team": job.TeamName(), 122 "pipeline": job.PipelineName(), 123 "job": job.Name(), 124 }) 125 defer span.End() 126 127 logger.Debug("schedule") 128 129 // Grabs out the requested time that triggered off the job schedule in 130 // order to set the last scheduled to the exact time of this triggering 131 // request 132 requestedTime := job.ScheduleRequestedTime() 133 134 found, err := job.Reload() 135 if err != nil { 136 return fmt.Errorf("reload job: %w", err) 137 } 138 139 if !found { 140 logger.Debug("could-not-find-job-to-reload") 141 return nil 142 } 143 144 jStart := time.Now() 145 146 needsRetry, err := s.scheduler.Schedule( 147 spanCtx, 148 logger, 149 job, 150 ) 151 if err != nil { 152 return fmt.Errorf("schedule job: %w", err) 153 } 154 155 span.SetAttributes(key.New("needs-retry").Bool(needsRetry)) 156 if !needsRetry { 157 err = job.UpdateLastScheduled(requestedTime) 158 if err != nil { 159 logger.Error("failed-to-update-last-scheduled", err, lager.Data{"job": job.Name()}) 160 return fmt.Errorf("update last scheduled: %w", err) 161 } 162 } 163 164 metric.SchedulingJobDuration{ 165 PipelineName: job.PipelineName(), 166 JobName: job.Name(), 167 JobID: job.ID(), 168 Duration: time.Since(jStart), 169 }.Emit(logger) 170 171 return nil 172} 173