1package scheduler
2
3import (
4	"context"
5	"fmt"
6	"os"
7	"runtime/debug"
8	"strconv"
9	"sync"
10	"time"
11
12	"code.cloudfoundry.org/lager"
13	"github.com/concourse/concourse/atc/db"
14	"github.com/concourse/concourse/atc/metric"
15	"github.com/concourse/concourse/tracing"
16	"go.opentelemetry.io/otel/api/key"
17)
18
19//go:generate counterfeiter . BuildScheduler
20
21type BuildScheduler interface {
22	Schedule(
23		ctx context.Context,
24		logger lager.Logger,
25		job db.SchedulerJob,
26	) (bool, error)
27}
28
29type Runner struct {
30	logger     lager.Logger
31	jobFactory db.JobFactory
32	scheduler  BuildScheduler
33
34	guardJobScheduling chan struct{}
35	running            *sync.Map
36}
37
38func NewRunner(logger lager.Logger, jobFactory db.JobFactory, scheduler BuildScheduler, maxJobs uint64) *Runner {
39	return &Runner{
40		logger:     logger,
41		jobFactory: jobFactory,
42		scheduler:  scheduler,
43
44		guardJobScheduling: make(chan struct{}, maxJobs),
45		running:            &sync.Map{},
46	}
47}
48
49func (s *Runner) Run(ctx context.Context) error {
50	sLog := s.logger.Session("run")
51
52	sLog.Debug("start")
53	defer sLog.Debug("done")
54	spanCtx, span := tracing.StartSpan(ctx, "scheduler.Run", nil)
55	defer span.End()
56
57	jobs, err := s.jobFactory.JobsToSchedule()
58	if err != nil {
59		return fmt.Errorf("find jobs to schedule: %w", err)
60	}
61
62	for _, j := range jobs {
63		if _, exists := s.running.LoadOrStore(j.ID(), true); exists {
64			// already scheduling this job
65			continue
66		}
67
68		s.guardJobScheduling <- struct{}{}
69
70		jLog := sLog.Session("job", lager.Data{"job": j.Name()})
71
72		go func(job db.SchedulerJob) {
73			loggerData := lager.Data{
74				"job_id":        strconv.Itoa(job.ID()),
75				"job_name":      job.Name(),
76				"pipeline_name": job.PipelineName(),
77				"team_name":     job.TeamName(),
78			}
79			defer func() {
80				if r := recover(); r != nil {
81					err = fmt.Errorf("panic in scheduler run %s: %v", loggerData, r)
82
83					fmt.Fprintf(os.Stderr, "%s\n %s\n", err.Error(), string(debug.Stack()))
84					jLog.Error("panic-in-scheduler-run", err)
85				}
86			}()
87			defer func() {
88				<-s.guardJobScheduling
89				s.running.Delete(job.ID())
90			}()
91
92			schedulingLock, acquired, err := job.AcquireSchedulingLock(sLog)
93			if err != nil {
94				jLog.Error("failed-to-acquire-lock", err)
95				return
96			}
97
98			if !acquired {
99				return
100			}
101
102			defer schedulingLock.Release()
103
104			err = s.scheduleJob(spanCtx, sLog, job)
105			if err != nil {
106				jLog.Error("failed-to-schedule-job", err)
107			}
108		}(j)
109	}
110
111	return nil
112}
113
114func (s *Runner) scheduleJob(ctx context.Context, logger lager.Logger, job db.SchedulerJob) error {
115	metric.Metrics.JobsScheduling.Inc()
116	defer metric.Metrics.JobsScheduling.Dec()
117	defer metric.Metrics.JobsScheduled.Inc()
118
119	logger = logger.Session("schedule-job", lager.Data{"job": job.Name()})
120	spanCtx, span := tracing.StartSpan(ctx, "schedule-job", tracing.Attrs{
121		"team":     job.TeamName(),
122		"pipeline": job.PipelineName(),
123		"job":      job.Name(),
124	})
125	defer span.End()
126
127	logger.Debug("schedule")
128
129	// Grabs out the requested time that triggered off the job schedule in
130	// order to set the last scheduled to the exact time of this triggering
131	// request
132	requestedTime := job.ScheduleRequestedTime()
133
134	found, err := job.Reload()
135	if err != nil {
136		return fmt.Errorf("reload job: %w", err)
137	}
138
139	if !found {
140		logger.Debug("could-not-find-job-to-reload")
141		return nil
142	}
143
144	jStart := time.Now()
145
146	needsRetry, err := s.scheduler.Schedule(
147		spanCtx,
148		logger,
149		job,
150	)
151	if err != nil {
152		return fmt.Errorf("schedule job: %w", err)
153	}
154
155	span.SetAttributes(key.New("needs-retry").Bool(needsRetry))
156	if !needsRetry {
157		err = job.UpdateLastScheduled(requestedTime)
158		if err != nil {
159			logger.Error("failed-to-update-last-scheduled", err, lager.Data{"job": job.Name()})
160			return fmt.Errorf("update last scheduled: %w", err)
161		}
162	}
163
164	metric.SchedulingJobDuration{
165		PipelineName: job.PipelineName(),
166		JobName:      job.Name(),
167		JobID:        job.ID(),
168		Duration:     time.Since(jStart),
169	}.Emit(logger)
170
171	return nil
172}
173