1package db 2 3import ( 4 "database/sql" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "strconv" 9 "time" 10 11 sq "github.com/Masterminds/squirrel" 12 "github.com/lib/pq" 13 14 "github.com/concourse/concourse/atc" 15 "github.com/concourse/concourse/atc/db/lock" 16) 17 18var ErrPinnedThroughConfig = errors.New("resource is pinned through config") 19 20//go:generate counterfeiter . Resource 21 22type Resource interface { 23 PipelineRef 24 25 ID() int 26 Name() string 27 Public() bool 28 TeamID() int 29 TeamName() string 30 Type() string 31 Source() atc.Source 32 CheckEvery() string 33 CheckTimeout() string 34 LastCheckStartTime() time.Time 35 LastCheckEndTime() time.Time 36 Tags() atc.Tags 37 CheckSetupError() error 38 CheckError() error 39 WebhookToken() string 40 Config() atc.ResourceConfig 41 ConfigPinnedVersion() atc.Version 42 APIPinnedVersion() atc.Version 43 PinComment() string 44 SetPinComment(string) error 45 ResourceConfigID() int 46 ResourceConfigScopeID() int 47 Icon() string 48 49 HasWebhook() bool 50 51 CurrentPinnedVersion() atc.Version 52 53 ResourceConfigVersionID(atc.Version) (int, bool, error) 54 Versions(page Page, versionFilter atc.Version) ([]atc.ResourceVersion, Pagination, bool, error) 55 SaveUncheckedVersion(atc.Version, ResourceConfigMetadataFields, ResourceConfig, atc.VersionedResourceTypes) (bool, error) 56 UpdateMetadata(atc.Version, ResourceConfigMetadataFields) (bool, error) 57 58 EnableVersion(rcvID int) error 59 DisableVersion(rcvID int) error 60 61 PinVersion(rcvID int) (bool, error) 62 UnpinVersion() error 63 64 SetResourceConfig(atc.Source, atc.VersionedResourceTypes) (ResourceConfigScope, error) 65 SetCheckSetupError(error) error 66 NotifyScan() error 67 68 Reload() (bool, error) 69} 70 71var resourcesQuery = psql.Select( 72 "r.id", 73 "r.name", 74 "r.type", 75 "r.config", 76 "r.check_error", 77 "rs.last_check_start_time", 78 "rs.last_check_end_time", 79 "r.pipeline_id", 80 "r.nonce", 81 "r.resource_config_id", 82 "r.resource_config_scope_id", 83 "p.name", 84 "t.id", 85 "t.name", 86 "rs.check_error", 87 "rp.version", 88 "rp.comment_text", 89 "rp.config", 90). 91 From("resources r"). 92 Join("pipelines p ON p.id = r.pipeline_id"). 93 Join("teams t ON t.id = p.team_id"). 94 LeftJoin("resource_config_scopes rs ON r.resource_config_scope_id = rs.id"). 95 LeftJoin("resource_pins rp ON rp.resource_id = r.id"). 96 Where(sq.Eq{"r.active": true}) 97 98type resource struct { 99 pipelineRef 100 101 id int 102 name string 103 teamID int 104 teamName string 105 type_ string 106 lastCheckStartTime time.Time 107 lastCheckEndTime time.Time 108 checkSetupError error 109 checkError error 110 config atc.ResourceConfig 111 configPinnedVersion atc.Version 112 apiPinnedVersion atc.Version 113 pinComment string 114 resourceConfigID int 115 resourceConfigScopeID int 116} 117 118func newEmptyResource(conn Conn, lockFactory lock.LockFactory) *resource { 119 return &resource{pipelineRef: pipelineRef{conn: conn, lockFactory: lockFactory}} 120} 121 122type ResourceNotFoundError struct { 123 ID int 124} 125 126func (e ResourceNotFoundError) Error() string { 127 return fmt.Sprintf("resource '%d' not found", e.ID) 128} 129 130type Resources []Resource 131 132func (resources Resources) Lookup(name string) (Resource, bool) { 133 for _, resource := range resources { 134 if resource.Name() == name { 135 return resource, true 136 } 137 } 138 139 return nil, false 140} 141 142func (resources Resources) Configs() atc.ResourceConfigs { 143 var configs atc.ResourceConfigs 144 for _, r := range resources { 145 configs = append(configs, r.Config()) 146 } 147 return configs 148} 149 150func (r *resource) ID() int { return r.id } 151func (r *resource) Name() string { return r.name } 152func (r *resource) Public() bool { return r.config.Public } 153func (r *resource) TeamID() int { return r.teamID } 154func (r *resource) TeamName() string { return r.teamName } 155func (r *resource) Type() string { return r.type_ } 156func (r *resource) Source() atc.Source { return r.config.Source } 157func (r *resource) CheckEvery() string { return r.config.CheckEvery } 158func (r *resource) CheckTimeout() string { return r.config.CheckTimeout } 159func (r *resource) LastCheckStartTime() time.Time { return r.lastCheckStartTime } 160func (r *resource) LastCheckEndTime() time.Time { return r.lastCheckEndTime } 161func (r *resource) Tags() atc.Tags { return r.config.Tags } 162func (r *resource) CheckSetupError() error { return r.checkSetupError } 163func (r *resource) CheckError() error { return r.checkError } 164func (r *resource) WebhookToken() string { return r.config.WebhookToken } 165func (r *resource) Config() atc.ResourceConfig { return r.config } 166func (r *resource) ConfigPinnedVersion() atc.Version { return r.configPinnedVersion } 167func (r *resource) APIPinnedVersion() atc.Version { return r.apiPinnedVersion } 168func (r *resource) PinComment() string { return r.pinComment } 169func (r *resource) ResourceConfigID() int { return r.resourceConfigID } 170func (r *resource) ResourceConfigScopeID() int { return r.resourceConfigScopeID } 171func (r *resource) Icon() string { return r.config.Icon } 172 173func (r *resource) HasWebhook() bool { return r.WebhookToken() != "" } 174 175func (r *resource) Reload() (bool, error) { 176 row := resourcesQuery.Where(sq.Eq{"r.id": r.id}). 177 RunWith(r.conn). 178 QueryRow() 179 180 err := scanResource(r, row) 181 if err != nil { 182 if err == sql.ErrNoRows { 183 return false, nil 184 } 185 return false, err 186 } 187 188 return true, nil 189} 190 191func (r *resource) SetResourceConfig(source atc.Source, resourceTypes atc.VersionedResourceTypes) (ResourceConfigScope, error) { 192 resourceConfigDescriptor, err := constructResourceConfigDescriptor(r.type_, source, resourceTypes) 193 if err != nil { 194 return nil, err 195 } 196 197 tx, err := r.conn.Begin() 198 if err != nil { 199 return nil, err 200 } 201 202 defer Rollback(tx) 203 204 resourceConfig, err := resourceConfigDescriptor.findOrCreate(tx, r.lockFactory, r.conn) 205 if err != nil { 206 return nil, err 207 } 208 209 _, err = psql.Update("resources"). 210 Set("resource_config_id", resourceConfig.ID()). 211 Where(sq.Eq{"id": r.id}). 212 Where(sq.Or{ 213 sq.Eq{"resource_config_id": nil}, 214 sq.NotEq{"resource_config_id": resourceConfig.ID()}, 215 }). 216 RunWith(tx). 217 Exec() 218 if err != nil { 219 return nil, err 220 } 221 222 resourceConfigScope, err := findOrCreateResourceConfigScope(tx, r.conn, r.lockFactory, resourceConfig, r, r.type_, resourceTypes) 223 if err != nil { 224 return nil, err 225 } 226 227 results, err := psql.Update("resources"). 228 Set("resource_config_scope_id", resourceConfigScope.ID()). 229 Where(sq.Eq{"id": r.id}). 230 Where(sq.Or{ 231 sq.Eq{"resource_config_scope_id": nil}, 232 sq.NotEq{"resource_config_scope_id": resourceConfigScope.ID()}, 233 }). 234 RunWith(tx). 235 Exec() 236 if err != nil { 237 return nil, err 238 } 239 240 rowsAffected, err := results.RowsAffected() 241 if err != nil { 242 return nil, err 243 } 244 245 if rowsAffected > 0 { 246 err = requestScheduleForJobsUsingResource(tx, r.id) 247 if err != nil { 248 return nil, err 249 } 250 } 251 252 err = tx.Commit() 253 if err != nil { 254 return nil, err 255 } 256 257 return resourceConfigScope, nil 258} 259 260func (r *resource) SetCheckSetupError(cause error) error { 261 var err error 262 263 if cause == nil { 264 _, err = psql.Update("resources"). 265 Set("check_error", nil). 266 Where(sq.And{ 267 sq.Eq{"id": r.ID()}, 268 sq.NotEq{"check_error": nil}, 269 }). 270 RunWith(r.conn). 271 Exec() 272 } else { 273 _, err = psql.Update("resources"). 274 Set("check_error", cause.Error()). 275 Where(sq.Eq{"id": r.ID()}). 276 RunWith(r.conn). 277 Exec() 278 } 279 280 return err 281} 282 283// XXX: only used for tests 284func (r *resource) SaveUncheckedVersion(version atc.Version, metadata ResourceConfigMetadataFields, resourceConfig ResourceConfig, resourceTypes atc.VersionedResourceTypes) (bool, error) { 285 tx, err := r.conn.Begin() 286 if err != nil { 287 return false, err 288 } 289 290 defer Rollback(tx) 291 292 resourceConfigScope, err := findOrCreateResourceConfigScope(tx, r.conn, r.lockFactory, resourceConfig, r, r.type_, resourceTypes) 293 if err != nil { 294 return false, err 295 } 296 297 newVersion, err := saveResourceVersion(tx, resourceConfigScope.ID(), version, metadata, nil) 298 if err != nil { 299 return false, err 300 } 301 302 return newVersion, tx.Commit() 303} 304 305func (r *resource) UpdateMetadata(version atc.Version, metadata ResourceConfigMetadataFields) (bool, error) { 306 versionJSON, err := json.Marshal(version) 307 if err != nil { 308 return false, err 309 } 310 311 metadataJSON, err := json.Marshal(metadata) 312 if err != nil { 313 return false, err 314 } 315 316 _, err = psql.Update("resource_config_versions"). 317 Set("metadata", string(metadataJSON)). 318 Where(sq.Eq{ 319 "resource_config_scope_id": r.ResourceConfigScopeID(), 320 }). 321 Where(sq.Expr( 322 "version_md5 = md5(?)", versionJSON, 323 )). 324 RunWith(r.conn). 325 Exec() 326 327 if err != nil { 328 if err == sql.ErrNoRows { 329 return false, nil 330 } 331 return false, err 332 } 333 return true, nil 334} 335 336func (r *resource) ResourceConfigVersionID(version atc.Version) (int, bool, error) { 337 requestedVersion, err := json.Marshal(version) 338 if err != nil { 339 return 0, false, err 340 } 341 342 var id int 343 344 err = psql.Select("rcv.id"). 345 From("resource_config_versions rcv"). 346 Join("resources r ON rcv.resource_config_scope_id = r.resource_config_scope_id"). 347 Where(sq.Eq{"r.id": r.ID()}). 348 Where(sq.Expr("version @> ?", requestedVersion)). 349 Where(sq.NotEq{"rcv.check_order": 0}). 350 OrderBy("rcv.check_order DESC"). 351 RunWith(r.conn). 352 QueryRow(). 353 Scan(&id) 354 355 if err != nil { 356 if err == sql.ErrNoRows { 357 return 0, false, nil 358 } 359 return 0, false, err 360 } 361 362 return id, true, nil 363} 364 365func (r *resource) SetPinComment(comment string) error { 366 _, err := psql.Update("resource_pins"). 367 Set("comment_text", comment). 368 Where(sq.Eq{"resource_id": r.ID()}). 369 RunWith(r.conn). 370 Exec() 371 372 return err 373} 374 375func (r *resource) CurrentPinnedVersion() atc.Version { 376 if r.configPinnedVersion != nil { 377 return r.configPinnedVersion 378 } else if r.apiPinnedVersion != nil { 379 return r.apiPinnedVersion 380 } 381 return nil 382} 383 384func (r *resource) Versions(page Page, versionFilter atc.Version) ([]atc.ResourceVersion, Pagination, bool, error) { 385 tx, err := r.conn.Begin() 386 if err != nil { 387 return nil, Pagination{}, false, err 388 } 389 390 defer Rollback(tx) 391 392 query := ` 393 SELECT v.id, v.version, v.metadata, v.check_order, 394 NOT EXISTS ( 395 SELECT 1 396 FROM resource_disabled_versions d 397 WHERE v.version_md5 = d.version_md5 398 AND r.resource_config_scope_id = v.resource_config_scope_id 399 AND r.id = d.resource_id 400 ) 401 FROM resource_config_versions v, resources r 402 WHERE r.id = $1 AND r.resource_config_scope_id = v.resource_config_scope_id AND v.check_order != 0 403 ` 404 405 filterJSON := "{}" 406 if len(versionFilter) != 0 { 407 filterBytes, err := json.Marshal(versionFilter) 408 if err != nil { 409 return nil, Pagination{}, false, err 410 } 411 412 filterJSON = string(filterBytes) 413 } 414 415 var rows *sql.Rows 416 if page.From != nil { 417 rows, err = tx.Query(fmt.Sprintf(` 418 SELECT sub.* 419 FROM ( 420 %s 421 AND version @> $4 422 AND v.check_order >= (SELECT check_order FROM resource_config_versions WHERE id = $2) 423 ORDER BY v.check_order ASC 424 LIMIT $3 425 ) sub 426 ORDER BY sub.check_order DESC 427 `, query), r.id, *page.From, page.Limit, filterJSON) 428 if err != nil { 429 return nil, Pagination{}, false, err 430 } 431 } else if page.To != nil { 432 rows, err = tx.Query(fmt.Sprintf(` 433 %s 434 AND version @> $4 435 AND v.check_order <= (SELECT check_order FROM resource_config_versions WHERE id = $2) 436 ORDER BY v.check_order DESC 437 LIMIT $3 438 `, query), r.id, *page.To, page.Limit, filterJSON) 439 if err != nil { 440 return nil, Pagination{}, false, err 441 } 442 } else { 443 rows, err = tx.Query(fmt.Sprintf(` 444 %s 445 AND version @> $3 446 ORDER BY v.check_order DESC 447 LIMIT $2 448 `, query), r.id, page.Limit, filterJSON) 449 if err != nil { 450 return nil, Pagination{}, false, err 451 } 452 } 453 454 defer Close(rows) 455 456 type rcvCheckOrder struct { 457 ResourceConfigVersionID int 458 CheckOrder int 459 } 460 461 rvs := make([]atc.ResourceVersion, 0) 462 checkOrderRVs := make([]rcvCheckOrder, 0) 463 for rows.Next() { 464 var ( 465 metadataBytes sql.NullString 466 versionBytes string 467 checkOrder int 468 ) 469 470 rv := atc.ResourceVersion{} 471 err := rows.Scan(&rv.ID, &versionBytes, &metadataBytes, &checkOrder, &rv.Enabled) 472 if err != nil { 473 return nil, Pagination{}, false, err 474 } 475 476 err = json.Unmarshal([]byte(versionBytes), &rv.Version) 477 if err != nil { 478 return nil, Pagination{}, false, err 479 } 480 481 if metadataBytes.Valid { 482 err = json.Unmarshal([]byte(metadataBytes.String), &rv.Metadata) 483 if err != nil { 484 return nil, Pagination{}, false, err 485 } 486 } 487 488 checkOrderRV := rcvCheckOrder{ 489 ResourceConfigVersionID: rv.ID, 490 CheckOrder: checkOrder, 491 } 492 493 rvs = append(rvs, rv) 494 checkOrderRVs = append(checkOrderRVs, checkOrderRV) 495 } 496 497 if len(rvs) == 0 { 498 return nil, Pagination{}, true, nil 499 } 500 501 newestRCVCheckOrder := checkOrderRVs[0] 502 oldestRCVCheckOrder := checkOrderRVs[len(checkOrderRVs)-1] 503 504 var pagination Pagination 505 506 var olderRCVId int 507 err = tx.QueryRow(` 508 SELECT v.id 509 FROM resource_config_versions v, resources r 510 WHERE v.check_order < $2 AND r.id = $1 AND v.resource_config_scope_id = r.resource_config_scope_id 511 ORDER BY v.check_order DESC 512 LIMIT 1 513 `, r.id, oldestRCVCheckOrder.CheckOrder).Scan(&olderRCVId) 514 if err != nil && err != sql.ErrNoRows { 515 return nil, Pagination{}, false, err 516 } else if err == nil { 517 pagination.Older = &Page{ 518 To: &olderRCVId, 519 Limit: page.Limit, 520 } 521 } 522 523 var newerRCVId int 524 err = tx.QueryRow(` 525 SELECT v.id 526 FROM resource_config_versions v, resources r 527 WHERE v.check_order > $2 AND r.id = $1 AND v.resource_config_scope_id = r.resource_config_scope_id 528 ORDER BY v.check_order ASC 529 LIMIT 1 530 `, r.id, newestRCVCheckOrder.CheckOrder).Scan(&newerRCVId) 531 if err != nil && err != sql.ErrNoRows { 532 return nil, Pagination{}, false, err 533 } else if err == nil { 534 pagination.Newer = &Page{ 535 From: &newerRCVId, 536 Limit: page.Limit, 537 } 538 } 539 540 err = tx.Commit() 541 if err != nil { 542 return nil, Pagination{}, false, nil 543 } 544 545 return rvs, pagination, true, nil 546} 547 548func (r *resource) EnableVersion(rcvID int) error { 549 return r.toggleVersion(rcvID, true) 550} 551 552func (r *resource) DisableVersion(rcvID int) error { 553 return r.toggleVersion(rcvID, false) 554} 555 556func (r *resource) PinVersion(rcvID int) (bool, error) { 557 tx, err := r.conn.Begin() 558 if err != nil { 559 return false, err 560 } 561 defer Rollback(tx) 562 var pinnedThroughConfig bool 563 err = tx.QueryRow(` 564 SELECT EXISTS ( 565 SELECT 1 566 FROM resource_pins 567 WHERE resource_id = $1 568 AND config 569 )`, r.id).Scan(&pinnedThroughConfig) 570 if err != nil { 571 return false, err 572 } 573 574 if pinnedThroughConfig { 575 return false, ErrPinnedThroughConfig 576 } 577 578 results, err := tx.Exec(` 579 INSERT INTO resource_pins(resource_id, version, comment_text, config) 580 VALUES ($1, 581 ( SELECT rcv.version 582 FROM resource_config_versions rcv 583 WHERE rcv.id = $2 ), 584 '', false) 585 ON CONFLICT (resource_id) DO UPDATE SET version=EXCLUDED.version`, r.id, rcvID) 586 if err != nil { 587 if err == sql.ErrNoRows { 588 return false, nil 589 } 590 return false, err 591 } 592 593 rowsAffected, err := results.RowsAffected() 594 if err != nil { 595 return false, err 596 } 597 598 if rowsAffected != 1 { 599 return false, nil 600 } 601 602 err = requestScheduleForJobsUsingResource(tx, r.id) 603 if err != nil { 604 return false, err 605 } 606 607 err = tx.Commit() 608 if err != nil { 609 return false, err 610 } 611 612 return true, nil 613} 614 615func (r *resource) UnpinVersion() error { 616 tx, err := r.conn.Begin() 617 if err != nil { 618 return err 619 } 620 621 defer tx.Rollback() 622 623 results, err := psql.Delete("resource_pins"). 624 Where(sq.Eq{"resource_pins.resource_id": r.id}). 625 RunWith(tx). 626 Exec() 627 if err != nil { 628 return err 629 } 630 631 rowsAffected, err := results.RowsAffected() 632 if err != nil { 633 return err 634 } 635 636 if rowsAffected != 1 { 637 return NonOneRowAffectedError{rowsAffected} 638 } 639 640 err = requestScheduleForJobsUsingResource(tx, r.id) 641 if err != nil { 642 return err 643 } 644 645 err = tx.Commit() 646 if err != nil { 647 return err 648 } 649 650 return nil 651} 652 653func (r *resource) toggleVersion(rcvID int, enable bool) error { 654 tx, err := r.conn.Begin() 655 if err != nil { 656 return err 657 } 658 659 defer Rollback(tx) 660 661 var results sql.Result 662 if enable { 663 results, err = tx.Exec(` 664 DELETE FROM resource_disabled_versions 665 WHERE resource_id = $1 666 AND version_md5 = (SELECT version_md5 FROM resource_config_versions rcv WHERE rcv.id = $2) 667 `, r.id, rcvID) 668 } else { 669 results, err = tx.Exec(` 670 INSERT INTO resource_disabled_versions (resource_id, version_md5) 671 SELECT $1, rcv.version_md5 672 FROM resource_config_versions rcv 673 WHERE rcv.id = $2 674 `, r.id, rcvID) 675 } 676 if err != nil { 677 return err 678 } 679 680 rowsAffected, err := results.RowsAffected() 681 if err != nil { 682 return err 683 } 684 685 if rowsAffected != 1 { 686 return NonOneRowAffectedError{rowsAffected} 687 } 688 689 err = requestScheduleForJobsUsingResource(tx, r.id) 690 if err != nil { 691 return err 692 } 693 694 return tx.Commit() 695} 696 697func (r *resource) NotifyScan() error { 698 return r.conn.Bus().Notify(fmt.Sprintf("resource_scan_%d", r.id)) 699} 700 701func scanResource(r *resource, row scannable) error { 702 var ( 703 configBlob sql.NullString 704 checkErr, rcsCheckErr, nonce, rcID, rcScopeID, pinnedVersion, pinComment sql.NullString 705 lastCheckStartTime, lastCheckEndTime pq.NullTime 706 pinnedThroughConfig sql.NullBool 707 ) 708 709 err := row.Scan(&r.id, &r.name, &r.type_, &configBlob, &checkErr, &lastCheckStartTime, &lastCheckEndTime, &r.pipelineID, &nonce, &rcID, &rcScopeID, &r.pipelineName, &r.teamID, &r.teamName, &rcsCheckErr, &pinnedVersion, &pinComment, &pinnedThroughConfig) 710 if err != nil { 711 return err 712 } 713 714 r.lastCheckStartTime = lastCheckStartTime.Time 715 r.lastCheckEndTime = lastCheckEndTime.Time 716 717 es := r.conn.EncryptionStrategy() 718 719 var noncense *string 720 if nonce.Valid { 721 noncense = &nonce.String 722 } 723 724 if configBlob.Valid { 725 decryptedConfig, err := es.Decrypt(configBlob.String, noncense) 726 if err != nil { 727 return err 728 } 729 730 err = json.Unmarshal(decryptedConfig, &r.config) 731 if err != nil { 732 return err 733 } 734 } else { 735 r.config = atc.ResourceConfig{} 736 } 737 738 if pinnedVersion.Valid { 739 var version atc.Version 740 err = json.Unmarshal([]byte(pinnedVersion.String), &version) 741 if err != nil { 742 return err 743 } 744 745 if pinnedThroughConfig.Valid && pinnedThroughConfig.Bool { 746 r.configPinnedVersion = version 747 r.apiPinnedVersion = nil 748 } else { 749 r.configPinnedVersion = nil 750 r.apiPinnedVersion = version 751 } 752 } else { 753 r.apiPinnedVersion = nil 754 r.configPinnedVersion = nil 755 } 756 757 if pinComment.Valid { 758 r.pinComment = pinComment.String 759 } else { 760 r.pinComment = "" 761 } 762 763 if checkErr.Valid { 764 r.checkSetupError = errors.New(checkErr.String) 765 } else { 766 r.checkSetupError = nil 767 } 768 769 if rcsCheckErr.Valid { 770 r.checkError = errors.New(rcsCheckErr.String) 771 } else { 772 r.checkError = nil 773 } 774 775 if rcID.Valid { 776 r.resourceConfigID, err = strconv.Atoi(rcID.String) 777 if err != nil { 778 return err 779 } 780 } 781 782 if rcScopeID.Valid { 783 r.resourceConfigScopeID, err = strconv.Atoi(rcScopeID.String) 784 if err != nil { 785 return err 786 } 787 } 788 789 return nil 790} 791 792// The SELECT query orders the jobs for updating to prevent deadlocking. 793// Updating multiple rows using a SELECT subquery does not preserve the same 794// order for the updates, which can lead to deadlocking. 795func requestScheduleForJobsUsingResource(tx Tx, resourceID int) error { 796 rows, err := psql.Select("DISTINCT job_id"). 797 From("job_inputs"). 798 Where(sq.Eq{ 799 "resource_id": resourceID, 800 }). 801 OrderBy("job_id DESC"). 802 RunWith(tx). 803 Query() 804 if err != nil { 805 return err 806 } 807 808 var jobs []int 809 for rows.Next() { 810 var jid int 811 err = rows.Scan(&jid) 812 if err != nil { 813 return err 814 } 815 816 jobs = append(jobs, jid) 817 } 818 819 for _, j := range jobs { 820 _, err := psql.Update("jobs"). 821 Set("schedule_requested", sq.Expr("now()")). 822 Where(sq.Eq{ 823 "id": j, 824 }). 825 RunWith(tx). 826 Exec() 827 if err != nil { 828 return err 829 } 830 } 831 832 return nil 833} 834