1package worker 2 3import ( 4 "errors" 5 "fmt" 6 "time" 7 8 "code.cloudfoundry.org/clock" 9 "code.cloudfoundry.org/lager" 10 "github.com/concourse/baggageclaim" 11 "github.com/concourse/concourse/atc/db" 12 "github.com/concourse/concourse/atc/db/lock" 13 "github.com/concourse/concourse/atc/metric" 14) 15 16const creatingVolumeRetryDelay = 1 * time.Second 17 18//go:generate counterfeiter . VolumeClient 19 20type VolumeClient interface { 21 FindOrCreateVolumeForContainer( 22 lager.Logger, 23 VolumeSpec, 24 db.CreatingContainer, 25 int, 26 string, 27 ) (Volume, error) 28 FindOrCreateCOWVolumeForContainer( 29 lager.Logger, 30 VolumeSpec, 31 db.CreatingContainer, 32 Volume, 33 int, 34 string, 35 ) (Volume, error) 36 FindOrCreateVolumeForBaseResourceType( 37 lager.Logger, 38 VolumeSpec, 39 int, 40 string, 41 ) (Volume, error) 42 CreateVolume( 43 lager.Logger, 44 VolumeSpec, 45 int, 46 string, 47 db.VolumeType, 48 ) (Volume, error) 49 FindVolumeForResourceCache( 50 lager.Logger, 51 db.UsedResourceCache, 52 ) (Volume, bool, error) 53 FindVolumeForTaskCache( 54 logger lager.Logger, 55 teamID int, 56 jobID int, 57 stepName string, 58 path string, 59 ) (Volume, bool, error) 60 CreateVolumeForTaskCache( 61 logger lager.Logger, 62 volumeSpec VolumeSpec, 63 teamID int, 64 jobID int, 65 stepName string, 66 path string, 67 ) (Volume, error) 68 FindOrCreateVolumeForResourceCerts( 69 logger lager.Logger, 70 ) (volume Volume, found bool, err error) 71 72 LookupVolume(lager.Logger, string) (Volume, bool, error) 73} 74 75type VolumeSpec struct { 76 Strategy baggageclaim.Strategy 77 Properties VolumeProperties 78 Privileged bool 79 TTL time.Duration 80} 81 82func (spec VolumeSpec) baggageclaimVolumeSpec() baggageclaim.VolumeSpec { 83 return baggageclaim.VolumeSpec{ 84 Strategy: spec.Strategy, 85 Privileged: spec.Privileged, 86 Properties: baggageclaim.VolumeProperties(spec.Properties), 87 } 88} 89 90type VolumeProperties map[string]string 91 92type ErrCreatedVolumeNotFound struct { 93 Handle string 94 WorkerName string 95} 96 97func (e ErrCreatedVolumeNotFound) Error() string { 98 return fmt.Sprintf("volume '%s' disappeared from worker '%s'", e.Handle, e.WorkerName) 99} 100 101var ErrBaseResourceTypeNotFound = errors.New("base resource type not found") 102 103type volumeClient struct { 104 baggageclaimClient baggageclaim.Client 105 lockFactory lock.LockFactory 106 dbVolumeRepository db.VolumeRepository 107 dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory 108 dbTaskCacheFactory db.TaskCacheFactory 109 dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory 110 clock clock.Clock 111 dbWorker db.Worker 112} 113 114func NewVolumeClient( 115 baggageclaimClient baggageclaim.Client, 116 dbWorker db.Worker, 117 clock clock.Clock, 118 119 lockFactory lock.LockFactory, 120 dbVolumeRepository db.VolumeRepository, 121 dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory, 122 dbTaskCacheFactory db.TaskCacheFactory, 123 dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory, 124) VolumeClient { 125 return &volumeClient{ 126 baggageclaimClient: baggageclaimClient, 127 lockFactory: lockFactory, 128 dbVolumeRepository: dbVolumeRepository, 129 dbWorkerBaseResourceTypeFactory: dbWorkerBaseResourceTypeFactory, 130 dbTaskCacheFactory: dbTaskCacheFactory, 131 dbWorkerTaskCacheFactory: dbWorkerTaskCacheFactory, 132 clock: clock, 133 dbWorker: dbWorker, 134 } 135} 136 137func (c *volumeClient) FindOrCreateVolumeForContainer( 138 logger lager.Logger, 139 volumeSpec VolumeSpec, 140 container db.CreatingContainer, 141 teamID int, 142 mountPath string, 143) (Volume, error) { 144 return c.findOrCreateVolume( 145 logger.Session("find-or-create-volume-for-container"), 146 volumeSpec, 147 func() (db.CreatingVolume, db.CreatedVolume, error) { 148 return c.dbVolumeRepository.FindContainerVolume(teamID, c.dbWorker.Name(), container, mountPath) 149 }, 150 func() (db.CreatingVolume, error) { 151 return c.dbVolumeRepository.CreateContainerVolume(teamID, c.dbWorker.Name(), container, mountPath) 152 }, 153 ) 154} 155 156func (c *volumeClient) FindOrCreateCOWVolumeForContainer( 157 logger lager.Logger, 158 volumeSpec VolumeSpec, 159 container db.CreatingContainer, 160 parent Volume, 161 teamID int, 162 mountPath string, 163) (Volume, error) { 164 return c.findOrCreateVolume( 165 logger.Session("find-or-create-cow-volume-for-container"), 166 volumeSpec, 167 func() (db.CreatingVolume, db.CreatedVolume, error) { 168 return c.dbVolumeRepository.FindContainerVolume(teamID, c.dbWorker.Name(), container, mountPath) 169 }, 170 func() (db.CreatingVolume, error) { 171 return parent.CreateChildForContainer(container, mountPath) 172 }, 173 ) 174} 175 176func (c *volumeClient) CreateVolume( 177 logger lager.Logger, 178 volumeSpec VolumeSpec, 179 teamID int, 180 workerName string, 181 volumeType db.VolumeType, 182) (Volume, error) { 183 return c.findOrCreateVolume( 184 logger.Session("find-or-create-volume-for-artifact"), 185 volumeSpec, 186 func() (db.CreatingVolume, db.CreatedVolume, error) { 187 return nil, nil, nil 188 }, 189 func() (db.CreatingVolume, error) { 190 return c.dbVolumeRepository.CreateVolume(teamID, workerName, volumeType) 191 }, 192 ) 193} 194 195func (c *volumeClient) FindOrCreateVolumeForBaseResourceType( 196 logger lager.Logger, 197 volumeSpec VolumeSpec, 198 teamID int, 199 resourceTypeName string, 200) (Volume, error) { 201 workerBaseResourceType, found, err := c.dbWorkerBaseResourceTypeFactory.Find(resourceTypeName, c.dbWorker) 202 if err != nil { 203 return nil, err 204 } 205 206 if !found { 207 logger.Error("base-resource-type-not-found", ErrBaseResourceTypeNotFound, lager.Data{"resource-type-name": resourceTypeName}) 208 return nil, ErrBaseResourceTypeNotFound 209 } 210 211 return c.findOrCreateVolume( 212 logger.Session("find-or-create-volume-for-base-resource-type"), 213 volumeSpec, 214 func() (db.CreatingVolume, db.CreatedVolume, error) { 215 return c.dbVolumeRepository.FindBaseResourceTypeVolume(workerBaseResourceType) 216 }, 217 func() (db.CreatingVolume, error) { 218 return c.dbVolumeRepository.CreateBaseResourceTypeVolume(workerBaseResourceType) 219 }, 220 ) 221} 222 223func (c *volumeClient) FindVolumeForResourceCache( 224 logger lager.Logger, 225 usedResourceCache db.UsedResourceCache, 226) (Volume, bool, error) { 227 dbVolume, found, err := c.dbVolumeRepository.FindResourceCacheVolume(c.dbWorker.Name(), usedResourceCache) 228 if err != nil { 229 logger.Error("failed-to-lookup-resource-cache-volume-in-db", err) 230 return nil, false, err 231 } 232 233 if !found { 234 return nil, false, nil 235 } 236 237 bcVolume, found, err := c.baggageclaimClient.LookupVolume(logger, dbVolume.Handle()) 238 if err != nil { 239 logger.Error("failed-to-lookup-volume-in-bc", err) 240 return nil, false, err 241 } 242 243 if !found { 244 return nil, false, nil 245 } 246 247 return NewVolume(bcVolume, dbVolume, c), true, nil 248} 249 250func (c *volumeClient) CreateVolumeForTaskCache( 251 logger lager.Logger, 252 volumeSpec VolumeSpec, 253 teamID int, 254 jobID int, 255 stepName string, 256 path string, 257) (Volume, error) { 258 usedTaskCache, err := c.dbTaskCacheFactory.FindOrCreate(jobID, stepName, path) 259 if err != nil { 260 logger.Error("failed-to-find-or-create-task-cache-in-db", err) 261 return nil, err 262 } 263 264 workerTaskCache := db.WorkerTaskCache{ 265 WorkerName: c.dbWorker.Name(), 266 TaskCache: usedTaskCache, 267 } 268 269 usedWorkerTaskCache, err := c.dbWorkerTaskCacheFactory.FindOrCreate(workerTaskCache) 270 271 return c.findOrCreateVolume( 272 logger.Session("find-or-create-volume-for-container"), 273 volumeSpec, 274 func() (db.CreatingVolume, db.CreatedVolume, error) { 275 return nil, nil, nil 276 }, 277 func() (db.CreatingVolume, error) { 278 return c.dbVolumeRepository.CreateTaskCacheVolume(teamID, usedWorkerTaskCache) 279 }, 280 ) 281} 282 283func (c *volumeClient) FindOrCreateVolumeForResourceCerts(logger lager.Logger) (Volume, bool, error) { 284 285 logger.Debug("finding-worker-resource-certs") 286 usedResourceCerts, found, err := c.dbWorker.ResourceCerts() 287 if err != nil { 288 logger.Error("failed-to-find-worker-resource-certs", err) 289 return nil, false, err 290 } 291 292 if !found { 293 logger.Debug("worker-resource-certs-not-found") 294 return nil, false, nil 295 } 296 297 certsPath := c.dbWorker.CertsPath() 298 if certsPath == nil { 299 logger.Debug("worker-certs-path-is-empty") 300 return nil, false, nil 301 } 302 303 volume, err := c.findOrCreateVolume( 304 logger.Session("find-or-create-volume-for-resource-certs"), 305 VolumeSpec{ 306 Strategy: baggageclaim.ImportStrategy{ 307 Path: *certsPath, 308 FollowSymlinks: true, 309 }, 310 }, 311 func() (db.CreatingVolume, db.CreatedVolume, error) { 312 return c.dbVolumeRepository.FindResourceCertsVolume(c.dbWorker.Name(), usedResourceCerts) 313 }, 314 func() (db.CreatingVolume, error) { 315 return c.dbVolumeRepository.CreateResourceCertsVolume(c.dbWorker.Name(), usedResourceCerts) 316 }, 317 ) 318 319 return volume, true, err 320} 321 322func (c *volumeClient) FindVolumeForTaskCache( 323 logger lager.Logger, 324 teamID int, 325 jobID int, 326 stepName string, 327 path string, 328) (Volume, bool, error) { 329 usedTaskCache, found, err := c.dbTaskCacheFactory.Find(jobID, stepName, path) 330 if err != nil { 331 logger.Error("failed-to-lookup-task-cache-in-db", err) 332 return nil, false, err 333 } 334 335 if !found { 336 return nil, false, nil 337 } 338 339 dbVolume, found, err := c.dbVolumeRepository.FindTaskCacheVolume(teamID, c.dbWorker.Name(), usedTaskCache) 340 if err != nil { 341 logger.Error("failed-to-lookup-task-cache-volume-in-db", err) 342 return nil, false, err 343 } 344 345 if !found { 346 return nil, false, nil 347 } 348 349 bcVolume, found, err := c.baggageclaimClient.LookupVolume(logger, dbVolume.Handle()) 350 if err != nil { 351 logger.Error("failed-to-lookup-volume-in-bc", err) 352 return nil, false, err 353 } 354 355 if !found { 356 return nil, false, nil 357 } 358 359 return NewVolume(bcVolume, dbVolume, c), true, nil 360} 361 362func (c *volumeClient) LookupVolume(logger lager.Logger, handle string) (Volume, bool, error) { 363 dbVolume, found, err := c.dbVolumeRepository.FindCreatedVolume(handle) 364 if err != nil { 365 logger.Error("failed-to-lookup-volume-in-db", err) 366 return nil, false, err 367 } 368 369 if !found { 370 return nil, false, nil 371 } 372 373 bcVolume, found, err := c.baggageclaimClient.LookupVolume(logger, handle) 374 if err != nil { 375 logger.Error("failed-to-lookup-volume-in-bc", err) 376 return nil, false, err 377 } 378 379 if !found { 380 return nil, false, nil 381 } 382 383 return NewVolume(bcVolume, dbVolume, c), true, nil 384} 385 386func (c *volumeClient) findOrCreateVolume( 387 logger lager.Logger, 388 volumeSpec VolumeSpec, 389 findVolumeFunc func() (db.CreatingVolume, db.CreatedVolume, error), 390 createVolumeFunc func() (db.CreatingVolume, error), 391) (Volume, error) { 392 creatingVolume, createdVolume, err := findVolumeFunc() 393 if err != nil { 394 logger.Error("failed-to-find-volume-in-db", err) 395 return nil, err 396 } 397 398 if createdVolume != nil { 399 logger = logger.WithData(lager.Data{"volume": createdVolume.Handle()}) 400 401 bcVolume, bcVolumeFound, err := c.baggageclaimClient.LookupVolume( 402 logger.Session("lookup-volume"), 403 createdVolume.Handle(), 404 ) 405 if err != nil { 406 logger.Error("failed-to-lookup-volume-in-baggageclaim", err) 407 return nil, err 408 } 409 410 if !bcVolumeFound { 411 logger.Info("created-volume-not-found") 412 return nil, ErrCreatedVolumeNotFound{Handle: createdVolume.Handle(), WorkerName: createdVolume.WorkerName()} 413 } 414 415 logger.Debug("found-created-volume") 416 417 return NewVolume(bcVolume, createdVolume, c), nil 418 } 419 420 if creatingVolume != nil { 421 logger = logger.WithData(lager.Data{"volume": creatingVolume.Handle()}) 422 logger.Debug("found-creating-volume") 423 } else { 424 creatingVolume, err = createVolumeFunc() 425 if err != nil { 426 logger.Error("failed-to-create-volume-in-db", err) 427 return nil, err 428 } 429 430 logger = logger.WithData(lager.Data{"volume": creatingVolume.Handle()}) 431 432 logger.Debug("created-creating-volume") 433 } 434 435 lock, acquired, err := c.lockFactory.Acquire(logger, lock.NewVolumeCreatingLockID(creatingVolume.ID())) 436 if err != nil { 437 logger.Error("failed-to-acquire-volume-creating-lock", err) 438 return nil, err 439 } 440 441 if !acquired { 442 c.clock.Sleep(creatingVolumeRetryDelay) 443 return c.findOrCreateVolume(logger, volumeSpec, findVolumeFunc, createVolumeFunc) 444 } 445 446 defer lock.Release() 447 448 bcVolume, bcVolumeFound, err := c.baggageclaimClient.LookupVolume( 449 logger.Session("create-volume"), 450 creatingVolume.Handle(), 451 ) 452 if err != nil { 453 logger.Error("failed-to-lookup-volume-in-baggageclaim", err) 454 return nil, err 455 } 456 457 if bcVolumeFound { 458 logger.Debug("real-volume-exists") 459 } else { 460 logger.Debug("creating-real-volume") 461 462 bcVolume, err = c.baggageclaimClient.CreateVolume( 463 logger.Session("create-volume"), 464 creatingVolume.Handle(), 465 volumeSpec.baggageclaimVolumeSpec(), 466 ) 467 if err != nil { 468 logger.Error("failed-to-create-volume-in-baggageclaim", err) 469 470 _, failedErr := creatingVolume.Failed() 471 if failedErr != nil { 472 logger.Error("failed-to-mark-volume-as-failed", failedErr) 473 } 474 475 metric.Metrics.FailedVolumes.Inc() 476 477 return nil, err 478 } 479 480 metric.Metrics.VolumesCreated.Inc() 481 } 482 483 createdVolume, err = creatingVolume.Created() 484 if err != nil { 485 logger.Error("failed-to-initialize-volume", err) 486 return nil, err 487 } 488 489 logger.Debug("created") 490 491 return NewVolume(bcVolume, createdVolume, c), nil 492} 493