1package db 2 3import ( 4 "database/sql" 5 "errors" 6 "fmt" 7 "strings" 8 "time" 9 10 sq "github.com/Masterminds/squirrel" 11 "github.com/concourse/concourse/atc" 12 "github.com/lib/pq" 13 uuid "github.com/nu7hatch/gouuid" 14) 15 16var ( 17 ErrWorkerNotPresent = errors.New("worker not present in db") 18 ErrCannotPruneRunningWorker = errors.New("worker not stalled for pruning") 19) 20 21type ContainerOwnerDisappearedError struct { 22 owner ContainerOwner 23} 24 25func (e ContainerOwnerDisappearedError) Error() string { 26 return fmt.Sprintf("container owner %T disappeared", e.owner) 27} 28 29type WorkerState string 30 31const ( 32 WorkerStateRunning = WorkerState("running") 33 WorkerStateStalled = WorkerState("stalled") 34 WorkerStateLanding = WorkerState("landing") 35 WorkerStateLanded = WorkerState("landed") 36 WorkerStateRetiring = WorkerState("retiring") 37) 38 39func AllWorkerStates() []WorkerState { 40 return []WorkerState{ 41 WorkerStateRunning, 42 WorkerStateStalled, 43 WorkerStateLanding, 44 WorkerStateLanded, 45 WorkerStateRetiring, 46 } 47} 48 49//go:generate counterfeiter . Worker 50 51type Worker interface { 52 Name() string 53 Version() *string 54 State() WorkerState 55 GardenAddr() *string 56 BaggageclaimURL() *string 57 CertsPath() *string 58 ResourceCerts() (*UsedWorkerResourceCerts, bool, error) 59 HTTPProxyURL() string 60 HTTPSProxyURL() string 61 NoProxy() string 62 ActiveContainers() int 63 ActiveVolumes() int 64 ResourceTypes() []atc.WorkerResourceType 65 Platform() string 66 Tags() []string 67 TeamID() int 68 TeamName() string 69 StartTime() time.Time 70 ExpiresAt() time.Time 71 Ephemeral() bool 72 73 Reload() (bool, error) 74 75 Land() error 76 Retire() error 77 Prune() error 78 Delete() error 79 80 ActiveTasks() (int, error) 81 IncreaseActiveTasks() error 82 DecreaseActiveTasks() error 83 84 FindContainer(owner ContainerOwner) (CreatingContainer, CreatedContainer, error) 85 CreateContainer(owner ContainerOwner, meta ContainerMetadata) (CreatingContainer, error) 86} 87 88type worker struct { 89 conn Conn 90 91 name string 92 version *string 93 state WorkerState 94 gardenAddr *string 95 baggageclaimURL *string 96 httpProxyURL string 97 httpsProxyURL string 98 noProxy string 99 activeContainers int 100 activeVolumes int 101 activeTasks int 102 resourceTypes []atc.WorkerResourceType 103 platform string 104 tags []string 105 teamID int 106 teamName string 107 startTime time.Time 108 expiresAt time.Time 109 certsPath *string 110 ephemeral bool 111} 112 113func (worker *worker) Name() string { return worker.name } 114func (worker *worker) Version() *string { return worker.version } 115func (worker *worker) State() WorkerState { return worker.state } 116func (worker *worker) GardenAddr() *string { return worker.gardenAddr } 117func (worker *worker) CertsPath() *string { return worker.certsPath } 118func (worker *worker) BaggageclaimURL() *string { return worker.baggageclaimURL } 119 120func (worker *worker) HTTPProxyURL() string { return worker.httpProxyURL } 121func (worker *worker) HTTPSProxyURL() string { return worker.httpsProxyURL } 122func (worker *worker) NoProxy() string { return worker.noProxy } 123func (worker *worker) ActiveContainers() int { return worker.activeContainers } 124func (worker *worker) ActiveVolumes() int { return worker.activeVolumes } 125func (worker *worker) ResourceTypes() []atc.WorkerResourceType { return worker.resourceTypes } 126func (worker *worker) Platform() string { return worker.platform } 127func (worker *worker) Tags() []string { return worker.tags } 128func (worker *worker) TeamID() int { return worker.teamID } 129func (worker *worker) TeamName() string { return worker.teamName } 130func (worker *worker) Ephemeral() bool { return worker.ephemeral } 131 132func (worker *worker) StartTime() time.Time { return worker.startTime } 133func (worker *worker) ExpiresAt() time.Time { return worker.expiresAt } 134 135func (worker *worker) Reload() (bool, error) { 136 row := workersQuery.Where(sq.Eq{"w.name": worker.name}). 137 RunWith(worker.conn). 138 QueryRow() 139 140 err := scanWorker(worker, row) 141 if err != nil { 142 if err == sql.ErrNoRows { 143 return false, nil 144 } 145 return false, err 146 } 147 148 return true, nil 149} 150 151func (worker *worker) Land() error { 152 cSQL, _, err := sq.Case("state"). 153 When("'landed'::worker_state", "'landed'::worker_state"). 154 Else("'landing'::worker_state"). 155 ToSql() 156 if err != nil { 157 return err 158 } 159 160 result, err := psql.Update("workers"). 161 Set("state", sq.Expr("("+cSQL+")")). 162 Where(sq.Eq{"name": worker.name}). 163 RunWith(worker.conn). 164 Exec() 165 166 if err != nil { 167 return err 168 } 169 170 count, err := result.RowsAffected() 171 if err != nil { 172 return err 173 } 174 175 if count == 0 { 176 return ErrWorkerNotPresent 177 } 178 179 return nil 180} 181 182func (worker *worker) Retire() error { 183 result, err := psql.Update("workers"). 184 SetMap(map[string]interface{}{ 185 "state": string(WorkerStateRetiring), 186 }). 187 Where(sq.Eq{"name": worker.name}). 188 RunWith(worker.conn). 189 Exec() 190 if err != nil { 191 return err 192 } 193 194 count, err := result.RowsAffected() 195 if err != nil { 196 return err 197 } 198 199 if count == 0 { 200 return ErrWorkerNotPresent 201 } 202 203 return nil 204} 205 206func (worker *worker) Prune() error { 207 tx, err := worker.conn.Begin() 208 if err != nil { 209 return err 210 } 211 212 defer Rollback(tx) 213 214 rows, err := sq.Delete("workers"). 215 Where(sq.Eq{ 216 "name": worker.name, 217 }). 218 Where(sq.NotEq{ 219 "state": string(WorkerStateRunning), 220 }). 221 PlaceholderFormat(sq.Dollar). 222 RunWith(tx). 223 Exec() 224 225 if err != nil { 226 return err 227 } 228 229 affected, err := rows.RowsAffected() 230 if err != nil { 231 return err 232 } 233 234 if affected == 0 { 235 //check whether the worker exists in the database at all 236 var one int 237 err := psql.Select("1").From("workers").Where(sq.Eq{"name": worker.name}). 238 RunWith(tx). 239 QueryRow(). 240 Scan(&one) 241 if err != nil { 242 if err == sql.ErrNoRows { 243 return ErrWorkerNotPresent 244 } 245 return err 246 } 247 248 return ErrCannotPruneRunningWorker 249 } 250 251 return tx.Commit() 252} 253 254func (worker *worker) Delete() error { 255 _, err := sq.Delete("workers"). 256 Where(sq.Eq{ 257 "name": worker.name, 258 }). 259 PlaceholderFormat(sq.Dollar). 260 RunWith(worker.conn). 261 Exec() 262 263 return err 264} 265 266func (worker *worker) ResourceCerts() (*UsedWorkerResourceCerts, bool, error) { 267 if worker.certsPath != nil { 268 wrc := &WorkerResourceCerts{ 269 WorkerName: worker.name, 270 CertsPath: *worker.certsPath, 271 } 272 273 return wrc.Find(worker.conn) 274 } 275 276 return nil, false, nil 277} 278 279func (worker *worker) FindContainer(owner ContainerOwner) (CreatingContainer, CreatedContainer, error) { 280 ownerQuery, found, err := owner.Find(worker.conn) 281 if err != nil { 282 return nil, nil, err 283 } 284 285 if !found { 286 return nil, nil, nil 287 } 288 289 return worker.findContainer(sq.And{ 290 sq.Eq{"worker_name": worker.name}, 291 ownerQuery, 292 }) 293} 294 295func (worker *worker) CreateContainer(owner ContainerOwner, meta ContainerMetadata) (CreatingContainer, error) { 296 handle, err := uuid.NewV4() 297 if err != nil { 298 return nil, err 299 } 300 301 var containerID int 302 cols := []interface{}{&containerID} 303 304 metadata := &ContainerMetadata{} 305 cols = append(cols, metadata.ScanTargets()...) 306 307 tx, err := worker.conn.Begin() 308 if err != nil { 309 return nil, err 310 } 311 312 defer Rollback(tx) 313 314 insMap := meta.SQLMap() 315 insMap["worker_name"] = worker.name 316 insMap["handle"] = handle.String() 317 318 ownerCols, err := owner.Create(tx, worker.name) 319 if err != nil { 320 return nil, err 321 } 322 323 for k, v := range ownerCols { 324 insMap[k] = v 325 } 326 327 err = psql.Insert("containers"). 328 SetMap(insMap). 329 Suffix("RETURNING id, " + strings.Join(containerMetadataColumns, ", ")). 330 RunWith(tx). 331 QueryRow(). 332 Scan(cols...) 333 if err != nil { 334 if pqErr, ok := err.(*pq.Error); ok && pqErr.Code.Name() == pqFKeyViolationErrCode { 335 return nil, ContainerOwnerDisappearedError{owner} 336 } 337 338 return nil, err 339 } 340 341 err = tx.Commit() 342 if err != nil { 343 return nil, err 344 } 345 346 return newCreatingContainer( 347 containerID, 348 handle.String(), 349 worker.name, 350 *metadata, 351 worker.conn, 352 ), nil 353} 354 355func (worker *worker) findContainer(whereClause sq.Sqlizer) (CreatingContainer, CreatedContainer, error) { 356 creating, created, destroying, _, err := scanContainer( 357 selectContainers(). 358 Where(whereClause). 359 RunWith(worker.conn). 360 QueryRow(), 361 worker.conn, 362 ) 363 if err != nil { 364 if err == sql.ErrNoRows { 365 return nil, nil, nil 366 } 367 return nil, nil, err 368 } 369 370 if destroying != nil { 371 return nil, nil, nil 372 } 373 374 return creating, created, nil 375} 376 377func (worker *worker) ActiveTasks() (int, error) { 378 err := psql.Select("active_tasks").From("workers").Where(sq.Eq{"name": worker.name}). 379 RunWith(worker.conn). 380 QueryRow(). 381 Scan(&worker.activeTasks) 382 if err != nil { 383 return 0, err 384 } 385 return worker.activeTasks, nil 386} 387 388func (worker *worker) IncreaseActiveTasks() error { 389 result, err := psql.Update("workers"). 390 Set("active_tasks", sq.Expr("active_tasks+1")). 391 Where(sq.Eq{"name": worker.name}). 392 RunWith(worker.conn). 393 Exec() 394 if err != nil { 395 return err 396 } 397 398 count, err := result.RowsAffected() 399 if err != nil { 400 return err 401 } 402 403 if count == 0 { 404 return ErrWorkerNotPresent 405 } 406 407 return nil 408} 409 410func (worker *worker) DecreaseActiveTasks() error { 411 result, err := psql.Update("workers"). 412 Set("active_tasks", sq.Expr("active_tasks-1")). 413 Where(sq.Eq{"name": worker.name}). 414 RunWith(worker.conn). 415 Exec() 416 if err != nil { 417 return err 418 } 419 420 count, err := result.RowsAffected() 421 if err != nil { 422 return err 423 } 424 425 if count == 0 { 426 return ErrWorkerNotPresent 427 } 428 429 return nil 430} 431