1package db 2 3import ( 4 "database/sql" 5 "encoding/json" 6 "time" 7 8 "code.cloudfoundry.org/lager" 9 sq "github.com/Masterminds/squirrel" 10 "github.com/concourse/concourse/atc" 11 "github.com/concourse/concourse/atc/db/lock" 12) 13 14//go:generate counterfeiter . ResourceConfigScope 15 16// ResourceConfigScope represents the relationship between a possible pipeline resource and a resource config. 17// When a resource is specified to have a unique version history either through its base resource type or its custom 18// resource type, it results in its generated resource config to be scoped to the resource. This relationship is 19// translated into its row in the resource config scopes table to have both the resource id and resource config id 20// populated. When a resource has a shared version history, its resource config is not scoped to the (or any) resource 21// and its row in the resource config scopes table will have the resource config id populated but a NULL value for 22// the resource id. Resource versions will therefore be directly dependent on a resource config scope. 23type ResourceConfigScope interface { 24 ID() int 25 Resource() Resource 26 ResourceConfig() ResourceConfig 27 CheckError() error 28 29 SaveVersions(SpanContext, []atc.Version) error 30 FindVersion(atc.Version) (ResourceConfigVersion, bool, error) 31 LatestVersion() (ResourceConfigVersion, bool, error) 32 33 SetCheckError(error) error 34 35 AcquireResourceCheckingLock( 36 logger lager.Logger, 37 ) (lock.Lock, bool, error) 38 39 UpdateLastCheckStartTime( 40 interval time.Duration, 41 immediate bool, 42 ) (bool, error) 43 44 UpdateLastCheckEndTime() (bool, error) 45} 46 47type resourceConfigScope struct { 48 id int 49 resource Resource 50 resourceConfig ResourceConfig 51 checkError error 52 53 conn Conn 54 lockFactory lock.LockFactory 55} 56 57func (r *resourceConfigScope) ID() int { return r.id } 58func (r *resourceConfigScope) Resource() Resource { return r.resource } 59func (r *resourceConfigScope) ResourceConfig() ResourceConfig { return r.resourceConfig } 60func (r *resourceConfigScope) CheckError() error { return r.checkError } 61 62// SaveVersions stores a list of version in the db for a resource config 63// Each version will also have its check order field updated and the 64// Cache index for pipelines using the resource config will be bumped. 65// 66// In the case of a check resource from an older version, the versions 67// that already exist in the DB will be re-ordered using 68// incrementCheckOrder to input the correct check order 69func (r *resourceConfigScope) SaveVersions(spanContext SpanContext, versions []atc.Version) error { 70 return saveVersions(r.conn, r.ID(), versions, spanContext) 71} 72 73func saveVersions(conn Conn, rcsID int, versions []atc.Version, spanContext SpanContext) error { 74 tx, err := conn.Begin() 75 if err != nil { 76 return err 77 } 78 79 defer Rollback(tx) 80 81 var containsNewVersion bool 82 for _, version := range versions { 83 newVersion, err := saveResourceVersion(tx, rcsID, version, nil, spanContext) 84 if err != nil { 85 return err 86 } 87 88 containsNewVersion = containsNewVersion || newVersion 89 } 90 91 if containsNewVersion { 92 // bump the check order of all the versions returned by the check if there 93 // is at least one new version within the set of returned versions 94 for _, version := range versions { 95 versionJSON, err := json.Marshal(version) 96 if err != nil { 97 return err 98 } 99 100 err = incrementCheckOrder(tx, rcsID, string(versionJSON)) 101 if err != nil { 102 return err 103 } 104 } 105 106 err = requestScheduleForJobsUsingResourceConfigScope(tx, rcsID) 107 if err != nil { 108 return err 109 } 110 } 111 112 err = tx.Commit() 113 if err != nil { 114 return err 115 } 116 117 return nil 118} 119 120func (r *resourceConfigScope) FindVersion(v atc.Version) (ResourceConfigVersion, bool, error) { 121 rcv := &resourceConfigVersion{ 122 resourceConfigScope: r, 123 conn: r.conn, 124 } 125 126 versionByte, err := json.Marshal(v) 127 if err != nil { 128 return nil, false, err 129 } 130 131 row := resourceConfigVersionQuery. 132 Where(sq.Eq{ 133 "v.resource_config_scope_id": r.id, 134 }). 135 Where(sq.Expr("v.version_md5 = md5(?)", versionByte)). 136 RunWith(r.conn). 137 QueryRow() 138 139 err = scanResourceConfigVersion(rcv, row) 140 if err != nil { 141 if err == sql.ErrNoRows { 142 return nil, false, nil 143 } 144 return nil, false, err 145 } 146 147 return rcv, true, nil 148} 149 150func (r *resourceConfigScope) LatestVersion() (ResourceConfigVersion, bool, error) { 151 rcv := &resourceConfigVersion{ 152 conn: r.conn, 153 resourceConfigScope: r, 154 } 155 156 row := resourceConfigVersionQuery. 157 Where(sq.Eq{"v.resource_config_scope_id": r.id}). 158 OrderBy("v.check_order DESC"). 159 Limit(1). 160 RunWith(r.conn). 161 QueryRow() 162 163 err := scanResourceConfigVersion(rcv, row) 164 if err != nil { 165 if err == sql.ErrNoRows { 166 return nil, false, nil 167 } 168 return nil, false, err 169 } 170 171 return rcv, true, nil 172} 173 174func (r *resourceConfigScope) SetCheckError(cause error) error { 175 var err error 176 177 if cause == nil { 178 _, err = psql.Update("resource_config_scopes"). 179 Set("check_error", nil). 180 Where(sq.Eq{"id": r.id}). 181 RunWith(r.conn). 182 Exec() 183 } else { 184 _, err = psql.Update("resource_config_scopes"). 185 Set("check_error", cause.Error()). 186 Where(sq.Eq{"id": r.id}). 187 RunWith(r.conn). 188 Exec() 189 } 190 191 return err 192} 193 194func (r *resourceConfigScope) AcquireResourceCheckingLock( 195 logger lager.Logger, 196) (lock.Lock, bool, error) { 197 return r.lockFactory.Acquire( 198 logger, 199 lock.NewResourceConfigCheckingLockID(r.resourceConfig.ID()), 200 ) 201} 202 203func (r *resourceConfigScope) UpdateLastCheckStartTime( 204 interval time.Duration, 205 immediate bool, 206) (bool, error) { 207 tx, err := r.conn.Begin() 208 if err != nil { 209 return false, err 210 } 211 212 defer Rollback(tx) 213 214 params := []interface{}{r.id} 215 216 condition := "" 217 if !immediate { 218 condition = "AND now() - last_check_start_time > ($2 || ' SECONDS')::INTERVAL" 219 params = append(params, interval.Seconds()) 220 } 221 222 updated, err := checkIfRowsUpdated(tx, ` 223 UPDATE resource_config_scopes 224 SET last_check_start_time = now() 225 WHERE id = $1 226 `+condition, params...) 227 if err != nil { 228 return false, err 229 } 230 231 if !updated { 232 return false, nil 233 } 234 235 err = tx.Commit() 236 if err != nil { 237 return false, err 238 } 239 240 return true, nil 241} 242 243func (r *resourceConfigScope) UpdateLastCheckEndTime() (bool, error) { 244 tx, err := r.conn.Begin() 245 if err != nil { 246 return false, err 247 } 248 249 defer Rollback(tx) 250 251 updated, err := checkIfRowsUpdated(tx, ` 252 UPDATE resource_config_scopes 253 SET last_check_end_time = now() 254 WHERE id = $1 255 `, r.id) 256 if err != nil { 257 return false, err 258 } 259 260 if !updated { 261 return false, nil 262 } 263 264 err = tx.Commit() 265 if err != nil { 266 return false, err 267 } 268 269 return true, nil 270} 271 272func saveResourceVersion(tx Tx, rcsID int, version atc.Version, metadata ResourceConfigMetadataFields, spanContext SpanContext) (bool, error) { 273 versionJSON, err := json.Marshal(version) 274 if err != nil { 275 return false, err 276 } 277 278 metadataJSON, err := json.Marshal(metadata) 279 if err != nil { 280 return false, err 281 } 282 283 spanContextJSON, err := json.Marshal(spanContext) 284 if err != nil { 285 return false, err 286 } 287 288 var checkOrder int 289 err = tx.QueryRow(` 290 INSERT INTO resource_config_versions (resource_config_scope_id, version, version_md5, metadata, span_context) 291 SELECT $1, $2, md5($3), $4, $5 292 ON CONFLICT (resource_config_scope_id, version_md5) 293 DO UPDATE SET metadata = COALESCE(NULLIF(excluded.metadata, 'null'::jsonb), resource_config_versions.metadata) 294 RETURNING check_order 295 `, rcsID, string(versionJSON), string(versionJSON), string(metadataJSON), string(spanContextJSON)).Scan(&checkOrder) 296 if err != nil { 297 return false, err 298 } 299 300 return checkOrder == 0, nil 301} 302 303// increment the check order if the version's check order is less than the 304// current max. This will fix the case of a check from an old version causing 305// the desired order to change; existing versions will be re-ordered since 306// we add them in the desired order. 307func incrementCheckOrder(tx Tx, rcsID int, version string) error { 308 _, err := tx.Exec(` 309 WITH max_checkorder AS ( 310 SELECT max(check_order) co 311 FROM resource_config_versions 312 WHERE resource_config_scope_id = $1 313 ) 314 315 UPDATE resource_config_versions 316 SET check_order = mc.co + 1 317 FROM max_checkorder mc 318 WHERE resource_config_scope_id = $1 319 AND version_md5 = md5($2) 320 AND check_order <= mc.co;`, rcsID, version) 321 return err 322} 323 324// The SELECT query orders the jobs for updating to prevent deadlocking. 325// Updating multiple rows using a SELECT subquery does not preserve the same 326// order for the updates, which can lead to deadlocking. 327func requestScheduleForJobsUsingResourceConfigScope(tx Tx, rcsID int) error { 328 rows, err := psql.Select("DISTINCT j.job_id"). 329 From("job_inputs j"). 330 Join("resources r ON r.id = j.resource_id"). 331 Where(sq.Eq{ 332 "r.resource_config_scope_id": rcsID, 333 "j.passed_job_id": nil, 334 }). 335 OrderBy("j.job_id DESC"). 336 RunWith(tx). 337 Query() 338 if err != nil { 339 return err 340 } 341 342 var jobIDs []int 343 for rows.Next() { 344 var id int 345 err = rows.Scan(&id) 346 if err != nil { 347 return err 348 } 349 350 jobIDs = append(jobIDs, id) 351 } 352 353 for _, jID := range jobIDs { 354 _, err := psql.Update("jobs"). 355 Set("schedule_requested", sq.Expr("now()")). 356 Where(sq.Eq{ 357 "id": jID, 358 }). 359 RunWith(tx). 360 Exec() 361 if err != nil { 362 return err 363 } 364 } 365 366 return nil 367} 368