1package db
2
3import (
4	"database/sql"
5	"encoding/json"
6	"time"
7
8	"code.cloudfoundry.org/lager"
9	sq "github.com/Masterminds/squirrel"
10	"github.com/concourse/concourse/atc"
11	"github.com/concourse/concourse/atc/db/lock"
12)
13
14//go:generate counterfeiter . ResourceConfigScope
15
16// ResourceConfigScope represents the relationship between a possible pipeline resource and a resource config.
17// When a resource is specified to have a unique version history either through its base resource type or its custom
18// resource type, it results in its generated resource config to be scoped to the resource. This relationship is
19// translated into its row in the resource config scopes table to have both the resource id and resource config id
20// populated. When a resource has a shared version history, its resource config is not scoped to the (or any) resource
21// and its row in the resource config scopes table will have the resource config id populated but a NULL value for
22// the resource id. Resource versions will therefore be directly dependent on a resource config scope.
23type ResourceConfigScope interface {
24	ID() int
25	Resource() Resource
26	ResourceConfig() ResourceConfig
27	CheckError() error
28
29	SaveVersions(SpanContext, []atc.Version) error
30	FindVersion(atc.Version) (ResourceConfigVersion, bool, error)
31	LatestVersion() (ResourceConfigVersion, bool, error)
32
33	SetCheckError(error) error
34
35	AcquireResourceCheckingLock(
36		logger lager.Logger,
37	) (lock.Lock, bool, error)
38
39	UpdateLastCheckStartTime(
40		interval time.Duration,
41		immediate bool,
42	) (bool, error)
43
44	UpdateLastCheckEndTime() (bool, error)
45}
46
47type resourceConfigScope struct {
48	id             int
49	resource       Resource
50	resourceConfig ResourceConfig
51	checkError     error
52
53	conn        Conn
54	lockFactory lock.LockFactory
55}
56
57func (r *resourceConfigScope) ID() int                        { return r.id }
58func (r *resourceConfigScope) Resource() Resource             { return r.resource }
59func (r *resourceConfigScope) ResourceConfig() ResourceConfig { return r.resourceConfig }
60func (r *resourceConfigScope) CheckError() error              { return r.checkError }
61
62// SaveVersions stores a list of version in the db for a resource config
63// Each version will also have its check order field updated and the
64// Cache index for pipelines using the resource config will be bumped.
65//
66// In the case of a check resource from an older version, the versions
67// that already exist in the DB will be re-ordered using
68// incrementCheckOrder to input the correct check order
69func (r *resourceConfigScope) SaveVersions(spanContext SpanContext, versions []atc.Version) error {
70	return saveVersions(r.conn, r.ID(), versions, spanContext)
71}
72
73func saveVersions(conn Conn, rcsID int, versions []atc.Version, spanContext SpanContext) error {
74	tx, err := conn.Begin()
75	if err != nil {
76		return err
77	}
78
79	defer Rollback(tx)
80
81	var containsNewVersion bool
82	for _, version := range versions {
83		newVersion, err := saveResourceVersion(tx, rcsID, version, nil, spanContext)
84		if err != nil {
85			return err
86		}
87
88		containsNewVersion = containsNewVersion || newVersion
89	}
90
91	if containsNewVersion {
92		// bump the check order of all the versions returned by the check if there
93		// is at least one new version within the set of returned versions
94		for _, version := range versions {
95			versionJSON, err := json.Marshal(version)
96			if err != nil {
97				return err
98			}
99
100			err = incrementCheckOrder(tx, rcsID, string(versionJSON))
101			if err != nil {
102				return err
103			}
104		}
105
106		err = requestScheduleForJobsUsingResourceConfigScope(tx, rcsID)
107		if err != nil {
108			return err
109		}
110	}
111
112	err = tx.Commit()
113	if err != nil {
114		return err
115	}
116
117	return nil
118}
119
120func (r *resourceConfigScope) FindVersion(v atc.Version) (ResourceConfigVersion, bool, error) {
121	rcv := &resourceConfigVersion{
122		resourceConfigScope: r,
123		conn:                r.conn,
124	}
125
126	versionByte, err := json.Marshal(v)
127	if err != nil {
128		return nil, false, err
129	}
130
131	row := resourceConfigVersionQuery.
132		Where(sq.Eq{
133			"v.resource_config_scope_id": r.id,
134		}).
135		Where(sq.Expr("v.version_md5 = md5(?)", versionByte)).
136		RunWith(r.conn).
137		QueryRow()
138
139	err = scanResourceConfigVersion(rcv, row)
140	if err != nil {
141		if err == sql.ErrNoRows {
142			return nil, false, nil
143		}
144		return nil, false, err
145	}
146
147	return rcv, true, nil
148}
149
150func (r *resourceConfigScope) LatestVersion() (ResourceConfigVersion, bool, error) {
151	rcv := &resourceConfigVersion{
152		conn:                r.conn,
153		resourceConfigScope: r,
154	}
155
156	row := resourceConfigVersionQuery.
157		Where(sq.Eq{"v.resource_config_scope_id": r.id}).
158		OrderBy("v.check_order DESC").
159		Limit(1).
160		RunWith(r.conn).
161		QueryRow()
162
163	err := scanResourceConfigVersion(rcv, row)
164	if err != nil {
165		if err == sql.ErrNoRows {
166			return nil, false, nil
167		}
168		return nil, false, err
169	}
170
171	return rcv, true, nil
172}
173
174func (r *resourceConfigScope) SetCheckError(cause error) error {
175	var err error
176
177	if cause == nil {
178		_, err = psql.Update("resource_config_scopes").
179			Set("check_error", nil).
180			Where(sq.Eq{"id": r.id}).
181			RunWith(r.conn).
182			Exec()
183	} else {
184		_, err = psql.Update("resource_config_scopes").
185			Set("check_error", cause.Error()).
186			Where(sq.Eq{"id": r.id}).
187			RunWith(r.conn).
188			Exec()
189	}
190
191	return err
192}
193
194func (r *resourceConfigScope) AcquireResourceCheckingLock(
195	logger lager.Logger,
196) (lock.Lock, bool, error) {
197	return r.lockFactory.Acquire(
198		logger,
199		lock.NewResourceConfigCheckingLockID(r.resourceConfig.ID()),
200	)
201}
202
203func (r *resourceConfigScope) UpdateLastCheckStartTime(
204	interval time.Duration,
205	immediate bool,
206) (bool, error) {
207	tx, err := r.conn.Begin()
208	if err != nil {
209		return false, err
210	}
211
212	defer Rollback(tx)
213
214	params := []interface{}{r.id}
215
216	condition := ""
217	if !immediate {
218		condition = "AND now() - last_check_start_time > ($2 || ' SECONDS')::INTERVAL"
219		params = append(params, interval.Seconds())
220	}
221
222	updated, err := checkIfRowsUpdated(tx, `
223			UPDATE resource_config_scopes
224			SET last_check_start_time = now()
225			WHERE id = $1
226		`+condition, params...)
227	if err != nil {
228		return false, err
229	}
230
231	if !updated {
232		return false, nil
233	}
234
235	err = tx.Commit()
236	if err != nil {
237		return false, err
238	}
239
240	return true, nil
241}
242
243func (r *resourceConfigScope) UpdateLastCheckEndTime() (bool, error) {
244	tx, err := r.conn.Begin()
245	if err != nil {
246		return false, err
247	}
248
249	defer Rollback(tx)
250
251	updated, err := checkIfRowsUpdated(tx, `
252			UPDATE resource_config_scopes
253			SET last_check_end_time = now()
254			WHERE id = $1
255		`, r.id)
256	if err != nil {
257		return false, err
258	}
259
260	if !updated {
261		return false, nil
262	}
263
264	err = tx.Commit()
265	if err != nil {
266		return false, err
267	}
268
269	return true, nil
270}
271
272func saveResourceVersion(tx Tx, rcsID int, version atc.Version, metadata ResourceConfigMetadataFields, spanContext SpanContext) (bool, error) {
273	versionJSON, err := json.Marshal(version)
274	if err != nil {
275		return false, err
276	}
277
278	metadataJSON, err := json.Marshal(metadata)
279	if err != nil {
280		return false, err
281	}
282
283	spanContextJSON, err := json.Marshal(spanContext)
284	if err != nil {
285		return false, err
286	}
287
288	var checkOrder int
289	err = tx.QueryRow(`
290		INSERT INTO resource_config_versions (resource_config_scope_id, version, version_md5, metadata, span_context)
291		SELECT $1, $2, md5($3), $4, $5
292		ON CONFLICT (resource_config_scope_id, version_md5)
293		DO UPDATE SET metadata = COALESCE(NULLIF(excluded.metadata, 'null'::jsonb), resource_config_versions.metadata)
294		RETURNING check_order
295		`, rcsID, string(versionJSON), string(versionJSON), string(metadataJSON), string(spanContextJSON)).Scan(&checkOrder)
296	if err != nil {
297		return false, err
298	}
299
300	return checkOrder == 0, nil
301}
302
303// increment the check order if the version's check order is less than the
304// current max. This will fix the case of a check from an old version causing
305// the desired order to change; existing versions will be re-ordered since
306// we add them in the desired order.
307func incrementCheckOrder(tx Tx, rcsID int, version string) error {
308	_, err := tx.Exec(`
309		WITH max_checkorder AS (
310			SELECT max(check_order) co
311			FROM resource_config_versions
312			WHERE resource_config_scope_id = $1
313		)
314
315		UPDATE resource_config_versions
316		SET check_order = mc.co + 1
317		FROM max_checkorder mc
318		WHERE resource_config_scope_id = $1
319		AND version_md5 = md5($2)
320		AND check_order <= mc.co;`, rcsID, version)
321	return err
322}
323
324// The SELECT query orders the jobs for updating to prevent deadlocking.
325// Updating multiple rows using a SELECT subquery does not preserve the same
326// order for the updates, which can lead to deadlocking.
327func requestScheduleForJobsUsingResourceConfigScope(tx Tx, rcsID int) error {
328	rows, err := psql.Select("DISTINCT j.job_id").
329		From("job_inputs j").
330		Join("resources r ON r.id = j.resource_id").
331		Where(sq.Eq{
332			"r.resource_config_scope_id": rcsID,
333			"j.passed_job_id":            nil,
334		}).
335		OrderBy("j.job_id DESC").
336		RunWith(tx).
337		Query()
338	if err != nil {
339		return err
340	}
341
342	var jobIDs []int
343	for rows.Next() {
344		var id int
345		err = rows.Scan(&id)
346		if err != nil {
347			return err
348		}
349
350		jobIDs = append(jobIDs, id)
351	}
352
353	for _, jID := range jobIDs {
354		_, err := psql.Update("jobs").
355			Set("schedule_requested", sq.Expr("now()")).
356			Where(sq.Eq{
357				"id": jID,
358			}).
359			RunWith(tx).
360			Exec()
361		if err != nil {
362			return err
363		}
364	}
365
366	return nil
367}
368