1package scheduler
2
3import (
4	"context"
5	"encoding/json"
6	"fmt"
7
8	"code.cloudfoundry.org/lager"
9	"github.com/concourse/concourse/atc/db"
10	"github.com/concourse/concourse/tracing"
11)
12
13//go:generate counterfeiter . Algorithm
14
15type Algorithm interface {
16	Compute(
17		context.Context,
18		db.Job,
19		db.InputConfigs,
20	) (db.InputMapping, bool, bool, error)
21}
22
23type Scheduler struct {
24	Algorithm    Algorithm
25	BuildStarter BuildStarter
26}
27
28func (s *Scheduler) Schedule(
29	ctx context.Context,
30	logger lager.Logger,
31	job db.SchedulerJob,
32) (bool, error) {
33	jobInputs, err := job.AlgorithmInputs()
34	if err != nil {
35		return false, fmt.Errorf("inputs: %w", err)
36	}
37
38	inputMapping, resolved, runAgain, err := s.Algorithm.Compute(ctx, job, jobInputs)
39	if err != nil {
40		return false, fmt.Errorf("compute inputs: %w", err)
41	}
42
43	if runAgain {
44		err = job.RequestSchedule()
45		if err != nil {
46			return false, fmt.Errorf("request schedule: %w", err)
47		}
48	}
49
50	err = job.SaveNextInputMapping(inputMapping, resolved)
51	if err != nil {
52		return false, fmt.Errorf("save next input mapping: %w", err)
53	}
54
55	err = s.ensurePendingBuildExists(ctx, logger, job, jobInputs)
56	if err != nil {
57		return false, err
58	}
59
60	return s.BuildStarter.TryStartPendingBuildsForJob(logger, job, jobInputs)
61}
62
63func (s *Scheduler) ensurePendingBuildExists(
64	ctx context.Context,
65	logger lager.Logger,
66	job db.SchedulerJob,
67	jobInputs db.InputConfigs,
68) error {
69	buildInputs, satisfiableInputs, err := job.GetFullNextBuildInputs()
70	if err != nil {
71		return fmt.Errorf("get next build inputs: %w", err)
72	}
73
74	if !satisfiableInputs {
75		logger.Debug("next-build-inputs-not-determined")
76		return nil
77	}
78
79	inputMapping := map[string]db.BuildInput{}
80	for _, input := range buildInputs {
81		inputMapping[input.Name] = input
82	}
83
84	var hasNewInputs bool
85	for _, inputConfig := range jobInputs {
86		inputSource, ok := inputMapping[inputConfig.Name]
87
88		//trigger: true, and the version has not been used
89		if ok && inputSource.FirstOccurrence {
90			hasNewInputs = true
91			if inputConfig.Trigger {
92				version, _ := json.Marshal(inputSource.Version)
93				spanCtx, _ := tracing.StartSpanLinkedToFollowing(
94					ctx,
95					inputSource,
96					"job.EnsurePendingBuildExists",
97					tracing.Attrs{
98						"team":     job.TeamName(),
99						"pipeline": job.PipelineName(),
100						"job":      job.Name(),
101						"input":    inputSource.Name,
102						"version":  string(version),
103					},
104				)
105				err := job.EnsurePendingBuildExists(spanCtx)
106				if err != nil {
107					return fmt.Errorf("ensure pending build exists: %w", err)
108				}
109
110				break
111			}
112		}
113	}
114
115	if hasNewInputs != job.HasNewInputs() {
116		if err := job.SetHasNewInputs(hasNewInputs); err != nil {
117			return fmt.Errorf("set has new inputs: %w", err)
118		}
119	}
120
121	return nil
122}
123