1package db 2 3import ( 4 "database/sql" 5 "encoding/json" 6 "sort" 7 8 sq "github.com/Masterminds/squirrel" 9 "github.com/concourse/concourse/atc" 10 "github.com/concourse/concourse/atc/db/lock" 11 "github.com/lib/pq" 12) 13 14//go:generate counterfeiter . JobFactory 15 16// XXX: This job factory object is not really a job factory anymore. It is 17// holding the responsibility for two very different things: constructing a 18// dashboard object and also a scheduler job object. Figure out what this is 19// trying to encapsulate or considering splitting this out! 20type JobFactory interface { 21 VisibleJobs([]string) (atc.Dashboard, error) 22 AllActiveJobs() (atc.Dashboard, error) 23 JobsToSchedule() (SchedulerJobs, error) 24} 25 26type jobFactory struct { 27 conn Conn 28 lockFactory lock.LockFactory 29} 30 31func NewJobFactory(conn Conn, lockFactory lock.LockFactory) JobFactory { 32 return &jobFactory{ 33 conn: conn, 34 lockFactory: lockFactory, 35 } 36} 37 38type SchedulerJobs []SchedulerJob 39 40type SchedulerJob struct { 41 Job 42 Resources SchedulerResources 43 ResourceTypes atc.VersionedResourceTypes 44} 45 46type SchedulerResources []SchedulerResource 47 48type SchedulerResource struct { 49 Name string 50 Type string 51 Source atc.Source 52} 53 54func (r *SchedulerResource) ApplySourceDefaults(resourceTypes atc.VersionedResourceTypes) { 55 parentType, found := resourceTypes.Lookup(r.Type) 56 if found { 57 r.Source = parentType.Defaults.Merge(r.Source) 58 } else { 59 defaults, found := atc.FindBaseResourceTypeDefaults(r.Type) 60 if found { 61 r.Source = defaults.Merge(r.Source) 62 } 63 } 64} 65 66func (resources SchedulerResources) Lookup(name string) (*SchedulerResource, bool) { 67 for _, resource := range resources { 68 if resource.Name == name { 69 return &resource, true 70 } 71 } 72 73 return nil, false 74} 75 76func (j *jobFactory) JobsToSchedule() (SchedulerJobs, error) { 77 tx, err := j.conn.Begin() 78 if err != nil { 79 return nil, err 80 } 81 82 defer tx.Rollback() 83 84 rows, err := jobsQuery. 85 Where(sq.Expr("j.schedule_requested > j.last_scheduled")). 86 Where(sq.Eq{ 87 "j.active": true, 88 "j.paused": false, 89 "p.paused": false, 90 }). 91 RunWith(tx). 92 Query() 93 if err != nil { 94 return nil, err 95 } 96 97 jobs, err := scanJobs(j.conn, j.lockFactory, rows) 98 if err != nil { 99 return nil, err 100 } 101 102 var schedulerJobs SchedulerJobs 103 pipelineResourceTypes := make(map[int]ResourceTypes) 104 for _, job := range jobs { 105 rows, err := tx.Query(`WITH inputs AS ( 106 SELECT ji.resource_id from job_inputs ji where ji.job_id = $1 107 UNION 108 SELECT jo.resource_id from job_outputs jo where jo.job_id = $1 109 ) 110 SELECT r.name, r.type, r.config, r.nonce 111 From resources r 112 Join inputs i on i.resource_id = r.id`, job.ID()) 113 if err != nil { 114 return nil, err 115 } 116 117 var schedulerResources SchedulerResources 118 for rows.Next() { 119 var name, type_ string 120 var configBlob []byte 121 var nonce sql.NullString 122 123 err = rows.Scan(&name, &type_, &configBlob, &nonce) 124 if err != nil { 125 return nil, err 126 } 127 128 defer Close(rows) 129 130 es := j.conn.EncryptionStrategy() 131 132 var noncense *string 133 if nonce.Valid { 134 noncense = &nonce.String 135 } 136 137 decryptedConfig, err := es.Decrypt(string(configBlob), noncense) 138 if err != nil { 139 return nil, err 140 } 141 142 var config atc.ResourceConfig 143 err = json.Unmarshal(decryptedConfig, &config) 144 if err != nil { 145 return nil, err 146 } 147 148 schedulerResources = append(schedulerResources, SchedulerResource{ 149 Name: name, 150 Type: type_, 151 Source: config.Source, 152 }) 153 } 154 155 var resourceTypes ResourceTypes 156 var found bool 157 resourceTypes, found = pipelineResourceTypes[job.PipelineID()] 158 if !found { 159 rows, err := resourceTypesQuery. 160 Where(sq.Eq{"r.pipeline_id": job.PipelineID()}). 161 OrderBy("r.name"). 162 RunWith(tx). 163 Query() 164 if err != nil { 165 return nil, err 166 } 167 168 defer Close(rows) 169 170 for rows.Next() { 171 resourceType := newEmptyResourceType(j.conn, j.lockFactory) 172 err := scanResourceType(resourceType, rows) 173 if err != nil { 174 return nil, err 175 } 176 177 resourceTypes = append(resourceTypes, resourceType) 178 } 179 180 pipelineResourceTypes[job.PipelineID()] = resourceTypes 181 } 182 183 schedulerJobs = append(schedulerJobs, SchedulerJob{ 184 Job: job, 185 Resources: schedulerResources, 186 ResourceTypes: resourceTypes.Deserialize(), 187 }) 188 } 189 190 err = tx.Commit() 191 if err != nil { 192 return nil, err 193 } 194 195 return schedulerJobs, nil 196} 197 198func (j *jobFactory) VisibleJobs(teamNames []string) (atc.Dashboard, error) { 199 tx, err := j.conn.Begin() 200 if err != nil { 201 return nil, err 202 } 203 204 defer Rollback(tx) 205 206 dashboardFactory := newDashboardFactory(tx, sq.Or{ 207 sq.Eq{"tm.name": teamNames}, 208 sq.Eq{"p.public": true}, 209 }) 210 211 dashboard, err := dashboardFactory.buildDashboard() 212 if err != nil { 213 return nil, err 214 } 215 216 err = tx.Commit() 217 if err != nil { 218 return nil, err 219 } 220 221 return dashboard, nil 222} 223 224func (j *jobFactory) AllActiveJobs() (atc.Dashboard, error) { 225 tx, err := j.conn.Begin() 226 if err != nil { 227 return nil, err 228 } 229 230 defer Rollback(tx) 231 232 dashboardFactory := newDashboardFactory(tx, nil) 233 dashboard, err := dashboardFactory.buildDashboard() 234 if err != nil { 235 return nil, err 236 } 237 238 err = tx.Commit() 239 if err != nil { 240 return nil, err 241 } 242 243 return dashboard, nil 244} 245 246type dashboardFactory struct { 247 // Constraints that are used by the dashboard queries. For example, a job ID 248 // constraint so that the dashboard will only return the job I have access to 249 // see. 250 pred interface{} 251 252 tx Tx 253} 254 255func newDashboardFactory(tx Tx, pred interface{}) dashboardFactory { 256 return dashboardFactory{ 257 pred: pred, 258 tx: tx, 259 } 260} 261 262func (d dashboardFactory) buildDashboard() (atc.Dashboard, error) { 263 dashboard, err := d.constructJobsForDashboard() 264 if err != nil { 265 return nil, err 266 } 267 268 jobInputs, err := d.fetchJobInputs() 269 if err != nil { 270 return nil, err 271 } 272 273 jobOutputs, err := d.fetchJobOutputs() 274 if err != nil { 275 return nil, err 276 } 277 278 return d.combineJobInputsAndOutputsWithDashboardJobs(dashboard, jobInputs, jobOutputs), nil 279} 280 281func (d dashboardFactory) constructJobsForDashboard() (atc.Dashboard, error) { 282 rows, err := psql.Select("j.id", "j.name", "p.name", "j.paused", "j.has_new_inputs", "j.tags", "tm.name", 283 "l.id", "l.name", "l.status", "l.start_time", "l.end_time", 284 "n.id", "n.name", "n.status", "n.start_time", "n.end_time", 285 "t.id", "t.name", "t.status", "t.start_time", "t.end_time"). 286 From("jobs j"). 287 Join("pipelines p ON j.pipeline_id = p.id"). 288 Join("teams tm ON p.team_id = tm.id"). 289 LeftJoin("builds l on j.latest_completed_build_id = l.id"). 290 LeftJoin("builds n on j.next_build_id = n.id"). 291 LeftJoin("builds t on j.transition_build_id = t.id"). 292 Where(sq.Eq{ 293 "j.active": true, 294 }). 295 Where(d.pred). 296 OrderBy("j.id ASC"). 297 RunWith(d.tx). 298 Query() 299 if err != nil { 300 return nil, err 301 } 302 303 type nullableBuild struct { 304 id sql.NullInt64 305 name sql.NullString 306 jobName sql.NullString 307 status sql.NullString 308 startTime pq.NullTime 309 endTime pq.NullTime 310 } 311 312 var dashboard atc.Dashboard 313 for rows.Next() { 314 var ( 315 f, n, t nullableBuild 316 ) 317 318 j := atc.DashboardJob{} 319 err = rows.Scan(&j.ID, &j.Name, &j.PipelineName, &j.Paused, &j.HasNewInputs, pq.Array(&j.Groups), &j.TeamName, 320 &f.id, &f.name, &f.status, &f.startTime, &f.endTime, 321 &n.id, &n.name, &n.status, &n.startTime, &n.endTime, 322 &t.id, &t.name, &t.status, &t.startTime, &t.endTime) 323 if err != nil { 324 return nil, err 325 } 326 327 if f.id.Valid { 328 j.FinishedBuild = &atc.DashboardBuild{ 329 ID: int(f.id.Int64), 330 Name: f.name.String, 331 JobName: j.Name, 332 PipelineName: j.PipelineName, 333 TeamName: j.TeamName, 334 Status: f.status.String, 335 StartTime: f.startTime.Time, 336 EndTime: f.endTime.Time, 337 } 338 } 339 340 if n.id.Valid { 341 j.NextBuild = &atc.DashboardBuild{ 342 ID: int(n.id.Int64), 343 Name: n.name.String, 344 JobName: j.Name, 345 PipelineName: j.PipelineName, 346 TeamName: j.TeamName, 347 Status: n.status.String, 348 StartTime: n.startTime.Time, 349 EndTime: n.endTime.Time, 350 } 351 } 352 353 if t.id.Valid { 354 j.TransitionBuild = &atc.DashboardBuild{ 355 ID: int(t.id.Int64), 356 Name: t.name.String, 357 JobName: j.Name, 358 PipelineName: j.PipelineName, 359 TeamName: j.TeamName, 360 Status: t.status.String, 361 StartTime: t.startTime.Time, 362 EndTime: t.endTime.Time, 363 } 364 } 365 366 dashboard = append(dashboard, j) 367 } 368 369 return dashboard, nil 370} 371 372func (d dashboardFactory) fetchJobInputs() (map[int][]atc.DashboardJobInput, error) { 373 rows, err := psql.Select("j.id", "i.name", "r.name", "array_agg(jp.name ORDER BY jp.id)", "i.trigger"). 374 From("job_inputs i"). 375 Join("jobs j ON j.id = i.job_id"). 376 Join("pipelines p ON p.id = j.pipeline_id"). 377 Join("teams tm ON tm.id = p.team_id"). 378 Join("resources r ON r.id = i.resource_id"). 379 LeftJoin("jobs jp ON jp.id = i.passed_job_id"). 380 Where(sq.Eq{ 381 "j.active": true, 382 }). 383 Where(d.pred). 384 GroupBy("i.name, j.id, r.name, i.trigger"). 385 OrderBy("j.id"). 386 RunWith(d.tx). 387 Query() 388 if err != nil { 389 return nil, err 390 } 391 392 jobInputs := make(map[int][]atc.DashboardJobInput) 393 for rows.Next() { 394 var passedString []sql.NullString 395 var inputName, resourceName string 396 var jobID int 397 var trigger bool 398 399 err = rows.Scan(&jobID, &inputName, &resourceName, pq.Array(&passedString), &trigger) 400 if err != nil { 401 return nil, err 402 } 403 404 var passed []string 405 for _, s := range passedString { 406 if s.Valid { 407 passed = append(passed, s.String) 408 } 409 } 410 411 jobInputs[jobID] = append(jobInputs[jobID], atc.DashboardJobInput{ 412 Name: inputName, 413 Resource: resourceName, 414 Trigger: trigger, 415 Passed: passed, 416 }) 417 } 418 419 return jobInputs, nil 420} 421 422func (d dashboardFactory) fetchJobOutputs() (map[int][]atc.JobOutput, error) { 423 rows, err := psql.Select("o.name", "r.name", "o.job_id"). 424 From("job_outputs o"). 425 Join("jobs j ON j.id = o.job_id"). 426 Join("pipelines p ON p.id = j.pipeline_id"). 427 Join("teams tm ON tm.id = p.team_id"). 428 Join("resources r ON r.id = o.resource_id"). 429 Where(d.pred). 430 Where(sq.Eq{ 431 "j.active": true, 432 }). 433 OrderBy("j.id"). 434 RunWith(d.tx). 435 Query() 436 if err != nil { 437 return nil, err 438 } 439 440 jobOutputs := make(map[int][]atc.JobOutput) 441 for rows.Next() { 442 var output atc.JobOutput 443 var jobID int 444 445 err = rows.Scan(&output.Name, &output.Resource, &jobID) 446 if err != nil { 447 return nil, err 448 } 449 450 jobOutputs[jobID] = append(jobOutputs[jobID], output) 451 } 452 453 return jobOutputs, err 454} 455 456func (d dashboardFactory) combineJobInputsAndOutputsWithDashboardJobs(dashboard atc.Dashboard, jobInputs map[int][]atc.DashboardJobInput, jobOutputs map[int][]atc.JobOutput) atc.Dashboard { 457 var finalDashboard atc.Dashboard 458 for _, job := range dashboard { 459 for _, input := range jobInputs[job.ID] { 460 job.Inputs = append(job.Inputs, input) 461 } 462 463 sort.Slice(job.Inputs, func(p, q int) bool { 464 return job.Inputs[p].Name < job.Inputs[q].Name 465 }) 466 467 for _, output := range jobOutputs[job.ID] { 468 job.Outputs = append(job.Outputs, output) 469 } 470 471 sort.Slice(job.Outputs, func(p, q int) bool { 472 return job.Outputs[p].Name < job.Outputs[q].Name 473 }) 474 475 finalDashboard = append(finalDashboard, job) 476 } 477 478 return finalDashboard 479} 480