1package db 2 3import ( 4 "database/sql" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "time" 9 10 "code.cloudfoundry.org/lager" 11 12 sq "github.com/Masterminds/squirrel" 13 "github.com/lib/pq" 14 15 "github.com/concourse/concourse/atc" 16 "github.com/concourse/concourse/atc/creds" 17 "github.com/concourse/concourse/atc/db/lock" 18 "github.com/concourse/concourse/atc/event" 19) 20 21var ErrConfigComparisonFailed = errors.New("comparison with existing config failed during save") 22 23//go:generate counterfeiter . Team 24 25type Team interface { 26 ID() int 27 Name() string 28 Admin() bool 29 30 Auth() atc.TeamAuth 31 32 Delete() error 33 Rename(string) error 34 35 SavePipeline( 36 pipelineName string, 37 config atc.Config, 38 from ConfigVersion, 39 initiallyPaused bool, 40 ) (Pipeline, bool, error) 41 42 Pipeline(pipelineName string) (Pipeline, bool, error) 43 Pipelines() ([]Pipeline, error) 44 PublicPipelines() ([]Pipeline, error) 45 OrderPipelines([]string) error 46 47 CreateOneOffBuild() (Build, error) 48 CreateStartedBuild(plan atc.Plan) (Build, error) 49 50 PrivateAndPublicBuilds(Page) ([]Build, Pagination, error) 51 Builds(page Page) ([]Build, Pagination, error) 52 BuildsWithTime(page Page) ([]Build, Pagination, error) 53 54 SaveWorker(atcWorker atc.Worker, ttl time.Duration) (Worker, error) 55 Workers() ([]Worker, error) 56 FindVolumeForWorkerArtifact(int) (CreatedVolume, bool, error) 57 58 Containers() ([]Container, error) 59 IsCheckContainer(string) (bool, error) 60 IsContainerWithinTeam(string, bool) (bool, error) 61 62 FindContainerByHandle(string) (Container, bool, error) 63 FindCheckContainers(lager.Logger, string, string, creds.Secrets, creds.VarSourcePool) ([]Container, map[int]time.Time, error) 64 FindContainersByMetadata(ContainerMetadata) ([]Container, error) 65 FindCreatedContainerByHandle(string) (CreatedContainer, bool, error) 66 FindWorkerForContainer(handle string) (Worker, bool, error) 67 FindWorkerForVolume(handle string) (Worker, bool, error) 68 69 UpdateProviderAuth(auth atc.TeamAuth) error 70} 71 72type team struct { 73 id int 74 conn Conn 75 lockFactory lock.LockFactory 76 77 name string 78 admin bool 79 80 auth atc.TeamAuth 81} 82 83func (t *team) ID() int { return t.id } 84func (t *team) Name() string { return t.name } 85func (t *team) Admin() bool { return t.admin } 86 87func (t *team) Auth() atc.TeamAuth { return t.auth } 88 89func (t *team) Delete() error { 90 _, err := psql.Delete("teams"). 91 Where(sq.Eq{ 92 "name": t.name, 93 }). 94 RunWith(t.conn). 95 Exec() 96 97 return err 98} 99 100func (t *team) Rename(name string) error { 101 _, err := psql.Update("teams"). 102 Set("name", name). 103 Where(sq.Eq{ 104 "id": t.id, 105 }). 106 RunWith(t.conn). 107 Exec() 108 109 return err 110} 111 112func (t *team) Workers() ([]Worker, error) { 113 return getWorkers(t.conn, workersQuery.Where(sq.Or{ 114 sq.Eq{"t.id": t.id}, 115 sq.Eq{"w.team_id": nil}, 116 })) 117} 118 119func (t *team) FindVolumeForWorkerArtifact(artifactID int) (CreatedVolume, bool, error) { 120 tx, err := t.conn.Begin() 121 if err != nil { 122 return nil, false, err 123 } 124 125 defer Rollback(tx) 126 127 artifact, found, err := getWorkerArtifact(tx, t.conn, artifactID) 128 if err != nil { 129 return nil, false, err 130 } 131 132 err = tx.Commit() 133 if err != nil { 134 return nil, false, err 135 } 136 137 if !found { 138 return nil, false, nil 139 } 140 141 return artifact.Volume(t.ID()) 142} 143 144func (t *team) FindWorkerForContainer(handle string) (Worker, bool, error) { 145 return getWorker(t.conn, workersQuery.Join("containers c ON c.worker_name = w.name").Where(sq.And{ 146 sq.Eq{"c.handle": handle}, 147 })) 148} 149 150func (t *team) FindWorkerForVolume(handle string) (Worker, bool, error) { 151 return getWorker(t.conn, workersQuery.Join("volumes v ON v.worker_name = w.name").Where(sq.And{ 152 sq.Eq{"v.handle": handle}, 153 })) 154} 155 156func (t *team) Containers() ([]Container, error) { 157 rows, err := selectContainers("c"). 158 Join("workers w ON c.worker_name = w.name"). 159 Join("resource_config_check_sessions rccs ON rccs.id = c.resource_config_check_session_id"). 160 Join("resources r ON r.resource_config_id = rccs.resource_config_id"). 161 Join("pipelines p ON p.id = r.pipeline_id"). 162 Where(sq.Eq{ 163 "p.team_id": t.id, 164 }). 165 Where(sq.Or{ 166 sq.Eq{ 167 "w.team_id": t.id, 168 }, sq.Eq{ 169 "w.team_id": nil, 170 }, 171 }). 172 Distinct(). 173 RunWith(t.conn). 174 Query() 175 if err != nil { 176 return nil, err 177 } 178 179 var containers []Container 180 containers, err = scanContainers(rows, t.conn, containers) 181 if err != nil { 182 return nil, err 183 } 184 185 rows, err = selectContainers("c"). 186 Join("workers w ON c.worker_name = w.name"). 187 Join("resource_config_check_sessions rccs ON rccs.id = c.resource_config_check_session_id"). 188 Join("resource_types rt ON rt.resource_config_id = rccs.resource_config_id"). 189 Join("pipelines p ON p.id = rt.pipeline_id"). 190 Where(sq.Eq{ 191 "p.team_id": t.id, 192 }). 193 Where(sq.Or{ 194 sq.Eq{ 195 "w.team_id": t.id, 196 }, sq.Eq{ 197 "w.team_id": nil, 198 }, 199 }). 200 Distinct(). 201 RunWith(t.conn). 202 Query() 203 if err != nil { 204 return nil, err 205 } 206 207 containers, err = scanContainers(rows, t.conn, containers) 208 if err != nil { 209 return nil, err 210 } 211 212 rows, err = selectContainers("c"). 213 Where(sq.Eq{ 214 "c.team_id": t.id, 215 }). 216 RunWith(t.conn). 217 Query() 218 if err != nil { 219 return nil, err 220 } 221 222 containers, err = scanContainers(rows, t.conn, containers) 223 if err != nil { 224 return nil, err 225 } 226 227 return containers, nil 228} 229 230func (t *team) IsCheckContainer(handle string) (bool, error) { 231 var containerType string 232 err := psql.Select("meta_type"). 233 From("containers"). 234 Where(sq.Eq{ 235 "handle": handle, 236 }). 237 RunWith(t.conn). 238 QueryRow(). 239 Scan(&containerType) 240 if err != nil { 241 return false, err 242 } 243 244 return ContainerType(containerType) == ContainerTypeCheck, nil 245} 246 247func (t *team) IsContainerWithinTeam(handle string, isCheck bool) (bool, error) { 248 var ok int 249 var err error 250 251 if isCheck { 252 err = psql.Select("1"). 253 From("resources r"). 254 Join("pipelines p ON p.id = r.pipeline_id"). 255 Join("resource_configs rc ON rc.id = r.resource_config_id"). 256 Join("resource_config_check_sessions rccs ON rccs.resource_config_id = rc.id"). 257 Join("containers c ON rccs.id = c.resource_config_check_session_id"). 258 Where(sq.Eq{ 259 "c.handle": handle, 260 "p.team_id": t.id, 261 }). 262 RunWith(t.conn). 263 QueryRow(). 264 Scan(&ok) 265 } else { 266 err = psql.Select("1"). 267 From("containers c"). 268 Where(sq.Eq{ 269 "c.team_id": t.id, 270 "c.handle": handle, 271 }). 272 RunWith(t.conn). 273 QueryRow(). 274 Scan(&ok) 275 } 276 if err != nil { 277 if err == sql.ErrNoRows { 278 return false, nil 279 } 280 return false, err 281 } 282 283 return true, nil 284} 285 286func (t *team) FindContainerByHandle( 287 handle string, 288) (Container, bool, error) { 289 creatingContainer, createdContainer, err := t.findContainer(sq.Eq{"handle": handle}) 290 if err != nil { 291 return nil, false, err 292 } 293 294 if creatingContainer != nil { 295 return creatingContainer, true, nil 296 } 297 298 if createdContainer != nil { 299 return createdContainer, true, nil 300 } 301 302 return nil, false, nil 303} 304 305func (t *team) FindContainersByMetadata(metadata ContainerMetadata) ([]Container, error) { 306 eq := sq.Eq(metadata.SQLMap()) 307 eq["team_id"] = t.id 308 309 rows, err := selectContainers(). 310 Where(eq). 311 RunWith(t.conn). 312 Query() 313 if err != nil { 314 return nil, err 315 } 316 317 var containers []Container 318 319 containers, err = scanContainers(rows, t.conn, containers) 320 if err != nil { 321 return nil, err 322 } 323 324 return containers, nil 325} 326 327func (t *team) FindCreatedContainerByHandle( 328 handle string, 329) (CreatedContainer, bool, error) { 330 _, createdContainer, err := t.findContainer(sq.Eq{"handle": handle}) 331 if err != nil { 332 return nil, false, err 333 } 334 335 if createdContainer != nil { 336 return createdContainer, true, nil 337 } 338 339 return nil, false, nil 340} 341 342func savePipeline( 343 tx Tx, 344 pipelineName string, 345 config atc.Config, 346 from ConfigVersion, 347 initiallyPaused bool, 348 teamID int, 349 jobID sql.NullInt64, 350 buildID sql.NullInt64, 351) (int, bool, error) { 352 var existingConfig bool 353 err := tx.QueryRow(`SELECT EXISTS ( 354 SELECT 1 355 FROM pipelines 356 WHERE name = $1 357 AND team_id = $2 358 )`, pipelineName, teamID).Scan(&existingConfig) 359 if err != nil { 360 return 0, false, err 361 } 362 363 groupsPayload, err := json.Marshal(config.Groups) 364 if err != nil { 365 return 0, false, err 366 } 367 368 varSourcesPayload, err := json.Marshal(config.VarSources) 369 if err != nil { 370 return 0, false, err 371 } 372 373 encryptedVarSourcesPayload, nonce, err := tx.EncryptionStrategy().Encrypt(varSourcesPayload) 374 if err != nil { 375 return 0, false, err 376 } 377 378 displayPayload, err := json.Marshal(config.Display) 379 if err != nil { 380 return 0, false, err 381 } 382 383 var pipelineID int 384 if !existingConfig { 385 err = psql.Insert("pipelines"). 386 SetMap(map[string]interface{}{ 387 "name": pipelineName, 388 "groups": groupsPayload, 389 "var_sources": encryptedVarSourcesPayload, 390 "display": displayPayload, 391 "nonce": nonce, 392 "version": sq.Expr("nextval('config_version_seq')"), 393 "ordering": sq.Expr("currval('pipelines_id_seq')"), 394 "paused": initiallyPaused, 395 "last_updated": sq.Expr("now()"), 396 "team_id": teamID, 397 "parent_job_id": jobID, 398 "parent_build_id": buildID, 399 }). 400 Suffix("RETURNING id"). 401 RunWith(tx). 402 QueryRow().Scan(&pipelineID) 403 if err != nil { 404 return 0, false, err 405 } 406 } else { 407 408 q := psql.Update("pipelines"). 409 Set("archived", false). 410 Set("groups", groupsPayload). 411 Set("var_sources", encryptedVarSourcesPayload). 412 Set("display", displayPayload). 413 Set("nonce", nonce). 414 Set("version", sq.Expr("nextval('config_version_seq')")). 415 Set("last_updated", sq.Expr("now()")). 416 Set("parent_job_id", jobID). 417 Set("parent_build_id", buildID). 418 Where(sq.Eq{ 419 "name": pipelineName, 420 "version": from, 421 "team_id": teamID, 422 }) 423 424 if buildID.Valid { 425 q = q.Where(sq.Or{sq.Lt{"parent_build_id": buildID}, sq.Eq{"parent_build_id": nil}}) 426 } 427 428 err := q.Suffix("RETURNING id"). 429 RunWith(tx). 430 QueryRow(). 431 Scan(&pipelineID) 432 if err != nil { 433 if err == sql.ErrNoRows { 434 var currentParentBuildID sql.NullInt64 435 err = tx.QueryRow(` 436 SELECT parent_build_id 437 FROM pipelines 438 WHERE name = $1 439 AND team_id = $2 `, 440 pipelineName, teamID). 441 Scan(¤tParentBuildID) 442 if err != nil { 443 return 0, false, err 444 } 445 if currentParentBuildID.Valid && int(buildID.Int64) < int(currentParentBuildID.Int64) { 446 return 0, false, ErrSetByNewerBuild 447 } 448 return 0, false, ErrConfigComparisonFailed 449 } 450 451 return 0, false, err 452 } 453 454 err = resetDependentTableStates(tx, pipelineID) 455 if err != nil { 456 return 0, false, err 457 } 458 } 459 460 err = updateResourcesName(tx, config.Resources, pipelineID) 461 if err != nil { 462 return 0, false, err 463 } 464 465 resourceNameToID, err := saveResources(tx, config.Resources, pipelineID) 466 if err != nil { 467 return 0, false, err 468 } 469 470 _, err = psql.Update("resources"). 471 Set("resource_config_id", nil). 472 Where(sq.Eq{ 473 "pipeline_id": pipelineID, 474 "active": false, 475 }). 476 RunWith(tx). 477 Exec() 478 if err != nil { 479 return 0, false, err 480 } 481 482 err = saveResourceTypes(tx, config.ResourceTypes, pipelineID) 483 if err != nil { 484 return 0, false, err 485 } 486 487 err = updateJobsName(tx, config.Jobs, pipelineID) 488 if err != nil { 489 return 0, false, err 490 } 491 492 jobNameToID, err := saveJobsAndSerialGroups(tx, config.Jobs, config.Groups, pipelineID) 493 if err != nil { 494 return 0, false, err 495 } 496 497 err = removeUnusedWorkerTaskCaches(tx, pipelineID, config.Jobs) 498 if err != nil { 499 return 0, false, err 500 } 501 502 err = insertJobPipes(tx, config.Jobs, resourceNameToID, jobNameToID, pipelineID) 503 if err != nil { 504 return 0, false, err 505 } 506 507 err = requestScheduleForJobsInPipeline(tx, pipelineID) 508 if err != nil { 509 return 0, false, err 510 } 511 512 return pipelineID, !existingConfig, nil 513} 514 515func (t *team) SavePipeline( 516 pipelineName string, 517 config atc.Config, 518 from ConfigVersion, 519 initiallyPaused bool, 520) (Pipeline, bool, error) { 521 tx, err := t.conn.Begin() 522 if err != nil { 523 return nil, false, err 524 } 525 526 defer Rollback(tx) 527 528 nullID := sql.NullInt64{Valid: false} 529 pipelineID, isNewPipeline, err := savePipeline(tx, pipelineName, config, from, initiallyPaused, t.id, nullID, nullID) 530 if err != nil { 531 return nil, false, err 532 } 533 534 pipeline := newPipeline(t.conn, t.lockFactory) 535 536 err = scanPipeline( 537 pipeline, 538 pipelinesQuery. 539 Where(sq.Eq{"p.id": pipelineID}). 540 RunWith(tx). 541 QueryRow(), 542 ) 543 if err != nil { 544 return nil, false, err 545 } 546 547 err = tx.Commit() 548 if err != nil { 549 return nil, false, err 550 } 551 552 return pipeline, isNewPipeline, nil 553} 554 555func (t *team) Pipeline(pipelineName string) (Pipeline, bool, error) { 556 pipeline := newPipeline(t.conn, t.lockFactory) 557 558 err := scanPipeline( 559 pipeline, 560 pipelinesQuery. 561 Where(sq.Eq{ 562 "p.team_id": t.id, 563 "p.name": pipelineName, 564 }). 565 RunWith(t.conn). 566 QueryRow(), 567 ) 568 if err != nil { 569 if err == sql.ErrNoRows { 570 return nil, false, nil 571 } 572 return nil, false, err 573 } 574 575 return pipeline, true, nil 576} 577 578func (t *team) Pipelines() ([]Pipeline, error) { 579 rows, err := pipelinesQuery. 580 Where(sq.Eq{ 581 "team_id": t.id, 582 }). 583 OrderBy("ordering"). 584 RunWith(t.conn). 585 Query() 586 if err != nil { 587 return nil, err 588 } 589 590 pipelines, err := scanPipelines(t.conn, t.lockFactory, rows) 591 if err != nil { 592 return nil, err 593 } 594 595 return pipelines, nil 596} 597 598func (t *team) PublicPipelines() ([]Pipeline, error) { 599 rows, err := pipelinesQuery. 600 Where(sq.Eq{ 601 "team_id": t.id, 602 "public": true, 603 }). 604 OrderBy("t.name ASC", "ordering ASC"). 605 RunWith(t.conn). 606 Query() 607 if err != nil { 608 return nil, err 609 } 610 611 pipelines, err := scanPipelines(t.conn, t.lockFactory, rows) 612 if err != nil { 613 return nil, err 614 } 615 616 return pipelines, nil 617} 618 619func (t *team) OrderPipelines(pipelineNames []string) error { 620 tx, err := t.conn.Begin() 621 if err != nil { 622 return err 623 } 624 625 defer Rollback(tx) 626 627 for i, name := range pipelineNames { 628 pipelineUpdate, err := psql.Update("pipelines"). 629 Set("ordering", i). 630 Where(sq.Eq{ 631 "name": name, 632 "team_id": t.id, 633 }). 634 RunWith(tx). 635 Exec() 636 if err != nil { 637 return err 638 } 639 updatedPipelines, err := pipelineUpdate.RowsAffected() 640 if err != nil { 641 return err 642 } 643 if updatedPipelines == 0 { 644 return fmt.Errorf("pipeline %s does not exist", name) 645 } 646 } 647 648 return tx.Commit() 649} 650 651// XXX: This is only begin used by tests, replace all tests to CreateBuild on a job 652func (t *team) CreateOneOffBuild() (Build, error) { 653 tx, err := t.conn.Begin() 654 if err != nil { 655 return nil, err 656 } 657 658 defer Rollback(tx) 659 660 build := newEmptyBuild(t.conn, t.lockFactory) 661 err = createBuild(tx, build, map[string]interface{}{ 662 "name": sq.Expr("nextval('one_off_name')"), 663 "team_id": t.id, 664 "status": BuildStatusPending, 665 }) 666 if err != nil { 667 return nil, err 668 } 669 670 err = tx.Commit() 671 if err != nil { 672 return nil, err 673 } 674 675 return build, nil 676} 677 678func (t *team) CreateStartedBuild(plan atc.Plan) (Build, error) { 679 tx, err := t.conn.Begin() 680 if err != nil { 681 return nil, err 682 } 683 684 defer Rollback(tx) 685 686 metadata, err := json.Marshal(plan) 687 if err != nil { 688 return nil, err 689 } 690 691 encryptedPlan, nonce, err := t.conn.EncryptionStrategy().Encrypt(metadata) 692 if err != nil { 693 return nil, err 694 } 695 696 build := newEmptyBuild(t.conn, t.lockFactory) 697 err = createBuild(tx, build, map[string]interface{}{ 698 "name": sq.Expr("nextval('one_off_name')"), 699 "team_id": t.id, 700 "status": BuildStatusStarted, 701 "start_time": sq.Expr("now()"), 702 "schema": schema, 703 "private_plan": encryptedPlan, 704 "public_plan": plan.Public(), 705 "nonce": nonce, 706 }) 707 if err != nil { 708 return nil, err 709 } 710 711 err = build.saveEvent(tx, event.Status{ 712 Status: atc.StatusStarted, 713 Time: build.StartTime().Unix(), 714 }) 715 if err != nil { 716 return nil, err 717 } 718 719 err = tx.Commit() 720 if err != nil { 721 return nil, err 722 } 723 724 if err = t.conn.Bus().Notify(buildStartedChannel()); err != nil { 725 return nil, err 726 } 727 728 if err = t.conn.Bus().Notify(buildEventsChannel(build.id)); err != nil { 729 return nil, err 730 } 731 732 return build, nil 733} 734 735func (t *team) PrivateAndPublicBuilds(page Page) ([]Build, Pagination, error) { 736 newBuildsQuery := buildsQuery. 737 Where(sq.Or{sq.Eq{"p.public": true}, sq.Eq{"t.id": t.id}}) 738 739 return getBuildsWithPagination(newBuildsQuery, minMaxIdQuery, page, t.conn, t.lockFactory) 740} 741 742func (t *team) BuildsWithTime(page Page) ([]Build, Pagination, error) { 743 return getBuildsWithDates(buildsQuery.Where(sq.Eq{"t.id": t.id}), minMaxIdQuery, page, t.conn, t.lockFactory) 744} 745 746func (t *team) Builds(page Page) ([]Build, Pagination, error) { 747 return getBuildsWithPagination(buildsQuery.Where(sq.Eq{"t.id": t.id}), minMaxIdQuery, page, t.conn, t.lockFactory) 748} 749 750func (t *team) SaveWorker(atcWorker atc.Worker, ttl time.Duration) (Worker, error) { 751 tx, err := t.conn.Begin() 752 if err != nil { 753 return nil, err 754 } 755 756 defer Rollback(tx) 757 758 savedWorker, err := saveWorker(tx, atcWorker, &t.id, ttl, t.conn) 759 if err != nil { 760 return nil, err 761 } 762 763 err = tx.Commit() 764 if err != nil { 765 return nil, err 766 } 767 768 return savedWorker, nil 769} 770 771func (t *team) UpdateProviderAuth(auth atc.TeamAuth) error { 772 tx, err := t.conn.Begin() 773 if err != nil { 774 return err 775 } 776 defer Rollback(tx) 777 778 jsonEncodedProviderAuth, err := json.Marshal(auth) 779 if err != nil { 780 return err 781 } 782 783 query := ` 784 UPDATE teams 785 SET auth = $1, legacy_auth = NULL, nonce = NULL 786 WHERE id = $2 787 RETURNING id, name, admin, auth, nonce 788 ` 789 err = t.queryTeam(tx, query, jsonEncodedProviderAuth, t.id) 790 if err != nil { 791 return err 792 } 793 794 return tx.Commit() 795} 796 797func (t *team) FindCheckContainers(logger lager.Logger, pipelineName string, resourceName string, secretManager creds.Secrets, varSourcePool creds.VarSourcePool) ([]Container, map[int]time.Time, error) { 798 pipeline, found, err := t.Pipeline(pipelineName) 799 if err != nil { 800 return nil, nil, err 801 } 802 if !found { 803 return nil, nil, nil 804 } 805 806 resource, found, err := pipeline.Resource(resourceName) 807 if err != nil { 808 return nil, nil, err 809 } 810 if !found { 811 return nil, nil, nil 812 } 813 814 pipelineResourceTypes, err := pipeline.ResourceTypes() 815 if err != nil { 816 return nil, nil, err 817 } 818 819 variables, err := pipeline.Variables(logger, secretManager, varSourcePool) 820 if err != nil { 821 return nil, nil, err 822 } 823 824 versionedResourceTypes := pipelineResourceTypes.Deserialize() 825 826 source, err := creds.NewSource(variables, resource.Source()).Evaluate() 827 if err != nil { 828 return nil, nil, err 829 } 830 831 resourceTypes, err := creds.NewVersionedResourceTypes(variables, versionedResourceTypes).Evaluate() 832 if err != nil { 833 return nil, nil, err 834 } 835 836 resourceConfigFactory := NewResourceConfigFactory(t.conn, t.lockFactory) 837 resourceConfig, err := resourceConfigFactory.FindOrCreateResourceConfig( 838 resource.Type(), 839 source, 840 resourceTypes, 841 ) 842 if err != nil { 843 return nil, nil, err 844 } 845 846 rows, err := selectContainers("c"). 847 Join("resource_config_check_sessions rccs ON rccs.id = c.resource_config_check_session_id"). 848 Where(sq.Eq{ 849 "rccs.resource_config_id": resourceConfig.ID(), 850 }). 851 Distinct(). 852 RunWith(t.conn). 853 Query() 854 if err != nil { 855 return nil, nil, err 856 } 857 858 var containers []Container 859 860 containers, err = scanContainers(rows, t.conn, containers) 861 if err != nil { 862 return nil, nil, err 863 } 864 865 rows, err = psql.Select("c.id", "rccs.expires_at"). 866 From("containers c"). 867 Join("resource_config_check_sessions rccs ON rccs.id = c.resource_config_check_session_id"). 868 Where(sq.Eq{ 869 "rccs.resource_config_id": resourceConfig.ID(), 870 }). 871 Distinct(). 872 RunWith(t.conn). 873 Query() 874 if err != nil { 875 return nil, nil, err 876 } 877 878 defer Close(rows) 879 880 checkContainersExpiresAt := make(map[int]time.Time) 881 for rows.Next() { 882 var ( 883 id int 884 expiresAt pq.NullTime 885 ) 886 887 err = rows.Scan(&id, &expiresAt) 888 if err != nil { 889 return nil, nil, err 890 } 891 892 checkContainersExpiresAt[id] = expiresAt.Time 893 } 894 895 return containers, checkContainersExpiresAt, nil 896} 897 898type UpdateName struct { 899 OldName string 900 NewName string 901} 902 903func updateJobsName(tx Tx, jobs []atc.JobConfig, pipelineID int) error { 904 jobsToUpdate := []UpdateName{} 905 906 for _, job := range jobs { 907 if job.OldName != "" && job.OldName != job.Name { 908 var count int 909 err := psql.Select("COUNT(*) as count"). 910 From("jobs"). 911 Where(sq.Eq{ 912 "name": job.OldName, 913 "pipeline_id": pipelineID}). 914 RunWith(tx). 915 QueryRow(). 916 Scan(&count) 917 if err != nil { 918 return err 919 } 920 921 if count != 0 { 922 jobsToUpdate = append(jobsToUpdate, UpdateName{ 923 OldName: job.OldName, 924 NewName: job.Name, 925 }) 926 } 927 } 928 } 929 930 newMap := make(map[int]bool) 931 for _, updateNames := range jobsToUpdate { 932 isCyclic := checkCyclic(jobsToUpdate, updateNames.OldName, newMap) 933 if isCyclic { 934 return errors.New("job name swapping is not supported at this time") 935 } 936 } 937 938 jobsToUpdate = sortUpdateNames(jobsToUpdate) 939 940 for _, updateName := range jobsToUpdate { 941 _, err := psql.Delete("jobs"). 942 Where(sq.Eq{ 943 "name": updateName.NewName, 944 "pipeline_id": pipelineID, 945 "active": false}). 946 RunWith(tx). 947 Exec() 948 if err != nil { 949 return err 950 } 951 952 _, err = psql.Update("jobs"). 953 Set("name", updateName.NewName). 954 Where(sq.Eq{"name": updateName.OldName, "pipeline_id": pipelineID}). 955 RunWith(tx). 956 Exec() 957 if err != nil { 958 return err 959 } 960 } 961 962 return nil 963} 964 965func updateResourcesName(tx Tx, resources []atc.ResourceConfig, pipelineID int) error { 966 resourcesToUpdate := []UpdateName{} 967 968 for _, res := range resources { 969 if res.OldName != "" && res.OldName != res.Name { 970 var count int 971 err := psql.Select("COUNT(*) as count"). 972 From("resources"). 973 Where(sq.Eq{ 974 "name": res.OldName, 975 "pipeline_id": pipelineID}). 976 RunWith(tx). 977 QueryRow(). 978 Scan(&count) 979 if err != nil { 980 return err 981 } 982 983 if count != 0 { 984 resourcesToUpdate = append(resourcesToUpdate, UpdateName{ 985 OldName: res.OldName, 986 NewName: res.Name, 987 }) 988 } 989 } 990 } 991 992 newMap := make(map[int]bool) 993 for _, updateNames := range resourcesToUpdate { 994 isCyclic := checkCyclic(resourcesToUpdate, updateNames.OldName, newMap) 995 if isCyclic { 996 return errors.New("resource name swapping is not supported at this time") 997 } 998 } 999 1000 resourcesToUpdate = sortUpdateNames(resourcesToUpdate) 1001 1002 for _, updateName := range resourcesToUpdate { 1003 _, err := psql.Delete("resources"). 1004 Where(sq.Eq{ 1005 "name": updateName.NewName, 1006 "pipeline_id": pipelineID}). 1007 RunWith(tx). 1008 Exec() 1009 if err != nil { 1010 return err 1011 } 1012 1013 _, err = psql.Update("resources"). 1014 Set("name", updateName.NewName). 1015 Where(sq.Eq{"name": updateName.OldName, "pipeline_id": pipelineID}). 1016 RunWith(tx). 1017 Exec() 1018 if err != nil { 1019 return err 1020 } 1021 } 1022 1023 return nil 1024} 1025 1026func checkCyclic(updateNames []UpdateName, curr string, visited map[int]bool) bool { 1027 for i, updateName := range updateNames { 1028 if updateName.NewName == curr && !visited[i] { 1029 visited[i] = true 1030 checkCyclic(updateNames, updateName.OldName, visited) 1031 } else if updateName.NewName == curr && visited[i] && curr != updateName.OldName { 1032 return true 1033 } 1034 } 1035 1036 return false 1037} 1038 1039func sortUpdateNames(updateNames []UpdateName) []UpdateName { 1040 newMap := make(map[string]int) 1041 for i, updateName := range updateNames { 1042 newMap[updateName.NewName] = i + 1 1043 1044 if newMap[updateName.OldName] != 0 { 1045 index := newMap[updateName.OldName] - 1 1046 1047 tempName := updateNames[index] 1048 updateNames[index] = updateName 1049 updateNames[i] = tempName 1050 1051 return sortUpdateNames(updateNames) 1052 } 1053 } 1054 1055 return updateNames 1056} 1057 1058func saveJob(tx Tx, job atc.JobConfig, pipelineID int, groups []string) (int, error) { 1059 configPayload, err := json.Marshal(job) 1060 if err != nil { 1061 return 0, err 1062 } 1063 1064 es := tx.EncryptionStrategy() 1065 encryptedPayload, nonce, err := es.Encrypt(configPayload) 1066 if err != nil { 1067 return 0, err 1068 } 1069 1070 var jobID int 1071 err = psql.Insert("jobs"). 1072 Columns("name", "pipeline_id", "config", "public", "max_in_flight", "disable_manual_trigger", "interruptible", "active", "nonce", "tags"). 1073 Values(job.Name, pipelineID, encryptedPayload, job.Public, job.MaxInFlight(), job.DisableManualTrigger, job.Interruptible, true, nonce, pq.Array(groups)). 1074 Suffix("ON CONFLICT (name, pipeline_id) DO UPDATE SET config = EXCLUDED.config, public = EXCLUDED.public, max_in_flight = EXCLUDED.max_in_flight, disable_manual_trigger = EXCLUDED.disable_manual_trigger, interruptible = EXCLUDED.interruptible, active = EXCLUDED.active, nonce = EXCLUDED.nonce, tags = EXCLUDED.tags"). 1075 Suffix("RETURNING id"). 1076 RunWith(tx). 1077 QueryRow(). 1078 Scan(&jobID) 1079 if err != nil { 1080 return 0, err 1081 } 1082 1083 return jobID, nil 1084} 1085 1086func registerSerialGroup(tx Tx, serialGroup string, jobID int) error { 1087 _, err := psql.Insert("jobs_serial_groups"). 1088 Columns("serial_group", "job_id"). 1089 Values(serialGroup, jobID). 1090 RunWith(tx). 1091 Exec() 1092 return err 1093} 1094 1095func saveResource(tx Tx, resource atc.ResourceConfig, pipelineID int) (int, error) { 1096 configPayload, err := json.Marshal(resource) 1097 if err != nil { 1098 return 0, err 1099 } 1100 1101 es := tx.EncryptionStrategy() 1102 encryptedPayload, nonce, err := es.Encrypt(configPayload) 1103 if err != nil { 1104 return 0, err 1105 } 1106 1107 var resourceID int 1108 err = psql.Insert("resources"). 1109 Columns("name", "pipeline_id", "config", "active", "nonce", "type"). 1110 Values(resource.Name, pipelineID, encryptedPayload, true, nonce, resource.Type). 1111 Suffix("ON CONFLICT (name, pipeline_id) DO UPDATE SET config = EXCLUDED.config, active = EXCLUDED.active, nonce = EXCLUDED.nonce, type = EXCLUDED.type"). 1112 Suffix("RETURNING id"). 1113 RunWith(tx). 1114 QueryRow(). 1115 Scan(&resourceID) 1116 if err != nil { 1117 return 0, err 1118 } 1119 1120 _, err = psql.Delete("resource_pins"). 1121 Where(sq.Eq{ 1122 "resource_id": resourceID, 1123 "config": true, 1124 }). 1125 RunWith(tx). 1126 Exec() 1127 if err != nil { 1128 return 0, err 1129 } 1130 1131 if resource.Version != nil { 1132 version, err := json.Marshal(resource.Version) 1133 if err != nil { 1134 return 0, err 1135 } 1136 1137 _, err = psql.Insert("resource_pins"). 1138 Columns("resource_id", "version", "comment_text", "config"). 1139 Values(resourceID, version, "", true). 1140 Suffix("ON CONFLICT (resource_id) DO UPDATE SET version = EXCLUDED.version, comment_text = EXCLUDED.comment_text, config = true"). 1141 RunWith(tx). 1142 Exec() 1143 if err != nil { 1144 return 0, err 1145 } 1146 } 1147 1148 return resourceID, nil 1149} 1150 1151func saveResourceType(tx Tx, resourceType atc.ResourceType, pipelineID int) error { 1152 configPayload, err := json.Marshal(resourceType) 1153 if err != nil { 1154 return err 1155 } 1156 1157 es := tx.EncryptionStrategy() 1158 encryptedPayload, nonce, err := es.Encrypt(configPayload) 1159 if err != nil { 1160 return err 1161 } 1162 1163 _, err = psql.Insert("resource_types"). 1164 Columns("name", "pipeline_id", "config", "active", "nonce", "type"). 1165 Values(resourceType.Name, pipelineID, encryptedPayload, true, nonce, resourceType.Type). 1166 Suffix("ON CONFLICT (name, pipeline_id) DO UPDATE SET config = EXCLUDED.config, active = EXCLUDED.active, nonce = EXCLUDED.nonce, type = EXCLUDED.type"). 1167 RunWith(tx). 1168 Exec() 1169 1170 return err 1171} 1172 1173func checkIfRowsUpdated(tx Tx, query string, params ...interface{}) (bool, error) { 1174 result, err := tx.Exec(query, params...) 1175 if err != nil { 1176 return false, err 1177 } 1178 1179 rows, err := result.RowsAffected() 1180 if err != nil { 1181 return false, err 1182 } 1183 1184 if rows == 0 { 1185 return false, nil 1186 } 1187 1188 return true, nil 1189} 1190 1191func swallowUniqueiolation(err error) error { 1192 if err != nil { 1193 if pgErr, ok := err.(*pq.Error); ok { 1194 if pgErr.Code.Class().Name() == "integrity_constraint_violation" { 1195 return nil 1196 } 1197 } 1198 1199 return err 1200 } 1201 1202 return nil 1203} 1204 1205func (t *team) findContainer(whereClause sq.Sqlizer) (CreatingContainer, CreatedContainer, error) { 1206 creating, created, destroying, _, err := scanContainer( 1207 selectContainers(). 1208 Where(whereClause). 1209 RunWith(t.conn). 1210 QueryRow(), 1211 t.conn, 1212 ) 1213 if err != nil { 1214 if err == sql.ErrNoRows { 1215 return nil, nil, nil 1216 } 1217 return nil, nil, err 1218 } 1219 1220 if destroying != nil { 1221 return nil, nil, nil 1222 } 1223 1224 return creating, created, nil 1225} 1226 1227func scanPipeline(p *pipeline, scan scannable) error { 1228 var ( 1229 groups sql.NullString 1230 varSources sql.NullString 1231 display sql.NullString 1232 nonce sql.NullString 1233 nonceStr *string 1234 lastUpdated pq.NullTime 1235 parentJobID sql.NullInt64 1236 parentBuildID sql.NullInt64 1237 ) 1238 err := scan.Scan(&p.id, &p.name, &groups, &varSources, &display, &nonce, &p.configVersion, &p.teamID, &p.teamName, &p.paused, &p.public, &p.archived, &lastUpdated, &parentJobID, &parentBuildID) 1239 if err != nil { 1240 return err 1241 } 1242 1243 p.lastUpdated = lastUpdated.Time 1244 p.parentJobID = int(parentJobID.Int64) 1245 p.parentBuildID = int(parentBuildID.Int64) 1246 1247 if groups.Valid { 1248 var pipelineGroups atc.GroupConfigs 1249 err = json.Unmarshal([]byte(groups.String), &pipelineGroups) 1250 if err != nil { 1251 return err 1252 } 1253 1254 p.groups = pipelineGroups 1255 } 1256 1257 if nonce.Valid { 1258 nonceStr = &nonce.String 1259 } 1260 1261 if display.Valid { 1262 var displayConfig *atc.DisplayConfig 1263 err = json.Unmarshal([]byte(display.String), &displayConfig) 1264 if err != nil { 1265 return err 1266 } 1267 1268 p.display = displayConfig 1269 } 1270 1271 if varSources.Valid { 1272 var pipelineVarSources atc.VarSourceConfigs 1273 decryptedVarSource, err := p.conn.EncryptionStrategy().Decrypt(varSources.String, nonceStr) 1274 if err != nil { 1275 return err 1276 } 1277 err = json.Unmarshal([]byte(decryptedVarSource), &pipelineVarSources) 1278 if err != nil { 1279 return err 1280 } 1281 1282 p.varSources = pipelineVarSources 1283 } 1284 1285 return nil 1286} 1287 1288func scanPipelines(conn Conn, lockFactory lock.LockFactory, rows *sql.Rows) ([]Pipeline, error) { 1289 defer Close(rows) 1290 1291 pipelines := []Pipeline{} 1292 1293 for rows.Next() { 1294 pipeline := newPipeline(conn, lockFactory) 1295 1296 err := scanPipeline(pipeline, rows) 1297 if err != nil { 1298 return nil, err 1299 } 1300 1301 pipelines = append(pipelines, pipeline) 1302 } 1303 1304 return pipelines, nil 1305} 1306 1307func scanContainers(rows *sql.Rows, conn Conn, initContainers []Container) ([]Container, error) { 1308 containers := initContainers 1309 1310 defer Close(rows) 1311 1312 for rows.Next() { 1313 creating, created, destroying, _, err := scanContainer(rows, conn) 1314 if err != nil { 1315 return []Container{}, err 1316 } 1317 1318 if creating != nil { 1319 containers = append(containers, creating) 1320 } 1321 1322 if created != nil { 1323 containers = append(containers, created) 1324 } 1325 1326 if destroying != nil { 1327 containers = append(containers, destroying) 1328 } 1329 } 1330 1331 return containers, nil 1332} 1333 1334func (t *team) queryTeam(tx Tx, query string, params ...interface{}) error { 1335 var providerAuth, nonce sql.NullString 1336 1337 err := tx.QueryRow(query, params...).Scan( 1338 &t.id, 1339 &t.name, 1340 &t.admin, 1341 &providerAuth, 1342 &nonce, 1343 ) 1344 if err != nil { 1345 return err 1346 } 1347 1348 if providerAuth.Valid { 1349 var auth atc.TeamAuth 1350 err = json.Unmarshal([]byte(providerAuth.String), &auth) 1351 if err != nil { 1352 return err 1353 } 1354 t.auth = auth 1355 } 1356 1357 return nil 1358} 1359 1360func resetDependentTableStates(tx Tx, pipelineID int) error { 1361 _, err := psql.Delete("jobs_serial_groups"). 1362 Where(sq.Expr(`job_id in ( 1363 SELECT j.id 1364 FROM jobs j 1365 WHERE j.pipeline_id = $1 1366 )`, pipelineID)). 1367 RunWith(tx). 1368 Exec() 1369 if err != nil { 1370 return err 1371 } 1372 1373 tableNames := []string{"jobs", "resources", "resource_types"} 1374 for _, table := range tableNames { 1375 err = inactivateTableForPipeline(tx, pipelineID, table) 1376 if err != nil { 1377 return err 1378 } 1379 } 1380 return err 1381} 1382 1383func inactivateTableForPipeline(tx Tx, pipelineID int, tableName string) error { 1384 _, err := psql.Update(tableName). 1385 Set("active", false). 1386 Where(sq.Eq{ 1387 "pipeline_id": pipelineID, 1388 }). 1389 RunWith(tx). 1390 Exec() 1391 return err 1392} 1393 1394func saveResources(tx Tx, resources atc.ResourceConfigs, pipelineID int) (map[string]int, error) { 1395 resourceNameToID := make(map[string]int) 1396 for _, resource := range resources { 1397 resourceID, err := saveResource(tx, resource, pipelineID) 1398 if err != nil { 1399 return nil, err 1400 } 1401 1402 resourceNameToID[resource.Name] = resourceID 1403 } 1404 1405 return resourceNameToID, nil 1406} 1407 1408func saveResourceTypes(tx Tx, resourceTypes atc.ResourceTypes, pipelineID int) error { 1409 for _, resourceType := range resourceTypes { 1410 err := saveResourceType(tx, resourceType, pipelineID) 1411 if err != nil { 1412 return err 1413 } 1414 } 1415 1416 return nil 1417} 1418 1419func saveJobsAndSerialGroups(tx Tx, jobs atc.JobConfigs, groups atc.GroupConfigs, pipelineID int) (map[string]int, error) { 1420 jobGroups := make(map[string][]string) 1421 for _, group := range groups { 1422 for _, job := range group.Jobs { 1423 jobGroups[job] = append(jobGroups[job], group.Name) 1424 } 1425 } 1426 1427 jobNameToID := make(map[string]int) 1428 for _, job := range jobs { 1429 jobID, err := saveJob(tx, job, pipelineID, jobGroups[job.Name]) 1430 if err != nil { 1431 return nil, err 1432 } 1433 1434 jobNameToID[job.Name] = jobID 1435 1436 if len(job.SerialGroups) != 0 { 1437 for _, sg := range job.SerialGroups { 1438 err = registerSerialGroup(tx, sg, jobID) 1439 if err != nil { 1440 return nil, err 1441 } 1442 } 1443 } else { 1444 if job.Serial || job.RawMaxInFlight > 0 { 1445 err = registerSerialGroup(tx, job.Name, jobID) 1446 if err != nil { 1447 return nil, err 1448 } 1449 } 1450 } 1451 } 1452 1453 return jobNameToID, nil 1454} 1455 1456func insertJobPipes(tx Tx, jobConfigs atc.JobConfigs, resourceNameToID map[string]int, jobNameToID map[string]int, pipelineID int) error { 1457 _, err := psql.Delete("job_inputs"). 1458 Where(sq.Expr(`job_id in ( 1459 SELECT j.id 1460 FROM jobs j 1461 WHERE j.pipeline_id = $1 1462 )`, pipelineID)). 1463 RunWith(tx). 1464 Exec() 1465 if err != nil { 1466 return err 1467 } 1468 1469 _, err = psql.Delete("job_outputs"). 1470 Where(sq.Expr(`job_id in ( 1471 SELECT j.id 1472 FROM jobs j 1473 WHERE j.pipeline_id = $1 1474 )`, pipelineID)). 1475 RunWith(tx). 1476 Exec() 1477 if err != nil { 1478 return err 1479 } 1480 1481 for _, jobConfig := range jobConfigs { 1482 err := jobConfig.StepConfig().Visit(atc.StepRecursor{ 1483 OnGet: func(step *atc.GetStep) error { 1484 return insertJobInput(tx, step, jobConfig.Name, resourceNameToID, jobNameToID) 1485 }, 1486 OnPut: func(step *atc.PutStep) error { 1487 return insertJobOutput(tx, step, jobConfig.Name, resourceNameToID, jobNameToID) 1488 }, 1489 }) 1490 if err != nil { 1491 return err 1492 } 1493 } 1494 1495 return nil 1496} 1497 1498func insertJobInput(tx Tx, step *atc.GetStep, jobName string, resourceNameToID map[string]int, jobNameToID map[string]int) error { 1499 if len(step.Passed) != 0 { 1500 for _, passedJob := range step.Passed { 1501 var version sql.NullString 1502 if step.Version != nil { 1503 versionJSON, err := step.Version.MarshalJSON() 1504 if err != nil { 1505 return err 1506 } 1507 1508 version = sql.NullString{Valid: true, String: string(versionJSON)} 1509 } 1510 1511 _, err := psql.Insert("job_inputs"). 1512 Columns("name", "job_id", "resource_id", "passed_job_id", "trigger", "version"). 1513 Values(step.Name, jobNameToID[jobName], resourceNameToID[step.ResourceName()], jobNameToID[passedJob], step.Trigger, version). 1514 RunWith(tx). 1515 Exec() 1516 if err != nil { 1517 return err 1518 } 1519 } 1520 } else { 1521 var version sql.NullString 1522 if step.Version != nil { 1523 versionJSON, err := step.Version.MarshalJSON() 1524 if err != nil { 1525 return err 1526 } 1527 1528 version = sql.NullString{Valid: true, String: string(versionJSON)} 1529 } 1530 1531 _, err := psql.Insert("job_inputs"). 1532 Columns("name", "job_id", "resource_id", "trigger", "version"). 1533 Values(step.Name, jobNameToID[jobName], resourceNameToID[step.ResourceName()], step.Trigger, version). 1534 RunWith(tx). 1535 Exec() 1536 if err != nil { 1537 return err 1538 } 1539 } 1540 1541 return nil 1542} 1543 1544func insertJobOutput(tx Tx, step *atc.PutStep, jobName string, resourceNameToID map[string]int, jobNameToID map[string]int) error { 1545 _, err := psql.Insert("job_outputs"). 1546 Columns("name", "job_id", "resource_id"). 1547 Values(step.Name, jobNameToID[jobName], resourceNameToID[step.ResourceName()]). 1548 RunWith(tx). 1549 Exec() 1550 if err != nil { 1551 return err 1552 } 1553 1554 return nil 1555} 1556