1package db
2
3import (
4	"database/sql"
5	"encoding/json"
6	"errors"
7	"fmt"
8	"strconv"
9	"time"
10
11	sq "github.com/Masterminds/squirrel"
12	"github.com/lib/pq"
13
14	"github.com/concourse/concourse/atc"
15	"github.com/concourse/concourse/atc/db/lock"
16)
17
18var ErrPinnedThroughConfig = errors.New("resource is pinned through config")
19
20//go:generate counterfeiter . Resource
21
22type Resource interface {
23	PipelineRef
24
25	ID() int
26	Name() string
27	Public() bool
28	TeamID() int
29	TeamName() string
30	Type() string
31	Source() atc.Source
32	CheckEvery() string
33	CheckTimeout() string
34	LastCheckStartTime() time.Time
35	LastCheckEndTime() time.Time
36	Tags() atc.Tags
37	CheckSetupError() error
38	CheckError() error
39	WebhookToken() string
40	Config() atc.ResourceConfig
41	ConfigPinnedVersion() atc.Version
42	APIPinnedVersion() atc.Version
43	PinComment() string
44	SetPinComment(string) error
45	ResourceConfigID() int
46	ResourceConfigScopeID() int
47	Icon() string
48
49	HasWebhook() bool
50
51	CurrentPinnedVersion() atc.Version
52
53	ResourceConfigVersionID(atc.Version) (int, bool, error)
54	Versions(page Page, versionFilter atc.Version) ([]atc.ResourceVersion, Pagination, bool, error)
55	SaveUncheckedVersion(atc.Version, ResourceConfigMetadataFields, ResourceConfig, atc.VersionedResourceTypes) (bool, error)
56	UpdateMetadata(atc.Version, ResourceConfigMetadataFields) (bool, error)
57
58	EnableVersion(rcvID int) error
59	DisableVersion(rcvID int) error
60
61	PinVersion(rcvID int) (bool, error)
62	UnpinVersion() error
63
64	SetResourceConfig(atc.Source, atc.VersionedResourceTypes) (ResourceConfigScope, error)
65	SetCheckSetupError(error) error
66	NotifyScan() error
67
68	Reload() (bool, error)
69}
70
71var resourcesQuery = psql.Select(
72	"r.id",
73	"r.name",
74	"r.type",
75	"r.config",
76	"r.check_error",
77	"rs.last_check_start_time",
78	"rs.last_check_end_time",
79	"r.pipeline_id",
80	"r.nonce",
81	"r.resource_config_id",
82	"r.resource_config_scope_id",
83	"p.name",
84	"t.id",
85	"t.name",
86	"rs.check_error",
87	"rp.version",
88	"rp.comment_text",
89	"rp.config",
90).
91	From("resources r").
92	Join("pipelines p ON p.id = r.pipeline_id").
93	Join("teams t ON t.id = p.team_id").
94	LeftJoin("resource_config_scopes rs ON r.resource_config_scope_id = rs.id").
95	LeftJoin("resource_pins rp ON rp.resource_id = r.id").
96	Where(sq.Eq{"r.active": true})
97
98type resource struct {
99	pipelineRef
100
101	id                    int
102	name                  string
103	teamID                int
104	teamName              string
105	type_                 string
106	lastCheckStartTime    time.Time
107	lastCheckEndTime      time.Time
108	checkSetupError       error
109	checkError            error
110	config                atc.ResourceConfig
111	configPinnedVersion   atc.Version
112	apiPinnedVersion      atc.Version
113	pinComment            string
114	resourceConfigID      int
115	resourceConfigScopeID int
116}
117
118func newEmptyResource(conn Conn, lockFactory lock.LockFactory) *resource {
119	return &resource{pipelineRef: pipelineRef{conn: conn, lockFactory: lockFactory}}
120}
121
122type ResourceNotFoundError struct {
123	ID int
124}
125
126func (e ResourceNotFoundError) Error() string {
127	return fmt.Sprintf("resource '%d' not found", e.ID)
128}
129
130type Resources []Resource
131
132func (resources Resources) Lookup(name string) (Resource, bool) {
133	for _, resource := range resources {
134		if resource.Name() == name {
135			return resource, true
136		}
137	}
138
139	return nil, false
140}
141
142func (resources Resources) Configs() atc.ResourceConfigs {
143	var configs atc.ResourceConfigs
144	for _, r := range resources {
145		configs = append(configs, r.Config())
146	}
147	return configs
148}
149
150func (r *resource) ID() int                          { return r.id }
151func (r *resource) Name() string                     { return r.name }
152func (r *resource) Public() bool                     { return r.config.Public }
153func (r *resource) TeamID() int                      { return r.teamID }
154func (r *resource) TeamName() string                 { return r.teamName }
155func (r *resource) Type() string                     { return r.type_ }
156func (r *resource) Source() atc.Source               { return r.config.Source }
157func (r *resource) CheckEvery() string               { return r.config.CheckEvery }
158func (r *resource) CheckTimeout() string             { return r.config.CheckTimeout }
159func (r *resource) LastCheckStartTime() time.Time    { return r.lastCheckStartTime }
160func (r *resource) LastCheckEndTime() time.Time      { return r.lastCheckEndTime }
161func (r *resource) Tags() atc.Tags                   { return r.config.Tags }
162func (r *resource) CheckSetupError() error           { return r.checkSetupError }
163func (r *resource) CheckError() error                { return r.checkError }
164func (r *resource) WebhookToken() string             { return r.config.WebhookToken }
165func (r *resource) Config() atc.ResourceConfig       { return r.config }
166func (r *resource) ConfigPinnedVersion() atc.Version { return r.configPinnedVersion }
167func (r *resource) APIPinnedVersion() atc.Version    { return r.apiPinnedVersion }
168func (r *resource) PinComment() string               { return r.pinComment }
169func (r *resource) ResourceConfigID() int            { return r.resourceConfigID }
170func (r *resource) ResourceConfigScopeID() int       { return r.resourceConfigScopeID }
171func (r *resource) Icon() string                     { return r.config.Icon }
172
173func (r *resource) HasWebhook() bool { return r.WebhookToken() != "" }
174
175func (r *resource) Reload() (bool, error) {
176	row := resourcesQuery.Where(sq.Eq{"r.id": r.id}).
177		RunWith(r.conn).
178		QueryRow()
179
180	err := scanResource(r, row)
181	if err != nil {
182		if err == sql.ErrNoRows {
183			return false, nil
184		}
185		return false, err
186	}
187
188	return true, nil
189}
190
191func (r *resource) SetResourceConfig(source atc.Source, resourceTypes atc.VersionedResourceTypes) (ResourceConfigScope, error) {
192	resourceConfigDescriptor, err := constructResourceConfigDescriptor(r.type_, source, resourceTypes)
193	if err != nil {
194		return nil, err
195	}
196
197	tx, err := r.conn.Begin()
198	if err != nil {
199		return nil, err
200	}
201
202	defer Rollback(tx)
203
204	resourceConfig, err := resourceConfigDescriptor.findOrCreate(tx, r.lockFactory, r.conn)
205	if err != nil {
206		return nil, err
207	}
208
209	_, err = psql.Update("resources").
210		Set("resource_config_id", resourceConfig.ID()).
211		Where(sq.Eq{"id": r.id}).
212		Where(sq.Or{
213			sq.Eq{"resource_config_id": nil},
214			sq.NotEq{"resource_config_id": resourceConfig.ID()},
215		}).
216		RunWith(tx).
217		Exec()
218	if err != nil {
219		return nil, err
220	}
221
222	resourceConfigScope, err := findOrCreateResourceConfigScope(tx, r.conn, r.lockFactory, resourceConfig, r, r.type_, resourceTypes)
223	if err != nil {
224		return nil, err
225	}
226
227	results, err := psql.Update("resources").
228		Set("resource_config_scope_id", resourceConfigScope.ID()).
229		Where(sq.Eq{"id": r.id}).
230		Where(sq.Or{
231			sq.Eq{"resource_config_scope_id": nil},
232			sq.NotEq{"resource_config_scope_id": resourceConfigScope.ID()},
233		}).
234		RunWith(tx).
235		Exec()
236	if err != nil {
237		return nil, err
238	}
239
240	rowsAffected, err := results.RowsAffected()
241	if err != nil {
242		return nil, err
243	}
244
245	if rowsAffected > 0 {
246		err = requestScheduleForJobsUsingResource(tx, r.id)
247		if err != nil {
248			return nil, err
249		}
250	}
251
252	err = tx.Commit()
253	if err != nil {
254		return nil, err
255	}
256
257	return resourceConfigScope, nil
258}
259
260func (r *resource) SetCheckSetupError(cause error) error {
261	var err error
262
263	if cause == nil {
264		_, err = psql.Update("resources").
265			Set("check_error", nil).
266			Where(sq.And{
267				sq.Eq{"id": r.ID()},
268				sq.NotEq{"check_error": nil},
269			}).
270			RunWith(r.conn).
271			Exec()
272	} else {
273		_, err = psql.Update("resources").
274			Set("check_error", cause.Error()).
275			Where(sq.Eq{"id": r.ID()}).
276			RunWith(r.conn).
277			Exec()
278	}
279
280	return err
281}
282
283// XXX: only used for tests
284func (r *resource) SaveUncheckedVersion(version atc.Version, metadata ResourceConfigMetadataFields, resourceConfig ResourceConfig, resourceTypes atc.VersionedResourceTypes) (bool, error) {
285	tx, err := r.conn.Begin()
286	if err != nil {
287		return false, err
288	}
289
290	defer Rollback(tx)
291
292	resourceConfigScope, err := findOrCreateResourceConfigScope(tx, r.conn, r.lockFactory, resourceConfig, r, r.type_, resourceTypes)
293	if err != nil {
294		return false, err
295	}
296
297	newVersion, err := saveResourceVersion(tx, resourceConfigScope.ID(), version, metadata, nil)
298	if err != nil {
299		return false, err
300	}
301
302	return newVersion, tx.Commit()
303}
304
305func (r *resource) UpdateMetadata(version atc.Version, metadata ResourceConfigMetadataFields) (bool, error) {
306	versionJSON, err := json.Marshal(version)
307	if err != nil {
308		return false, err
309	}
310
311	metadataJSON, err := json.Marshal(metadata)
312	if err != nil {
313		return false, err
314	}
315
316	_, err = psql.Update("resource_config_versions").
317		Set("metadata", string(metadataJSON)).
318		Where(sq.Eq{
319			"resource_config_scope_id": r.ResourceConfigScopeID(),
320		}).
321		Where(sq.Expr(
322			"version_md5 = md5(?)", versionJSON,
323		)).
324		RunWith(r.conn).
325		Exec()
326
327	if err != nil {
328		if err == sql.ErrNoRows {
329			return false, nil
330		}
331		return false, err
332	}
333	return true, nil
334}
335
336func (r *resource) ResourceConfigVersionID(version atc.Version) (int, bool, error) {
337	requestedVersion, err := json.Marshal(version)
338	if err != nil {
339		return 0, false, err
340	}
341
342	var id int
343
344	err = psql.Select("rcv.id").
345		From("resource_config_versions rcv").
346		Join("resources r ON rcv.resource_config_scope_id = r.resource_config_scope_id").
347		Where(sq.Eq{"r.id": r.ID()}).
348		Where(sq.Expr("version @> ?", requestedVersion)).
349		Where(sq.NotEq{"rcv.check_order": 0}).
350		OrderBy("rcv.check_order DESC").
351		RunWith(r.conn).
352		QueryRow().
353		Scan(&id)
354
355	if err != nil {
356		if err == sql.ErrNoRows {
357			return 0, false, nil
358		}
359		return 0, false, err
360	}
361
362	return id, true, nil
363}
364
365func (r *resource) SetPinComment(comment string) error {
366	_, err := psql.Update("resource_pins").
367		Set("comment_text", comment).
368		Where(sq.Eq{"resource_id": r.ID()}).
369		RunWith(r.conn).
370		Exec()
371
372	return err
373}
374
375func (r *resource) CurrentPinnedVersion() atc.Version {
376	if r.configPinnedVersion != nil {
377		return r.configPinnedVersion
378	} else if r.apiPinnedVersion != nil {
379		return r.apiPinnedVersion
380	}
381	return nil
382}
383
384func (r *resource) Versions(page Page, versionFilter atc.Version) ([]atc.ResourceVersion, Pagination, bool, error) {
385	tx, err := r.conn.Begin()
386	if err != nil {
387		return nil, Pagination{}, false, err
388	}
389
390	defer Rollback(tx)
391
392	query := `
393		SELECT v.id, v.version, v.metadata, v.check_order,
394			NOT EXISTS (
395				SELECT 1
396				FROM resource_disabled_versions d
397				WHERE v.version_md5 = d.version_md5
398				AND r.resource_config_scope_id = v.resource_config_scope_id
399				AND r.id = d.resource_id
400			)
401		FROM resource_config_versions v, resources r
402		WHERE r.id = $1 AND r.resource_config_scope_id = v.resource_config_scope_id AND v.check_order != 0
403	`
404
405	filterJSON := "{}"
406	if len(versionFilter) != 0 {
407		filterBytes, err := json.Marshal(versionFilter)
408		if err != nil {
409			return nil, Pagination{}, false, err
410		}
411
412		filterJSON = string(filterBytes)
413	}
414
415	var rows *sql.Rows
416	if page.From != nil {
417		rows, err = tx.Query(fmt.Sprintf(`
418			SELECT sub.*
419				FROM (
420						%s
421					AND version @> $4
422					AND v.check_order >= (SELECT check_order FROM resource_config_versions WHERE id = $2)
423				ORDER BY v.check_order ASC
424				LIMIT $3
425			) sub
426			ORDER BY sub.check_order DESC
427		`, query), r.id, *page.From, page.Limit, filterJSON)
428		if err != nil {
429			return nil, Pagination{}, false, err
430		}
431	} else if page.To != nil {
432		rows, err = tx.Query(fmt.Sprintf(`
433			%s
434				AND version @> $4
435				AND v.check_order <= (SELECT check_order FROM resource_config_versions WHERE id = $2)
436			ORDER BY v.check_order DESC
437			LIMIT $3
438		`, query), r.id, *page.To, page.Limit, filterJSON)
439		if err != nil {
440			return nil, Pagination{}, false, err
441		}
442	} else {
443		rows, err = tx.Query(fmt.Sprintf(`
444			%s
445			AND version @> $3
446			ORDER BY v.check_order DESC
447			LIMIT $2
448		`, query), r.id, page.Limit, filterJSON)
449		if err != nil {
450			return nil, Pagination{}, false, err
451		}
452	}
453
454	defer Close(rows)
455
456	type rcvCheckOrder struct {
457		ResourceConfigVersionID int
458		CheckOrder              int
459	}
460
461	rvs := make([]atc.ResourceVersion, 0)
462	checkOrderRVs := make([]rcvCheckOrder, 0)
463	for rows.Next() {
464		var (
465			metadataBytes sql.NullString
466			versionBytes  string
467			checkOrder    int
468		)
469
470		rv := atc.ResourceVersion{}
471		err := rows.Scan(&rv.ID, &versionBytes, &metadataBytes, &checkOrder, &rv.Enabled)
472		if err != nil {
473			return nil, Pagination{}, false, err
474		}
475
476		err = json.Unmarshal([]byte(versionBytes), &rv.Version)
477		if err != nil {
478			return nil, Pagination{}, false, err
479		}
480
481		if metadataBytes.Valid {
482			err = json.Unmarshal([]byte(metadataBytes.String), &rv.Metadata)
483			if err != nil {
484				return nil, Pagination{}, false, err
485			}
486		}
487
488		checkOrderRV := rcvCheckOrder{
489			ResourceConfigVersionID: rv.ID,
490			CheckOrder:              checkOrder,
491		}
492
493		rvs = append(rvs, rv)
494		checkOrderRVs = append(checkOrderRVs, checkOrderRV)
495	}
496
497	if len(rvs) == 0 {
498		return nil, Pagination{}, true, nil
499	}
500
501	newestRCVCheckOrder := checkOrderRVs[0]
502	oldestRCVCheckOrder := checkOrderRVs[len(checkOrderRVs)-1]
503
504	var pagination Pagination
505
506	var olderRCVId int
507	err = tx.QueryRow(`
508		SELECT v.id
509		FROM resource_config_versions v, resources r
510		WHERE v.check_order < $2 AND r.id = $1 AND v.resource_config_scope_id = r.resource_config_scope_id
511		ORDER BY v.check_order DESC
512		LIMIT 1
513	`, r.id, oldestRCVCheckOrder.CheckOrder).Scan(&olderRCVId)
514	if err != nil && err != sql.ErrNoRows {
515		return nil, Pagination{}, false, err
516	} else if err == nil {
517		pagination.Older = &Page{
518			To:    &olderRCVId,
519			Limit: page.Limit,
520		}
521	}
522
523	var newerRCVId int
524	err = tx.QueryRow(`
525		SELECT v.id
526		FROM resource_config_versions v, resources r
527		WHERE v.check_order > $2 AND r.id = $1 AND v.resource_config_scope_id = r.resource_config_scope_id
528		ORDER BY v.check_order ASC
529		LIMIT 1
530	`, r.id, newestRCVCheckOrder.CheckOrder).Scan(&newerRCVId)
531	if err != nil && err != sql.ErrNoRows {
532		return nil, Pagination{}, false, err
533	} else if err == nil {
534		pagination.Newer = &Page{
535			From:  &newerRCVId,
536			Limit: page.Limit,
537		}
538	}
539
540	err = tx.Commit()
541	if err != nil {
542		return nil, Pagination{}, false, nil
543	}
544
545	return rvs, pagination, true, nil
546}
547
548func (r *resource) EnableVersion(rcvID int) error {
549	return r.toggleVersion(rcvID, true)
550}
551
552func (r *resource) DisableVersion(rcvID int) error {
553	return r.toggleVersion(rcvID, false)
554}
555
556func (r *resource) PinVersion(rcvID int) (bool, error) {
557	tx, err := r.conn.Begin()
558	if err != nil {
559		return false, err
560	}
561	defer Rollback(tx)
562	var pinnedThroughConfig bool
563	err = tx.QueryRow(`
564		SELECT EXISTS (
565			SELECT 1
566			FROM resource_pins
567			WHERE resource_id = $1
568			AND config
569		)`, r.id).Scan(&pinnedThroughConfig)
570	if err != nil {
571		return false, err
572	}
573
574	if pinnedThroughConfig {
575		return false, ErrPinnedThroughConfig
576	}
577
578	results, err := tx.Exec(`
579	    INSERT INTO resource_pins(resource_id, version, comment_text, config)
580			VALUES ($1,
581				( SELECT rcv.version
582				FROM resource_config_versions rcv
583				WHERE rcv.id = $2 ),
584				'', false)
585			ON CONFLICT (resource_id) DO UPDATE SET version=EXCLUDED.version`, r.id, rcvID)
586	if err != nil {
587		if err == sql.ErrNoRows {
588			return false, nil
589		}
590		return false, err
591	}
592
593	rowsAffected, err := results.RowsAffected()
594	if err != nil {
595		return false, err
596	}
597
598	if rowsAffected != 1 {
599		return false, nil
600	}
601
602	err = requestScheduleForJobsUsingResource(tx, r.id)
603	if err != nil {
604		return false, err
605	}
606
607	err = tx.Commit()
608	if err != nil {
609		return false, err
610	}
611
612	return true, nil
613}
614
615func (r *resource) UnpinVersion() error {
616	tx, err := r.conn.Begin()
617	if err != nil {
618		return err
619	}
620
621	defer tx.Rollback()
622
623	results, err := psql.Delete("resource_pins").
624		Where(sq.Eq{"resource_pins.resource_id": r.id}).
625		RunWith(tx).
626		Exec()
627	if err != nil {
628		return err
629	}
630
631	rowsAffected, err := results.RowsAffected()
632	if err != nil {
633		return err
634	}
635
636	if rowsAffected != 1 {
637		return NonOneRowAffectedError{rowsAffected}
638	}
639
640	err = requestScheduleForJobsUsingResource(tx, r.id)
641	if err != nil {
642		return err
643	}
644
645	err = tx.Commit()
646	if err != nil {
647		return err
648	}
649
650	return nil
651}
652
653func (r *resource) toggleVersion(rcvID int, enable bool) error {
654	tx, err := r.conn.Begin()
655	if err != nil {
656		return err
657	}
658
659	defer Rollback(tx)
660
661	var results sql.Result
662	if enable {
663		results, err = tx.Exec(`
664			DELETE FROM resource_disabled_versions
665			WHERE resource_id = $1
666			AND version_md5 = (SELECT version_md5 FROM resource_config_versions rcv WHERE rcv.id = $2)
667			`, r.id, rcvID)
668	} else {
669		results, err = tx.Exec(`
670			INSERT INTO resource_disabled_versions (resource_id, version_md5)
671			SELECT $1, rcv.version_md5
672			FROM resource_config_versions rcv
673			WHERE rcv.id = $2
674			`, r.id, rcvID)
675	}
676	if err != nil {
677		return err
678	}
679
680	rowsAffected, err := results.RowsAffected()
681	if err != nil {
682		return err
683	}
684
685	if rowsAffected != 1 {
686		return NonOneRowAffectedError{rowsAffected}
687	}
688
689	err = requestScheduleForJobsUsingResource(tx, r.id)
690	if err != nil {
691		return err
692	}
693
694	return tx.Commit()
695}
696
697func (r *resource) NotifyScan() error {
698	return r.conn.Bus().Notify(fmt.Sprintf("resource_scan_%d", r.id))
699}
700
701func scanResource(r *resource, row scannable) error {
702	var (
703		configBlob                                                               sql.NullString
704		checkErr, rcsCheckErr, nonce, rcID, rcScopeID, pinnedVersion, pinComment sql.NullString
705		lastCheckStartTime, lastCheckEndTime                                     pq.NullTime
706		pinnedThroughConfig                                                      sql.NullBool
707	)
708
709	err := row.Scan(&r.id, &r.name, &r.type_, &configBlob, &checkErr, &lastCheckStartTime, &lastCheckEndTime, &r.pipelineID, &nonce, &rcID, &rcScopeID, &r.pipelineName, &r.teamID, &r.teamName, &rcsCheckErr, &pinnedVersion, &pinComment, &pinnedThroughConfig)
710	if err != nil {
711		return err
712	}
713
714	r.lastCheckStartTime = lastCheckStartTime.Time
715	r.lastCheckEndTime = lastCheckEndTime.Time
716
717	es := r.conn.EncryptionStrategy()
718
719	var noncense *string
720	if nonce.Valid {
721		noncense = &nonce.String
722	}
723
724	if configBlob.Valid {
725		decryptedConfig, err := es.Decrypt(configBlob.String, noncense)
726		if err != nil {
727			return err
728		}
729
730		err = json.Unmarshal(decryptedConfig, &r.config)
731		if err != nil {
732			return err
733		}
734	} else {
735		r.config = atc.ResourceConfig{}
736	}
737
738	if pinnedVersion.Valid {
739		var version atc.Version
740		err = json.Unmarshal([]byte(pinnedVersion.String), &version)
741		if err != nil {
742			return err
743		}
744
745		if pinnedThroughConfig.Valid && pinnedThroughConfig.Bool {
746			r.configPinnedVersion = version
747			r.apiPinnedVersion = nil
748		} else {
749			r.configPinnedVersion = nil
750			r.apiPinnedVersion = version
751		}
752	} else {
753		r.apiPinnedVersion = nil
754		r.configPinnedVersion = nil
755	}
756
757	if pinComment.Valid {
758		r.pinComment = pinComment.String
759	} else {
760		r.pinComment = ""
761	}
762
763	if checkErr.Valid {
764		r.checkSetupError = errors.New(checkErr.String)
765	} else {
766		r.checkSetupError = nil
767	}
768
769	if rcsCheckErr.Valid {
770		r.checkError = errors.New(rcsCheckErr.String)
771	} else {
772		r.checkError = nil
773	}
774
775	if rcID.Valid {
776		r.resourceConfigID, err = strconv.Atoi(rcID.String)
777		if err != nil {
778			return err
779		}
780	}
781
782	if rcScopeID.Valid {
783		r.resourceConfigScopeID, err = strconv.Atoi(rcScopeID.String)
784		if err != nil {
785			return err
786		}
787	}
788
789	return nil
790}
791
792// The SELECT query orders the jobs for updating to prevent deadlocking.
793// Updating multiple rows using a SELECT subquery does not preserve the same
794// order for the updates, which can lead to deadlocking.
795func requestScheduleForJobsUsingResource(tx Tx, resourceID int) error {
796	rows, err := psql.Select("DISTINCT job_id").
797		From("job_inputs").
798		Where(sq.Eq{
799			"resource_id": resourceID,
800		}).
801		OrderBy("job_id DESC").
802		RunWith(tx).
803		Query()
804	if err != nil {
805		return err
806	}
807
808	var jobs []int
809	for rows.Next() {
810		var jid int
811		err = rows.Scan(&jid)
812		if err != nil {
813			return err
814		}
815
816		jobs = append(jobs, jid)
817	}
818
819	for _, j := range jobs {
820		_, err := psql.Update("jobs").
821			Set("schedule_requested", sq.Expr("now()")).
822			Where(sq.Eq{
823				"id": j,
824			}).
825			RunWith(tx).
826			Exec()
827		if err != nil {
828			return err
829		}
830	}
831
832	return nil
833}
834