1package db
2
3import (
4	"database/sql"
5	"encoding/json"
6	"sort"
7
8	sq "github.com/Masterminds/squirrel"
9	"github.com/concourse/concourse/atc"
10	"github.com/concourse/concourse/atc/db/lock"
11	"github.com/lib/pq"
12)
13
14//go:generate counterfeiter . JobFactory
15
16// XXX: This job factory object is not really a job factory anymore. It is
17// holding the responsibility for two very different things: constructing a
18// dashboard object and also a scheduler job object. Figure out what this is
19// trying to encapsulate or considering splitting this out!
20type JobFactory interface {
21	VisibleJobs([]string) (atc.Dashboard, error)
22	AllActiveJobs() (atc.Dashboard, error)
23	JobsToSchedule() (SchedulerJobs, error)
24}
25
26type jobFactory struct {
27	conn        Conn
28	lockFactory lock.LockFactory
29}
30
31func NewJobFactory(conn Conn, lockFactory lock.LockFactory) JobFactory {
32	return &jobFactory{
33		conn:        conn,
34		lockFactory: lockFactory,
35	}
36}
37
38type SchedulerJobs []SchedulerJob
39
40type SchedulerJob struct {
41	Job
42	Resources     SchedulerResources
43	ResourceTypes atc.VersionedResourceTypes
44}
45
46type SchedulerResources []SchedulerResource
47
48type SchedulerResource struct {
49	Name   string
50	Type   string
51	Source atc.Source
52}
53
54func (r *SchedulerResource) ApplySourceDefaults(resourceTypes atc.VersionedResourceTypes) {
55	parentType, found := resourceTypes.Lookup(r.Type)
56	if found {
57		r.Source = parentType.Defaults.Merge(r.Source)
58	} else {
59		defaults, found := atc.FindBaseResourceTypeDefaults(r.Type)
60		if found {
61			r.Source = defaults.Merge(r.Source)
62		}
63	}
64}
65
66func (resources SchedulerResources) Lookup(name string) (*SchedulerResource, bool) {
67	for _, resource := range resources {
68		if resource.Name == name {
69			return &resource, true
70		}
71	}
72
73	return nil, false
74}
75
76func (j *jobFactory) JobsToSchedule() (SchedulerJobs, error) {
77	tx, err := j.conn.Begin()
78	if err != nil {
79		return nil, err
80	}
81
82	defer tx.Rollback()
83
84	rows, err := jobsQuery.
85		Where(sq.Expr("j.schedule_requested > j.last_scheduled")).
86		Where(sq.Eq{
87			"j.active": true,
88			"j.paused": false,
89			"p.paused": false,
90		}).
91		RunWith(tx).
92		Query()
93	if err != nil {
94		return nil, err
95	}
96
97	jobs, err := scanJobs(j.conn, j.lockFactory, rows)
98	if err != nil {
99		return nil, err
100	}
101
102	var schedulerJobs SchedulerJobs
103	pipelineResourceTypes := make(map[int]ResourceTypes)
104	for _, job := range jobs {
105		rows, err := tx.Query(`WITH inputs AS (
106				SELECT ji.resource_id from job_inputs ji where ji.job_id = $1
107				UNION
108				SELECT jo.resource_id from job_outputs jo where jo.job_id = $1
109			)
110			SELECT r.name, r.type, r.config, r.nonce
111			From resources r
112			Join inputs i on i.resource_id = r.id`, job.ID())
113		if err != nil {
114			return nil, err
115		}
116
117		var schedulerResources SchedulerResources
118		for rows.Next() {
119			var name, type_ string
120			var configBlob []byte
121			var nonce sql.NullString
122
123			err = rows.Scan(&name, &type_, &configBlob, &nonce)
124			if err != nil {
125				return nil, err
126			}
127
128			defer Close(rows)
129
130			es := j.conn.EncryptionStrategy()
131
132			var noncense *string
133			if nonce.Valid {
134				noncense = &nonce.String
135			}
136
137			decryptedConfig, err := es.Decrypt(string(configBlob), noncense)
138			if err != nil {
139				return nil, err
140			}
141
142			var config atc.ResourceConfig
143			err = json.Unmarshal(decryptedConfig, &config)
144			if err != nil {
145				return nil, err
146			}
147
148			schedulerResources = append(schedulerResources, SchedulerResource{
149				Name:   name,
150				Type:   type_,
151				Source: config.Source,
152			})
153		}
154
155		var resourceTypes ResourceTypes
156		var found bool
157		resourceTypes, found = pipelineResourceTypes[job.PipelineID()]
158		if !found {
159			rows, err := resourceTypesQuery.
160				Where(sq.Eq{"r.pipeline_id": job.PipelineID()}).
161				OrderBy("r.name").
162				RunWith(tx).
163				Query()
164			if err != nil {
165				return nil, err
166			}
167
168			defer Close(rows)
169
170			for rows.Next() {
171				resourceType := newEmptyResourceType(j.conn, j.lockFactory)
172				err := scanResourceType(resourceType, rows)
173				if err != nil {
174					return nil, err
175				}
176
177				resourceTypes = append(resourceTypes, resourceType)
178			}
179
180			pipelineResourceTypes[job.PipelineID()] = resourceTypes
181		}
182
183		schedulerJobs = append(schedulerJobs, SchedulerJob{
184			Job:           job,
185			Resources:     schedulerResources,
186			ResourceTypes: resourceTypes.Deserialize(),
187		})
188	}
189
190	err = tx.Commit()
191	if err != nil {
192		return nil, err
193	}
194
195	return schedulerJobs, nil
196}
197
198func (j *jobFactory) VisibleJobs(teamNames []string) (atc.Dashboard, error) {
199	tx, err := j.conn.Begin()
200	if err != nil {
201		return nil, err
202	}
203
204	defer Rollback(tx)
205
206	dashboardFactory := newDashboardFactory(tx, sq.Or{
207		sq.Eq{"tm.name": teamNames},
208		sq.Eq{"p.public": true},
209	})
210
211	dashboard, err := dashboardFactory.buildDashboard()
212	if err != nil {
213		return nil, err
214	}
215
216	err = tx.Commit()
217	if err != nil {
218		return nil, err
219	}
220
221	return dashboard, nil
222}
223
224func (j *jobFactory) AllActiveJobs() (atc.Dashboard, error) {
225	tx, err := j.conn.Begin()
226	if err != nil {
227		return nil, err
228	}
229
230	defer Rollback(tx)
231
232	dashboardFactory := newDashboardFactory(tx, nil)
233	dashboard, err := dashboardFactory.buildDashboard()
234	if err != nil {
235		return nil, err
236	}
237
238	err = tx.Commit()
239	if err != nil {
240		return nil, err
241	}
242
243	return dashboard, nil
244}
245
246type dashboardFactory struct {
247	// Constraints that are used by the dashboard queries. For example, a job ID
248	// constraint so that the dashboard will only return the job I have access to
249	// see.
250	pred interface{}
251
252	tx Tx
253}
254
255func newDashboardFactory(tx Tx, pred interface{}) dashboardFactory {
256	return dashboardFactory{
257		pred: pred,
258		tx:   tx,
259	}
260}
261
262func (d dashboardFactory) buildDashboard() (atc.Dashboard, error) {
263	dashboard, err := d.constructJobsForDashboard()
264	if err != nil {
265		return nil, err
266	}
267
268	jobInputs, err := d.fetchJobInputs()
269	if err != nil {
270		return nil, err
271	}
272
273	jobOutputs, err := d.fetchJobOutputs()
274	if err != nil {
275		return nil, err
276	}
277
278	return d.combineJobInputsAndOutputsWithDashboardJobs(dashboard, jobInputs, jobOutputs), nil
279}
280
281func (d dashboardFactory) constructJobsForDashboard() (atc.Dashboard, error) {
282	rows, err := psql.Select("j.id", "j.name", "p.name", "j.paused", "j.has_new_inputs", "j.tags", "tm.name",
283		"l.id", "l.name", "l.status", "l.start_time", "l.end_time",
284		"n.id", "n.name", "n.status", "n.start_time", "n.end_time",
285		"t.id", "t.name", "t.status", "t.start_time", "t.end_time").
286		From("jobs j").
287		Join("pipelines p ON j.pipeline_id = p.id").
288		Join("teams tm ON p.team_id = tm.id").
289		LeftJoin("builds l on j.latest_completed_build_id = l.id").
290		LeftJoin("builds n on j.next_build_id = n.id").
291		LeftJoin("builds t on j.transition_build_id = t.id").
292		Where(sq.Eq{
293			"j.active": true,
294		}).
295		Where(d.pred).
296		OrderBy("j.id ASC").
297		RunWith(d.tx).
298		Query()
299	if err != nil {
300		return nil, err
301	}
302
303	type nullableBuild struct {
304		id        sql.NullInt64
305		name      sql.NullString
306		jobName   sql.NullString
307		status    sql.NullString
308		startTime pq.NullTime
309		endTime   pq.NullTime
310	}
311
312	var dashboard atc.Dashboard
313	for rows.Next() {
314		var (
315			f, n, t nullableBuild
316		)
317
318		j := atc.DashboardJob{}
319		err = rows.Scan(&j.ID, &j.Name, &j.PipelineName, &j.Paused, &j.HasNewInputs, pq.Array(&j.Groups), &j.TeamName,
320			&f.id, &f.name, &f.status, &f.startTime, &f.endTime,
321			&n.id, &n.name, &n.status, &n.startTime, &n.endTime,
322			&t.id, &t.name, &t.status, &t.startTime, &t.endTime)
323		if err != nil {
324			return nil, err
325		}
326
327		if f.id.Valid {
328			j.FinishedBuild = &atc.DashboardBuild{
329				ID:           int(f.id.Int64),
330				Name:         f.name.String,
331				JobName:      j.Name,
332				PipelineName: j.PipelineName,
333				TeamName:     j.TeamName,
334				Status:       f.status.String,
335				StartTime:    f.startTime.Time,
336				EndTime:      f.endTime.Time,
337			}
338		}
339
340		if n.id.Valid {
341			j.NextBuild = &atc.DashboardBuild{
342				ID:           int(n.id.Int64),
343				Name:         n.name.String,
344				JobName:      j.Name,
345				PipelineName: j.PipelineName,
346				TeamName:     j.TeamName,
347				Status:       n.status.String,
348				StartTime:    n.startTime.Time,
349				EndTime:      n.endTime.Time,
350			}
351		}
352
353		if t.id.Valid {
354			j.TransitionBuild = &atc.DashboardBuild{
355				ID:           int(t.id.Int64),
356				Name:         t.name.String,
357				JobName:      j.Name,
358				PipelineName: j.PipelineName,
359				TeamName:     j.TeamName,
360				Status:       t.status.String,
361				StartTime:    t.startTime.Time,
362				EndTime:      t.endTime.Time,
363			}
364		}
365
366		dashboard = append(dashboard, j)
367	}
368
369	return dashboard, nil
370}
371
372func (d dashboardFactory) fetchJobInputs() (map[int][]atc.DashboardJobInput, error) {
373	rows, err := psql.Select("j.id", "i.name", "r.name", "array_agg(jp.name ORDER BY jp.id)", "i.trigger").
374		From("job_inputs i").
375		Join("jobs j ON j.id = i.job_id").
376		Join("pipelines p ON p.id = j.pipeline_id").
377		Join("teams tm ON tm.id = p.team_id").
378		Join("resources r ON r.id = i.resource_id").
379		LeftJoin("jobs jp ON jp.id = i.passed_job_id").
380		Where(sq.Eq{
381			"j.active": true,
382		}).
383		Where(d.pred).
384		GroupBy("i.name, j.id, r.name, i.trigger").
385		OrderBy("j.id").
386		RunWith(d.tx).
387		Query()
388	if err != nil {
389		return nil, err
390	}
391
392	jobInputs := make(map[int][]atc.DashboardJobInput)
393	for rows.Next() {
394		var passedString []sql.NullString
395		var inputName, resourceName string
396		var jobID int
397		var trigger bool
398
399		err = rows.Scan(&jobID, &inputName, &resourceName, pq.Array(&passedString), &trigger)
400		if err != nil {
401			return nil, err
402		}
403
404		var passed []string
405		for _, s := range passedString {
406			if s.Valid {
407				passed = append(passed, s.String)
408			}
409		}
410
411		jobInputs[jobID] = append(jobInputs[jobID], atc.DashboardJobInput{
412			Name:     inputName,
413			Resource: resourceName,
414			Trigger:  trigger,
415			Passed:   passed,
416		})
417	}
418
419	return jobInputs, nil
420}
421
422func (d dashboardFactory) fetchJobOutputs() (map[int][]atc.JobOutput, error) {
423	rows, err := psql.Select("o.name", "r.name", "o.job_id").
424		From("job_outputs o").
425		Join("jobs j ON j.id = o.job_id").
426		Join("pipelines p ON p.id = j.pipeline_id").
427		Join("teams tm ON tm.id = p.team_id").
428		Join("resources r ON r.id = o.resource_id").
429		Where(d.pred).
430		Where(sq.Eq{
431			"j.active": true,
432		}).
433		OrderBy("j.id").
434		RunWith(d.tx).
435		Query()
436	if err != nil {
437		return nil, err
438	}
439
440	jobOutputs := make(map[int][]atc.JobOutput)
441	for rows.Next() {
442		var output atc.JobOutput
443		var jobID int
444
445		err = rows.Scan(&output.Name, &output.Resource, &jobID)
446		if err != nil {
447			return nil, err
448		}
449
450		jobOutputs[jobID] = append(jobOutputs[jobID], output)
451	}
452
453	return jobOutputs, err
454}
455
456func (d dashboardFactory) combineJobInputsAndOutputsWithDashboardJobs(dashboard atc.Dashboard, jobInputs map[int][]atc.DashboardJobInput, jobOutputs map[int][]atc.JobOutput) atc.Dashboard {
457	var finalDashboard atc.Dashboard
458	for _, job := range dashboard {
459		for _, input := range jobInputs[job.ID] {
460			job.Inputs = append(job.Inputs, input)
461		}
462
463		sort.Slice(job.Inputs, func(p, q int) bool {
464			return job.Inputs[p].Name < job.Inputs[q].Name
465		})
466
467		for _, output := range jobOutputs[job.ID] {
468			job.Outputs = append(job.Outputs, output)
469		}
470
471		sort.Slice(job.Outputs, func(p, q int) bool {
472			return job.Outputs[p].Name < job.Outputs[q].Name
473		})
474
475		finalDashboard = append(finalDashboard, job)
476	}
477
478	return finalDashboard
479}
480