1package db
2
3import (
4	"database/sql"
5	"encoding/json"
6	"errors"
7	"fmt"
8	"time"
9
10	"code.cloudfoundry.org/lager"
11
12	sq "github.com/Masterminds/squirrel"
13	"github.com/lib/pq"
14
15	"github.com/concourse/concourse/atc"
16	"github.com/concourse/concourse/atc/creds"
17	"github.com/concourse/concourse/atc/db/lock"
18	"github.com/concourse/concourse/atc/event"
19)
20
// ErrConfigComparisonFailed is returned when saving over an existing pipeline
// matches no row for the config version the caller supplied.
var ErrConfigComparisonFailed = errors.New("comparison with existing config failed during save")
22
23//go:generate counterfeiter . Team
24
25type Team interface {
26	ID() int
27	Name() string
28	Admin() bool
29
30	Auth() atc.TeamAuth
31
32	Delete() error
33	Rename(string) error
34
35	SavePipeline(
36		pipelineName string,
37		config atc.Config,
38		from ConfigVersion,
39		initiallyPaused bool,
40	) (Pipeline, bool, error)
41
42	Pipeline(pipelineName string) (Pipeline, bool, error)
43	Pipelines() ([]Pipeline, error)
44	PublicPipelines() ([]Pipeline, error)
45	OrderPipelines([]string) error
46
47	CreateOneOffBuild() (Build, error)
48	CreateStartedBuild(plan atc.Plan) (Build, error)
49
50	PrivateAndPublicBuilds(Page) ([]Build, Pagination, error)
51	Builds(page Page) ([]Build, Pagination, error)
52	BuildsWithTime(page Page) ([]Build, Pagination, error)
53
54	SaveWorker(atcWorker atc.Worker, ttl time.Duration) (Worker, error)
55	Workers() ([]Worker, error)
56	FindVolumeForWorkerArtifact(int) (CreatedVolume, bool, error)
57
58	Containers() ([]Container, error)
59	IsCheckContainer(string) (bool, error)
60	IsContainerWithinTeam(string, bool) (bool, error)
61
62	FindContainerByHandle(string) (Container, bool, error)
63	FindCheckContainers(lager.Logger, string, string, creds.Secrets, creds.VarSourcePool) ([]Container, map[int]time.Time, error)
64	FindContainersByMetadata(ContainerMetadata) ([]Container, error)
65	FindCreatedContainerByHandle(string) (CreatedContainer, bool, error)
66	FindWorkerForContainer(handle string) (Worker, bool, error)
67	FindWorkerForVolume(handle string) (Worker, bool, error)
68
69	UpdateProviderAuth(auth atc.TeamAuth) error
70}
71
72type team struct {
73	id          int
74	conn        Conn
75	lockFactory lock.LockFactory
76
77	name  string
78	admin bool
79
80	auth atc.TeamAuth
81}
82
83func (t *team) ID() int      { return t.id }
84func (t *team) Name() string { return t.name }
85func (t *team) Admin() bool  { return t.admin }
86
87func (t *team) Auth() atc.TeamAuth { return t.auth }
88
89func (t *team) Delete() error {
90	_, err := psql.Delete("teams").
91		Where(sq.Eq{
92			"name": t.name,
93		}).
94		RunWith(t.conn).
95		Exec()
96
97	return err
98}
99
100func (t *team) Rename(name string) error {
101	_, err := psql.Update("teams").
102		Set("name", name).
103		Where(sq.Eq{
104			"id": t.id,
105		}).
106		RunWith(t.conn).
107		Exec()
108
109	return err
110}
111
112func (t *team) Workers() ([]Worker, error) {
113	return getWorkers(t.conn, workersQuery.Where(sq.Or{
114		sq.Eq{"t.id": t.id},
115		sq.Eq{"w.team_id": nil},
116	}))
117}
118
119func (t *team) FindVolumeForWorkerArtifact(artifactID int) (CreatedVolume, bool, error) {
120	tx, err := t.conn.Begin()
121	if err != nil {
122		return nil, false, err
123	}
124
125	defer Rollback(tx)
126
127	artifact, found, err := getWorkerArtifact(tx, t.conn, artifactID)
128	if err != nil {
129		return nil, false, err
130	}
131
132	err = tx.Commit()
133	if err != nil {
134		return nil, false, err
135	}
136
137	if !found {
138		return nil, false, nil
139	}
140
141	return artifact.Volume(t.ID())
142}
143
144func (t *team) FindWorkerForContainer(handle string) (Worker, bool, error) {
145	return getWorker(t.conn, workersQuery.Join("containers c ON c.worker_name = w.name").Where(sq.And{
146		sq.Eq{"c.handle": handle},
147	}))
148}
149
150func (t *team) FindWorkerForVolume(handle string) (Worker, bool, error) {
151	return getWorker(t.conn, workersQuery.Join("volumes v ON v.worker_name = w.name").Where(sq.And{
152		sq.Eq{"v.handle": handle},
153	}))
154}
155
156func (t *team) Containers() ([]Container, error) {
157	rows, err := selectContainers("c").
158		Join("workers w ON c.worker_name = w.name").
159		Join("resource_config_check_sessions rccs ON rccs.id = c.resource_config_check_session_id").
160		Join("resources r ON r.resource_config_id = rccs.resource_config_id").
161		Join("pipelines p ON p.id = r.pipeline_id").
162		Where(sq.Eq{
163			"p.team_id": t.id,
164		}).
165		Where(sq.Or{
166			sq.Eq{
167				"w.team_id": t.id,
168			}, sq.Eq{
169				"w.team_id": nil,
170			},
171		}).
172		Distinct().
173		RunWith(t.conn).
174		Query()
175	if err != nil {
176		return nil, err
177	}
178
179	var containers []Container
180	containers, err = scanContainers(rows, t.conn, containers)
181	if err != nil {
182		return nil, err
183	}
184
185	rows, err = selectContainers("c").
186		Join("workers w ON c.worker_name = w.name").
187		Join("resource_config_check_sessions rccs ON rccs.id = c.resource_config_check_session_id").
188		Join("resource_types rt ON rt.resource_config_id = rccs.resource_config_id").
189		Join("pipelines p ON p.id = rt.pipeline_id").
190		Where(sq.Eq{
191			"p.team_id": t.id,
192		}).
193		Where(sq.Or{
194			sq.Eq{
195				"w.team_id": t.id,
196			}, sq.Eq{
197				"w.team_id": nil,
198			},
199		}).
200		Distinct().
201		RunWith(t.conn).
202		Query()
203	if err != nil {
204		return nil, err
205	}
206
207	containers, err = scanContainers(rows, t.conn, containers)
208	if err != nil {
209		return nil, err
210	}
211
212	rows, err = selectContainers("c").
213		Where(sq.Eq{
214			"c.team_id": t.id,
215		}).
216		RunWith(t.conn).
217		Query()
218	if err != nil {
219		return nil, err
220	}
221
222	containers, err = scanContainers(rows, t.conn, containers)
223	if err != nil {
224		return nil, err
225	}
226
227	return containers, nil
228}
229
230func (t *team) IsCheckContainer(handle string) (bool, error) {
231	var containerType string
232	err := psql.Select("meta_type").
233		From("containers").
234		Where(sq.Eq{
235			"handle": handle,
236		}).
237		RunWith(t.conn).
238		QueryRow().
239		Scan(&containerType)
240	if err != nil {
241		return false, err
242	}
243
244	return ContainerType(containerType) == ContainerTypeCheck, nil
245}
246
247func (t *team) IsContainerWithinTeam(handle string, isCheck bool) (bool, error) {
248	var ok int
249	var err error
250
251	if isCheck {
252		err = psql.Select("1").
253			From("resources r").
254			Join("pipelines p ON p.id = r.pipeline_id").
255			Join("resource_configs rc ON rc.id = r.resource_config_id").
256			Join("resource_config_check_sessions rccs ON rccs.resource_config_id = rc.id").
257			Join("containers c ON rccs.id = c.resource_config_check_session_id").
258			Where(sq.Eq{
259				"c.handle":  handle,
260				"p.team_id": t.id,
261			}).
262			RunWith(t.conn).
263			QueryRow().
264			Scan(&ok)
265	} else {
266		err = psql.Select("1").
267			From("containers c").
268			Where(sq.Eq{
269				"c.team_id": t.id,
270				"c.handle":  handle,
271			}).
272			RunWith(t.conn).
273			QueryRow().
274			Scan(&ok)
275	}
276	if err != nil {
277		if err == sql.ErrNoRows {
278			return false, nil
279		}
280		return false, err
281	}
282
283	return true, nil
284}
285
286func (t *team) FindContainerByHandle(
287	handle string,
288) (Container, bool, error) {
289	creatingContainer, createdContainer, err := t.findContainer(sq.Eq{"handle": handle})
290	if err != nil {
291		return nil, false, err
292	}
293
294	if creatingContainer != nil {
295		return creatingContainer, true, nil
296	}
297
298	if createdContainer != nil {
299		return createdContainer, true, nil
300	}
301
302	return nil, false, nil
303}
304
305func (t *team) FindContainersByMetadata(metadata ContainerMetadata) ([]Container, error) {
306	eq := sq.Eq(metadata.SQLMap())
307	eq["team_id"] = t.id
308
309	rows, err := selectContainers().
310		Where(eq).
311		RunWith(t.conn).
312		Query()
313	if err != nil {
314		return nil, err
315	}
316
317	var containers []Container
318
319	containers, err = scanContainers(rows, t.conn, containers)
320	if err != nil {
321		return nil, err
322	}
323
324	return containers, nil
325}
326
327func (t *team) FindCreatedContainerByHandle(
328	handle string,
329) (CreatedContainer, bool, error) {
330	_, createdContainer, err := t.findContainer(sq.Eq{"handle": handle})
331	if err != nil {
332		return nil, false, err
333	}
334
335	if createdContainer != nil {
336		return createdContainer, true, nil
337	}
338
339	return nil, false, nil
340}
341
342func savePipeline(
343	tx Tx,
344	pipelineName string,
345	config atc.Config,
346	from ConfigVersion,
347	initiallyPaused bool,
348	teamID int,
349	jobID sql.NullInt64,
350	buildID sql.NullInt64,
351) (int, bool, error) {
352	var existingConfig bool
353	err := tx.QueryRow(`SELECT EXISTS (
354		SELECT 1
355		FROM pipelines
356		WHERE name = $1
357		AND team_id = $2
358	)`, pipelineName, teamID).Scan(&existingConfig)
359	if err != nil {
360		return 0, false, err
361	}
362
363	groupsPayload, err := json.Marshal(config.Groups)
364	if err != nil {
365		return 0, false, err
366	}
367
368	varSourcesPayload, err := json.Marshal(config.VarSources)
369	if err != nil {
370		return 0, false, err
371	}
372
373	encryptedVarSourcesPayload, nonce, err := tx.EncryptionStrategy().Encrypt(varSourcesPayload)
374	if err != nil {
375		return 0, false, err
376	}
377
378	displayPayload, err := json.Marshal(config.Display)
379	if err != nil {
380		return 0, false, err
381	}
382
383	var pipelineID int
384	if !existingConfig {
385		err = psql.Insert("pipelines").
386			SetMap(map[string]interface{}{
387				"name":            pipelineName,
388				"groups":          groupsPayload,
389				"var_sources":     encryptedVarSourcesPayload,
390				"display":         displayPayload,
391				"nonce":           nonce,
392				"version":         sq.Expr("nextval('config_version_seq')"),
393				"ordering":        sq.Expr("currval('pipelines_id_seq')"),
394				"paused":          initiallyPaused,
395				"last_updated":    sq.Expr("now()"),
396				"team_id":         teamID,
397				"parent_job_id":   jobID,
398				"parent_build_id": buildID,
399			}).
400			Suffix("RETURNING id").
401			RunWith(tx).
402			QueryRow().Scan(&pipelineID)
403		if err != nil {
404			return 0, false, err
405		}
406	} else {
407
408		q := psql.Update("pipelines").
409			Set("archived", false).
410			Set("groups", groupsPayload).
411			Set("var_sources", encryptedVarSourcesPayload).
412			Set("display", displayPayload).
413			Set("nonce", nonce).
414			Set("version", sq.Expr("nextval('config_version_seq')")).
415			Set("last_updated", sq.Expr("now()")).
416			Set("parent_job_id", jobID).
417			Set("parent_build_id", buildID).
418			Where(sq.Eq{
419				"name":    pipelineName,
420				"version": from,
421				"team_id": teamID,
422			})
423
424		if buildID.Valid {
425			q = q.Where(sq.Or{sq.Lt{"parent_build_id": buildID}, sq.Eq{"parent_build_id": nil}})
426		}
427
428		err := q.Suffix("RETURNING id").
429			RunWith(tx).
430			QueryRow().
431			Scan(&pipelineID)
432		if err != nil {
433			if err == sql.ErrNoRows {
434				var currentParentBuildID sql.NullInt64
435				err = tx.QueryRow(`
436					SELECT parent_build_id
437					FROM pipelines
438					WHERE name = $1
439					AND team_id = $2 `,
440					pipelineName, teamID).
441					Scan(&currentParentBuildID)
442				if err != nil {
443					return 0, false, err
444				}
445				if currentParentBuildID.Valid && int(buildID.Int64) < int(currentParentBuildID.Int64) {
446					return 0, false, ErrSetByNewerBuild
447				}
448				return 0, false, ErrConfigComparisonFailed
449			}
450
451			return 0, false, err
452		}
453
454		err = resetDependentTableStates(tx, pipelineID)
455		if err != nil {
456			return 0, false, err
457		}
458	}
459
460	err = updateResourcesName(tx, config.Resources, pipelineID)
461	if err != nil {
462		return 0, false, err
463	}
464
465	resourceNameToID, err := saveResources(tx, config.Resources, pipelineID)
466	if err != nil {
467		return 0, false, err
468	}
469
470	_, err = psql.Update("resources").
471		Set("resource_config_id", nil).
472		Where(sq.Eq{
473			"pipeline_id": pipelineID,
474			"active":      false,
475		}).
476		RunWith(tx).
477		Exec()
478	if err != nil {
479		return 0, false, err
480	}
481
482	err = saveResourceTypes(tx, config.ResourceTypes, pipelineID)
483	if err != nil {
484		return 0, false, err
485	}
486
487	err = updateJobsName(tx, config.Jobs, pipelineID)
488	if err != nil {
489		return 0, false, err
490	}
491
492	jobNameToID, err := saveJobsAndSerialGroups(tx, config.Jobs, config.Groups, pipelineID)
493	if err != nil {
494		return 0, false, err
495	}
496
497	err = removeUnusedWorkerTaskCaches(tx, pipelineID, config.Jobs)
498	if err != nil {
499		return 0, false, err
500	}
501
502	err = insertJobPipes(tx, config.Jobs, resourceNameToID, jobNameToID, pipelineID)
503	if err != nil {
504		return 0, false, err
505	}
506
507	err = requestScheduleForJobsInPipeline(tx, pipelineID)
508	if err != nil {
509		return 0, false, err
510	}
511
512	return pipelineID, !existingConfig, nil
513}
514
515func (t *team) SavePipeline(
516	pipelineName string,
517	config atc.Config,
518	from ConfigVersion,
519	initiallyPaused bool,
520) (Pipeline, bool, error) {
521	tx, err := t.conn.Begin()
522	if err != nil {
523		return nil, false, err
524	}
525
526	defer Rollback(tx)
527
528	nullID := sql.NullInt64{Valid: false}
529	pipelineID, isNewPipeline, err := savePipeline(tx, pipelineName, config, from, initiallyPaused, t.id, nullID, nullID)
530	if err != nil {
531		return nil, false, err
532	}
533
534	pipeline := newPipeline(t.conn, t.lockFactory)
535
536	err = scanPipeline(
537		pipeline,
538		pipelinesQuery.
539			Where(sq.Eq{"p.id": pipelineID}).
540			RunWith(tx).
541			QueryRow(),
542	)
543	if err != nil {
544		return nil, false, err
545	}
546
547	err = tx.Commit()
548	if err != nil {
549		return nil, false, err
550	}
551
552	return pipeline, isNewPipeline, nil
553}
554
555func (t *team) Pipeline(pipelineName string) (Pipeline, bool, error) {
556	pipeline := newPipeline(t.conn, t.lockFactory)
557
558	err := scanPipeline(
559		pipeline,
560		pipelinesQuery.
561			Where(sq.Eq{
562				"p.team_id": t.id,
563				"p.name":    pipelineName,
564			}).
565			RunWith(t.conn).
566			QueryRow(),
567	)
568	if err != nil {
569		if err == sql.ErrNoRows {
570			return nil, false, nil
571		}
572		return nil, false, err
573	}
574
575	return pipeline, true, nil
576}
577
578func (t *team) Pipelines() ([]Pipeline, error) {
579	rows, err := pipelinesQuery.
580		Where(sq.Eq{
581			"team_id": t.id,
582		}).
583		OrderBy("ordering").
584		RunWith(t.conn).
585		Query()
586	if err != nil {
587		return nil, err
588	}
589
590	pipelines, err := scanPipelines(t.conn, t.lockFactory, rows)
591	if err != nil {
592		return nil, err
593	}
594
595	return pipelines, nil
596}
597
598func (t *team) PublicPipelines() ([]Pipeline, error) {
599	rows, err := pipelinesQuery.
600		Where(sq.Eq{
601			"team_id": t.id,
602			"public":  true,
603		}).
604		OrderBy("t.name ASC", "ordering ASC").
605		RunWith(t.conn).
606		Query()
607	if err != nil {
608		return nil, err
609	}
610
611	pipelines, err := scanPipelines(t.conn, t.lockFactory, rows)
612	if err != nil {
613		return nil, err
614	}
615
616	return pipelines, nil
617}
618
619func (t *team) OrderPipelines(pipelineNames []string) error {
620	tx, err := t.conn.Begin()
621	if err != nil {
622		return err
623	}
624
625	defer Rollback(tx)
626
627	for i, name := range pipelineNames {
628		pipelineUpdate, err := psql.Update("pipelines").
629			Set("ordering", i).
630			Where(sq.Eq{
631				"name":    name,
632				"team_id": t.id,
633			}).
634			RunWith(tx).
635			Exec()
636		if err != nil {
637			return err
638		}
639		updatedPipelines, err := pipelineUpdate.RowsAffected()
640		if err != nil {
641			return err
642		}
643		if updatedPipelines == 0 {
644			return fmt.Errorf("pipeline %s does not exist", name)
645		}
646	}
647
648	return tx.Commit()
649}
650
651// XXX: This is only begin used by tests, replace all tests to CreateBuild on a job
652func (t *team) CreateOneOffBuild() (Build, error) {
653	tx, err := t.conn.Begin()
654	if err != nil {
655		return nil, err
656	}
657
658	defer Rollback(tx)
659
660	build := newEmptyBuild(t.conn, t.lockFactory)
661	err = createBuild(tx, build, map[string]interface{}{
662		"name":    sq.Expr("nextval('one_off_name')"),
663		"team_id": t.id,
664		"status":  BuildStatusPending,
665	})
666	if err != nil {
667		return nil, err
668	}
669
670	err = tx.Commit()
671	if err != nil {
672		return nil, err
673	}
674
675	return build, nil
676}
677
678func (t *team) CreateStartedBuild(plan atc.Plan) (Build, error) {
679	tx, err := t.conn.Begin()
680	if err != nil {
681		return nil, err
682	}
683
684	defer Rollback(tx)
685
686	metadata, err := json.Marshal(plan)
687	if err != nil {
688		return nil, err
689	}
690
691	encryptedPlan, nonce, err := t.conn.EncryptionStrategy().Encrypt(metadata)
692	if err != nil {
693		return nil, err
694	}
695
696	build := newEmptyBuild(t.conn, t.lockFactory)
697	err = createBuild(tx, build, map[string]interface{}{
698		"name":         sq.Expr("nextval('one_off_name')"),
699		"team_id":      t.id,
700		"status":       BuildStatusStarted,
701		"start_time":   sq.Expr("now()"),
702		"schema":       schema,
703		"private_plan": encryptedPlan,
704		"public_plan":  plan.Public(),
705		"nonce":        nonce,
706	})
707	if err != nil {
708		return nil, err
709	}
710
711	err = build.saveEvent(tx, event.Status{
712		Status: atc.StatusStarted,
713		Time:   build.StartTime().Unix(),
714	})
715	if err != nil {
716		return nil, err
717	}
718
719	err = tx.Commit()
720	if err != nil {
721		return nil, err
722	}
723
724	if err = t.conn.Bus().Notify(buildStartedChannel()); err != nil {
725		return nil, err
726	}
727
728	if err = t.conn.Bus().Notify(buildEventsChannel(build.id)); err != nil {
729		return nil, err
730	}
731
732	return build, nil
733}
734
735func (t *team) PrivateAndPublicBuilds(page Page) ([]Build, Pagination, error) {
736	newBuildsQuery := buildsQuery.
737		Where(sq.Or{sq.Eq{"p.public": true}, sq.Eq{"t.id": t.id}})
738
739	return getBuildsWithPagination(newBuildsQuery, minMaxIdQuery, page, t.conn, t.lockFactory)
740}
741
742func (t *team) BuildsWithTime(page Page) ([]Build, Pagination, error) {
743	return getBuildsWithDates(buildsQuery.Where(sq.Eq{"t.id": t.id}), minMaxIdQuery, page, t.conn, t.lockFactory)
744}
745
746func (t *team) Builds(page Page) ([]Build, Pagination, error) {
747	return getBuildsWithPagination(buildsQuery.Where(sq.Eq{"t.id": t.id}), minMaxIdQuery, page, t.conn, t.lockFactory)
748}
749
750func (t *team) SaveWorker(atcWorker atc.Worker, ttl time.Duration) (Worker, error) {
751	tx, err := t.conn.Begin()
752	if err != nil {
753		return nil, err
754	}
755
756	defer Rollback(tx)
757
758	savedWorker, err := saveWorker(tx, atcWorker, &t.id, ttl, t.conn)
759	if err != nil {
760		return nil, err
761	}
762
763	err = tx.Commit()
764	if err != nil {
765		return nil, err
766	}
767
768	return savedWorker, nil
769}
770
771func (t *team) UpdateProviderAuth(auth atc.TeamAuth) error {
772	tx, err := t.conn.Begin()
773	if err != nil {
774		return err
775	}
776	defer Rollback(tx)
777
778	jsonEncodedProviderAuth, err := json.Marshal(auth)
779	if err != nil {
780		return err
781	}
782
783	query := `
784		UPDATE teams
785		SET auth = $1, legacy_auth = NULL, nonce = NULL
786		WHERE id = $2
787		RETURNING id, name, admin, auth, nonce
788	`
789	err = t.queryTeam(tx, query, jsonEncodedProviderAuth, t.id)
790	if err != nil {
791		return err
792	}
793
794	return tx.Commit()
795}
796
797func (t *team) FindCheckContainers(logger lager.Logger, pipelineName string, resourceName string, secretManager creds.Secrets, varSourcePool creds.VarSourcePool) ([]Container, map[int]time.Time, error) {
798	pipeline, found, err := t.Pipeline(pipelineName)
799	if err != nil {
800		return nil, nil, err
801	}
802	if !found {
803		return nil, nil, nil
804	}
805
806	resource, found, err := pipeline.Resource(resourceName)
807	if err != nil {
808		return nil, nil, err
809	}
810	if !found {
811		return nil, nil, nil
812	}
813
814	pipelineResourceTypes, err := pipeline.ResourceTypes()
815	if err != nil {
816		return nil, nil, err
817	}
818
819	variables, err := pipeline.Variables(logger, secretManager, varSourcePool)
820	if err != nil {
821		return nil, nil, err
822	}
823
824	versionedResourceTypes := pipelineResourceTypes.Deserialize()
825
826	source, err := creds.NewSource(variables, resource.Source()).Evaluate()
827	if err != nil {
828		return nil, nil, err
829	}
830
831	resourceTypes, err := creds.NewVersionedResourceTypes(variables, versionedResourceTypes).Evaluate()
832	if err != nil {
833		return nil, nil, err
834	}
835
836	resourceConfigFactory := NewResourceConfigFactory(t.conn, t.lockFactory)
837	resourceConfig, err := resourceConfigFactory.FindOrCreateResourceConfig(
838		resource.Type(),
839		source,
840		resourceTypes,
841	)
842	if err != nil {
843		return nil, nil, err
844	}
845
846	rows, err := selectContainers("c").
847		Join("resource_config_check_sessions rccs ON rccs.id = c.resource_config_check_session_id").
848		Where(sq.Eq{
849			"rccs.resource_config_id": resourceConfig.ID(),
850		}).
851		Distinct().
852		RunWith(t.conn).
853		Query()
854	if err != nil {
855		return nil, nil, err
856	}
857
858	var containers []Container
859
860	containers, err = scanContainers(rows, t.conn, containers)
861	if err != nil {
862		return nil, nil, err
863	}
864
865	rows, err = psql.Select("c.id", "rccs.expires_at").
866		From("containers c").
867		Join("resource_config_check_sessions rccs ON rccs.id = c.resource_config_check_session_id").
868		Where(sq.Eq{
869			"rccs.resource_config_id": resourceConfig.ID(),
870		}).
871		Distinct().
872		RunWith(t.conn).
873		Query()
874	if err != nil {
875		return nil, nil, err
876	}
877
878	defer Close(rows)
879
880	checkContainersExpiresAt := make(map[int]time.Time)
881	for rows.Next() {
882		var (
883			id        int
884			expiresAt pq.NullTime
885		)
886
887		err = rows.Scan(&id, &expiresAt)
888		if err != nil {
889			return nil, nil, err
890		}
891
892		checkContainersExpiresAt[id] = expiresAt.Time
893	}
894
895	return containers, checkContainersExpiresAt, nil
896}
897
// UpdateName describes one rename: the name an entity currently has and the
// name it should have afterwards.
type UpdateName struct {
	// OldName is the name being replaced.
	OldName string
	// NewName is the name that replaces it.
	NewName string
}
902
903func updateJobsName(tx Tx, jobs []atc.JobConfig, pipelineID int) error {
904	jobsToUpdate := []UpdateName{}
905
906	for _, job := range jobs {
907		if job.OldName != "" && job.OldName != job.Name {
908			var count int
909			err := psql.Select("COUNT(*) as count").
910				From("jobs").
911				Where(sq.Eq{
912					"name":        job.OldName,
913					"pipeline_id": pipelineID}).
914				RunWith(tx).
915				QueryRow().
916				Scan(&count)
917			if err != nil {
918				return err
919			}
920
921			if count != 0 {
922				jobsToUpdate = append(jobsToUpdate, UpdateName{
923					OldName: job.OldName,
924					NewName: job.Name,
925				})
926			}
927		}
928	}
929
930	newMap := make(map[int]bool)
931	for _, updateNames := range jobsToUpdate {
932		isCyclic := checkCyclic(jobsToUpdate, updateNames.OldName, newMap)
933		if isCyclic {
934			return errors.New("job name swapping is not supported at this time")
935		}
936	}
937
938	jobsToUpdate = sortUpdateNames(jobsToUpdate)
939
940	for _, updateName := range jobsToUpdate {
941		_, err := psql.Delete("jobs").
942			Where(sq.Eq{
943				"name":        updateName.NewName,
944				"pipeline_id": pipelineID,
945				"active":      false}).
946			RunWith(tx).
947			Exec()
948		if err != nil {
949			return err
950		}
951
952		_, err = psql.Update("jobs").
953			Set("name", updateName.NewName).
954			Where(sq.Eq{"name": updateName.OldName, "pipeline_id": pipelineID}).
955			RunWith(tx).
956			Exec()
957		if err != nil {
958			return err
959		}
960	}
961
962	return nil
963}
964
965func updateResourcesName(tx Tx, resources []atc.ResourceConfig, pipelineID int) error {
966	resourcesToUpdate := []UpdateName{}
967
968	for _, res := range resources {
969		if res.OldName != "" && res.OldName != res.Name {
970			var count int
971			err := psql.Select("COUNT(*) as count").
972				From("resources").
973				Where(sq.Eq{
974					"name":        res.OldName,
975					"pipeline_id": pipelineID}).
976				RunWith(tx).
977				QueryRow().
978				Scan(&count)
979			if err != nil {
980				return err
981			}
982
983			if count != 0 {
984				resourcesToUpdate = append(resourcesToUpdate, UpdateName{
985					OldName: res.OldName,
986					NewName: res.Name,
987				})
988			}
989		}
990	}
991
992	newMap := make(map[int]bool)
993	for _, updateNames := range resourcesToUpdate {
994		isCyclic := checkCyclic(resourcesToUpdate, updateNames.OldName, newMap)
995		if isCyclic {
996			return errors.New("resource name swapping is not supported at this time")
997		}
998	}
999
1000	resourcesToUpdate = sortUpdateNames(resourcesToUpdate)
1001
1002	for _, updateName := range resourcesToUpdate {
1003		_, err := psql.Delete("resources").
1004			Where(sq.Eq{
1005				"name":        updateName.NewName,
1006				"pipeline_id": pipelineID}).
1007			RunWith(tx).
1008			Exec()
1009		if err != nil {
1010			return err
1011		}
1012
1013		_, err = psql.Update("resources").
1014			Set("name", updateName.NewName).
1015			Where(sq.Eq{"name": updateName.OldName, "pipeline_id": pipelineID}).
1016			RunWith(tx).
1017			Exec()
1018		if err != nil {
1019			return err
1020		}
1021	}
1022
1023	return nil
1024}
1025
1026func checkCyclic(updateNames []UpdateName, curr string, visited map[int]bool) bool {
1027	for i, updateName := range updateNames {
1028		if updateName.NewName == curr && !visited[i] {
1029			visited[i] = true
1030			checkCyclic(updateNames, updateName.OldName, visited)
1031		} else if updateName.NewName == curr && visited[i] && curr != updateName.OldName {
1032			return true
1033		}
1034	}
1035
1036	return false
1037}
1038
1039func sortUpdateNames(updateNames []UpdateName) []UpdateName {
1040	newMap := make(map[string]int)
1041	for i, updateName := range updateNames {
1042		newMap[updateName.NewName] = i + 1
1043
1044		if newMap[updateName.OldName] != 0 {
1045			index := newMap[updateName.OldName] - 1
1046
1047			tempName := updateNames[index]
1048			updateNames[index] = updateName
1049			updateNames[i] = tempName
1050
1051			return sortUpdateNames(updateNames)
1052		}
1053	}
1054
1055	return updateNames
1056}
1057
1058func saveJob(tx Tx, job atc.JobConfig, pipelineID int, groups []string) (int, error) {
1059	configPayload, err := json.Marshal(job)
1060	if err != nil {
1061		return 0, err
1062	}
1063
1064	es := tx.EncryptionStrategy()
1065	encryptedPayload, nonce, err := es.Encrypt(configPayload)
1066	if err != nil {
1067		return 0, err
1068	}
1069
1070	var jobID int
1071	err = psql.Insert("jobs").
1072		Columns("name", "pipeline_id", "config", "public", "max_in_flight", "disable_manual_trigger", "interruptible", "active", "nonce", "tags").
1073		Values(job.Name, pipelineID, encryptedPayload, job.Public, job.MaxInFlight(), job.DisableManualTrigger, job.Interruptible, true, nonce, pq.Array(groups)).
1074		Suffix("ON CONFLICT (name, pipeline_id) DO UPDATE SET config = EXCLUDED.config, public = EXCLUDED.public, max_in_flight = EXCLUDED.max_in_flight, disable_manual_trigger = EXCLUDED.disable_manual_trigger, interruptible = EXCLUDED.interruptible, active = EXCLUDED.active, nonce = EXCLUDED.nonce, tags = EXCLUDED.tags").
1075		Suffix("RETURNING id").
1076		RunWith(tx).
1077		QueryRow().
1078		Scan(&jobID)
1079	if err != nil {
1080		return 0, err
1081	}
1082
1083	return jobID, nil
1084}
1085
1086func registerSerialGroup(tx Tx, serialGroup string, jobID int) error {
1087	_, err := psql.Insert("jobs_serial_groups").
1088		Columns("serial_group", "job_id").
1089		Values(serialGroup, jobID).
1090		RunWith(tx).
1091		Exec()
1092	return err
1093}
1094
1095func saveResource(tx Tx, resource atc.ResourceConfig, pipelineID int) (int, error) {
1096	configPayload, err := json.Marshal(resource)
1097	if err != nil {
1098		return 0, err
1099	}
1100
1101	es := tx.EncryptionStrategy()
1102	encryptedPayload, nonce, err := es.Encrypt(configPayload)
1103	if err != nil {
1104		return 0, err
1105	}
1106
1107	var resourceID int
1108	err = psql.Insert("resources").
1109		Columns("name", "pipeline_id", "config", "active", "nonce", "type").
1110		Values(resource.Name, pipelineID, encryptedPayload, true, nonce, resource.Type).
1111		Suffix("ON CONFLICT (name, pipeline_id) DO UPDATE SET config = EXCLUDED.config, active = EXCLUDED.active, nonce = EXCLUDED.nonce, type = EXCLUDED.type").
1112		Suffix("RETURNING id").
1113		RunWith(tx).
1114		QueryRow().
1115		Scan(&resourceID)
1116	if err != nil {
1117		return 0, err
1118	}
1119
1120	_, err = psql.Delete("resource_pins").
1121		Where(sq.Eq{
1122			"resource_id": resourceID,
1123			"config":      true,
1124		}).
1125		RunWith(tx).
1126		Exec()
1127	if err != nil {
1128		return 0, err
1129	}
1130
1131	if resource.Version != nil {
1132		version, err := json.Marshal(resource.Version)
1133		if err != nil {
1134			return 0, err
1135		}
1136
1137		_, err = psql.Insert("resource_pins").
1138			Columns("resource_id", "version", "comment_text", "config").
1139			Values(resourceID, version, "", true).
1140			Suffix("ON CONFLICT (resource_id) DO UPDATE SET version = EXCLUDED.version, comment_text = EXCLUDED.comment_text, config = true").
1141			RunWith(tx).
1142			Exec()
1143		if err != nil {
1144			return 0, err
1145		}
1146	}
1147
1148	return resourceID, nil
1149}
1150
1151func saveResourceType(tx Tx, resourceType atc.ResourceType, pipelineID int) error {
1152	configPayload, err := json.Marshal(resourceType)
1153	if err != nil {
1154		return err
1155	}
1156
1157	es := tx.EncryptionStrategy()
1158	encryptedPayload, nonce, err := es.Encrypt(configPayload)
1159	if err != nil {
1160		return err
1161	}
1162
1163	_, err = psql.Insert("resource_types").
1164		Columns("name", "pipeline_id", "config", "active", "nonce", "type").
1165		Values(resourceType.Name, pipelineID, encryptedPayload, true, nonce, resourceType.Type).
1166		Suffix("ON CONFLICT (name, pipeline_id) DO UPDATE SET config = EXCLUDED.config, active = EXCLUDED.active, nonce = EXCLUDED.nonce, type = EXCLUDED.type").
1167		RunWith(tx).
1168		Exec()
1169
1170	return err
1171}
1172
1173func checkIfRowsUpdated(tx Tx, query string, params ...interface{}) (bool, error) {
1174	result, err := tx.Exec(query, params...)
1175	if err != nil {
1176		return false, err
1177	}
1178
1179	rows, err := result.RowsAffected()
1180	if err != nil {
1181		return false, err
1182	}
1183
1184	if rows == 0 {
1185		return false, nil
1186	}
1187
1188	return true, nil
1189}
1190
1191func swallowUniqueiolation(err error) error {
1192	if err != nil {
1193		if pgErr, ok := err.(*pq.Error); ok {
1194			if pgErr.Code.Class().Name() == "integrity_constraint_violation" {
1195				return nil
1196			}
1197		}
1198
1199		return err
1200	}
1201
1202	return nil
1203}
1204
1205func (t *team) findContainer(whereClause sq.Sqlizer) (CreatingContainer, CreatedContainer, error) {
1206	creating, created, destroying, _, err := scanContainer(
1207		selectContainers().
1208			Where(whereClause).
1209			RunWith(t.conn).
1210			QueryRow(),
1211		t.conn,
1212	)
1213	if err != nil {
1214		if err == sql.ErrNoRows {
1215			return nil, nil, nil
1216		}
1217		return nil, nil, err
1218	}
1219
1220	if destroying != nil {
1221		return nil, nil, nil
1222	}
1223
1224	return creating, created, nil
1225}
1226
1227func scanPipeline(p *pipeline, scan scannable) error {
1228	var (
1229		groups        sql.NullString
1230		varSources    sql.NullString
1231		display       sql.NullString
1232		nonce         sql.NullString
1233		nonceStr      *string
1234		lastUpdated   pq.NullTime
1235		parentJobID   sql.NullInt64
1236		parentBuildID sql.NullInt64
1237	)
1238	err := scan.Scan(&p.id, &p.name, &groups, &varSources, &display, &nonce, &p.configVersion, &p.teamID, &p.teamName, &p.paused, &p.public, &p.archived, &lastUpdated, &parentJobID, &parentBuildID)
1239	if err != nil {
1240		return err
1241	}
1242
1243	p.lastUpdated = lastUpdated.Time
1244	p.parentJobID = int(parentJobID.Int64)
1245	p.parentBuildID = int(parentBuildID.Int64)
1246
1247	if groups.Valid {
1248		var pipelineGroups atc.GroupConfigs
1249		err = json.Unmarshal([]byte(groups.String), &pipelineGroups)
1250		if err != nil {
1251			return err
1252		}
1253
1254		p.groups = pipelineGroups
1255	}
1256
1257	if nonce.Valid {
1258		nonceStr = &nonce.String
1259	}
1260
1261	if display.Valid {
1262		var displayConfig *atc.DisplayConfig
1263		err = json.Unmarshal([]byte(display.String), &displayConfig)
1264		if err != nil {
1265			return err
1266		}
1267
1268		p.display = displayConfig
1269	}
1270
1271	if varSources.Valid {
1272		var pipelineVarSources atc.VarSourceConfigs
1273		decryptedVarSource, err := p.conn.EncryptionStrategy().Decrypt(varSources.String, nonceStr)
1274		if err != nil {
1275			return err
1276		}
1277		err = json.Unmarshal([]byte(decryptedVarSource), &pipelineVarSources)
1278		if err != nil {
1279			return err
1280		}
1281
1282		p.varSources = pipelineVarSources
1283	}
1284
1285	return nil
1286}
1287
1288func scanPipelines(conn Conn, lockFactory lock.LockFactory, rows *sql.Rows) ([]Pipeline, error) {
1289	defer Close(rows)
1290
1291	pipelines := []Pipeline{}
1292
1293	for rows.Next() {
1294		pipeline := newPipeline(conn, lockFactory)
1295
1296		err := scanPipeline(pipeline, rows)
1297		if err != nil {
1298			return nil, err
1299		}
1300
1301		pipelines = append(pipelines, pipeline)
1302	}
1303
1304	return pipelines, nil
1305}
1306
1307func scanContainers(rows *sql.Rows, conn Conn, initContainers []Container) ([]Container, error) {
1308	containers := initContainers
1309
1310	defer Close(rows)
1311
1312	for rows.Next() {
1313		creating, created, destroying, _, err := scanContainer(rows, conn)
1314		if err != nil {
1315			return []Container{}, err
1316		}
1317
1318		if creating != nil {
1319			containers = append(containers, creating)
1320		}
1321
1322		if created != nil {
1323			containers = append(containers, created)
1324		}
1325
1326		if destroying != nil {
1327			containers = append(containers, destroying)
1328		}
1329	}
1330
1331	return containers, nil
1332}
1333
1334func (t *team) queryTeam(tx Tx, query string, params ...interface{}) error {
1335	var providerAuth, nonce sql.NullString
1336
1337	err := tx.QueryRow(query, params...).Scan(
1338		&t.id,
1339		&t.name,
1340		&t.admin,
1341		&providerAuth,
1342		&nonce,
1343	)
1344	if err != nil {
1345		return err
1346	}
1347
1348	if providerAuth.Valid {
1349		var auth atc.TeamAuth
1350		err = json.Unmarshal([]byte(providerAuth.String), &auth)
1351		if err != nil {
1352			return err
1353		}
1354		t.auth = auth
1355	}
1356
1357	return nil
1358}
1359
1360func resetDependentTableStates(tx Tx, pipelineID int) error {
1361	_, err := psql.Delete("jobs_serial_groups").
1362		Where(sq.Expr(`job_id in (
1363        SELECT j.id
1364        FROM jobs j
1365        WHERE j.pipeline_id = $1
1366      )`, pipelineID)).
1367		RunWith(tx).
1368		Exec()
1369	if err != nil {
1370		return err
1371	}
1372
1373	tableNames := []string{"jobs", "resources", "resource_types"}
1374	for _, table := range tableNames {
1375		err = inactivateTableForPipeline(tx, pipelineID, table)
1376		if err != nil {
1377			return err
1378		}
1379	}
1380	return err
1381}
1382
1383func inactivateTableForPipeline(tx Tx, pipelineID int, tableName string) error {
1384	_, err := psql.Update(tableName).
1385		Set("active", false).
1386		Where(sq.Eq{
1387			"pipeline_id": pipelineID,
1388		}).
1389		RunWith(tx).
1390		Exec()
1391	return err
1392}
1393
1394func saveResources(tx Tx, resources atc.ResourceConfigs, pipelineID int) (map[string]int, error) {
1395	resourceNameToID := make(map[string]int)
1396	for _, resource := range resources {
1397		resourceID, err := saveResource(tx, resource, pipelineID)
1398		if err != nil {
1399			return nil, err
1400		}
1401
1402		resourceNameToID[resource.Name] = resourceID
1403	}
1404
1405	return resourceNameToID, nil
1406}
1407
1408func saveResourceTypes(tx Tx, resourceTypes atc.ResourceTypes, pipelineID int) error {
1409	for _, resourceType := range resourceTypes {
1410		err := saveResourceType(tx, resourceType, pipelineID)
1411		if err != nil {
1412			return err
1413		}
1414	}
1415
1416	return nil
1417}
1418
1419func saveJobsAndSerialGroups(tx Tx, jobs atc.JobConfigs, groups atc.GroupConfigs, pipelineID int) (map[string]int, error) {
1420	jobGroups := make(map[string][]string)
1421	for _, group := range groups {
1422		for _, job := range group.Jobs {
1423			jobGroups[job] = append(jobGroups[job], group.Name)
1424		}
1425	}
1426
1427	jobNameToID := make(map[string]int)
1428	for _, job := range jobs {
1429		jobID, err := saveJob(tx, job, pipelineID, jobGroups[job.Name])
1430		if err != nil {
1431			return nil, err
1432		}
1433
1434		jobNameToID[job.Name] = jobID
1435
1436		if len(job.SerialGroups) != 0 {
1437			for _, sg := range job.SerialGroups {
1438				err = registerSerialGroup(tx, sg, jobID)
1439				if err != nil {
1440					return nil, err
1441				}
1442			}
1443		} else {
1444			if job.Serial || job.RawMaxInFlight > 0 {
1445				err = registerSerialGroup(tx, job.Name, jobID)
1446				if err != nil {
1447					return nil, err
1448				}
1449			}
1450		}
1451	}
1452
1453	return jobNameToID, nil
1454}
1455
1456func insertJobPipes(tx Tx, jobConfigs atc.JobConfigs, resourceNameToID map[string]int, jobNameToID map[string]int, pipelineID int) error {
1457	_, err := psql.Delete("job_inputs").
1458		Where(sq.Expr(`job_id in (
1459        SELECT j.id
1460        FROM jobs j
1461        WHERE j.pipeline_id = $1
1462      )`, pipelineID)).
1463		RunWith(tx).
1464		Exec()
1465	if err != nil {
1466		return err
1467	}
1468
1469	_, err = psql.Delete("job_outputs").
1470		Where(sq.Expr(`job_id in (
1471        SELECT j.id
1472        FROM jobs j
1473        WHERE j.pipeline_id = $1
1474      )`, pipelineID)).
1475		RunWith(tx).
1476		Exec()
1477	if err != nil {
1478		return err
1479	}
1480
1481	for _, jobConfig := range jobConfigs {
1482		err := jobConfig.StepConfig().Visit(atc.StepRecursor{
1483			OnGet: func(step *atc.GetStep) error {
1484				return insertJobInput(tx, step, jobConfig.Name, resourceNameToID, jobNameToID)
1485			},
1486			OnPut: func(step *atc.PutStep) error {
1487				return insertJobOutput(tx, step, jobConfig.Name, resourceNameToID, jobNameToID)
1488			},
1489		})
1490		if err != nil {
1491			return err
1492		}
1493	}
1494
1495	return nil
1496}
1497
1498func insertJobInput(tx Tx, step *atc.GetStep, jobName string, resourceNameToID map[string]int, jobNameToID map[string]int) error {
1499	if len(step.Passed) != 0 {
1500		for _, passedJob := range step.Passed {
1501			var version sql.NullString
1502			if step.Version != nil {
1503				versionJSON, err := step.Version.MarshalJSON()
1504				if err != nil {
1505					return err
1506				}
1507
1508				version = sql.NullString{Valid: true, String: string(versionJSON)}
1509			}
1510
1511			_, err := psql.Insert("job_inputs").
1512				Columns("name", "job_id", "resource_id", "passed_job_id", "trigger", "version").
1513				Values(step.Name, jobNameToID[jobName], resourceNameToID[step.ResourceName()], jobNameToID[passedJob], step.Trigger, version).
1514				RunWith(tx).
1515				Exec()
1516			if err != nil {
1517				return err
1518			}
1519		}
1520	} else {
1521		var version sql.NullString
1522		if step.Version != nil {
1523			versionJSON, err := step.Version.MarshalJSON()
1524			if err != nil {
1525				return err
1526			}
1527
1528			version = sql.NullString{Valid: true, String: string(versionJSON)}
1529		}
1530
1531		_, err := psql.Insert("job_inputs").
1532			Columns("name", "job_id", "resource_id", "trigger", "version").
1533			Values(step.Name, jobNameToID[jobName], resourceNameToID[step.ResourceName()], step.Trigger, version).
1534			RunWith(tx).
1535			Exec()
1536		if err != nil {
1537			return err
1538		}
1539	}
1540
1541	return nil
1542}
1543
1544func insertJobOutput(tx Tx, step *atc.PutStep, jobName string, resourceNameToID map[string]int, jobNameToID map[string]int) error {
1545	_, err := psql.Insert("job_outputs").
1546		Columns("name", "job_id", "resource_id").
1547		Values(step.Name, jobNameToID[jobName], resourceNameToID[step.ResourceName()]).
1548		RunWith(tx).
1549		Exec()
1550	if err != nil {
1551		return err
1552	}
1553
1554	return nil
1555}
1556