1package volume 2 3import ( 4 "context" 5 "errors" 6 "io" 7 "os" 8 "path/filepath" 9 10 "code.cloudfoundry.org/lager" 11 "code.cloudfoundry.org/lager/lagerctx" 12 "github.com/concourse/baggageclaim/uidgid" 13) 14 15var ErrVolumeDoesNotExist = errors.New("volume does not exist") 16var ErrVolumeIsCorrupted = errors.New("volume is corrupted") 17var ErrUnsupportedStreamEncoding = errors.New("unsupported stream encoding") 18 19const GzipEncoding string = "gzip" 20const ZstdEncoding string = "zstd" 21 22//go:generate counterfeiter . Repository 23 24type Repository interface { 25 ListVolumes(ctx context.Context, queryProperties Properties) (Volumes, []string, error) 26 GetVolume(ctx context.Context, handle string) (Volume, bool, error) 27 CreateVolume(ctx context.Context, handle string, strategy Strategy, properties Properties, isPrivileged bool) (Volume, error) 28 DestroyVolume(ctx context.Context, handle string) error 29 DestroyVolumeAndDescendants(ctx context.Context, handle string) error 30 31 SetProperty(ctx context.Context, handle string, propertyName string, propertyValue string) error 32 GetPrivileged(ctx context.Context, handle string) (bool, error) 33 SetPrivileged(ctx context.Context, handle string, privileged bool) error 34 35 StreamIn(ctx context.Context, handle string, path string, encoding string, stream io.Reader) (bool, error) 36 StreamOut(ctx context.Context, handle string, path string, encoding string, dest io.Writer) error 37 38 VolumeParent(ctx context.Context, handle string) (Volume, bool, error) 39} 40 41type repository struct { 42 filesystem Filesystem 43 44 locker LockManager 45 46 gzipStreamer Streamer 47 zstdStreamer Streamer 48 namespacer func(bool) uidgid.Namespacer 49} 50 51func NewRepository( 52 filesystem Filesystem, 53 locker LockManager, 54 privilegedNamespacer uidgid.Namespacer, 55 unprivilegedNamespacer uidgid.Namespacer, 56) Repository { 57 return &repository{ 58 filesystem: filesystem, 59 locker: locker, 60 61 gzipStreamer: &tarGzipStreamer{ 62 namespacer: unprivilegedNamespacer, 63 }, 64 65 zstdStreamer: &tarZstdStreamer{ 66 namespacer: unprivilegedNamespacer, 67 }, 68 69 namespacer: func(privileged bool) uidgid.Namespacer { 70 if privileged { 71 return privilegedNamespacer 72 } else { 73 return unprivilegedNamespacer 74 } 75 }, 76 } 77} 78 79func (repo *repository) DestroyVolume(ctx context.Context, handle string) error { 80 repo.locker.Lock(handle) 81 defer repo.locker.Unlock(handle) 82 83 logger := lagerctx.FromContext(ctx).Session("destroy-volume", lager.Data{ 84 "volume": handle, 85 }) 86 87 volume, found, err := repo.filesystem.LookupVolume(handle) 88 if err != nil { 89 logger.Error("failed-to-lookup-volume", err) 90 return err 91 } 92 93 if !found { 94 logger.Info("volume-not-found") 95 return ErrVolumeDoesNotExist 96 } 97 98 err = volume.Destroy() 99 if err != nil { 100 logger.Error("failed-to-destroy", err) 101 return err 102 } 103 104 logger.Info("destroyed") 105 106 return nil 107} 108 109func (repo *repository) DestroyVolumeAndDescendants(ctx context.Context, handle string) error { 110 allVolumes, err := repo.filesystem.ListVolumes() 111 if err != nil { 112 return err 113 } 114 115 found := false 116 for _, candidate := range allVolumes { 117 if candidate.Handle() == handle { 118 found = true 119 } 120 } 121 if !found { 122 return ErrVolumeDoesNotExist 123 } 124 125 for _, candidate := range allVolumes { 126 candidateParent, found, err := candidate.Parent() 127 if err != nil { 128 continue 129 } 130 if !found { 131 continue 132 } 133 134 if candidateParent.Handle() == handle { 135 err = repo.DestroyVolumeAndDescendants(ctx, candidate.Handle()) 136 if err != nil { 137 return err 138 } 139 } 140 } 141 142 return repo.DestroyVolume(ctx, handle) 143} 144 145func (repo *repository) CreateVolume(ctx context.Context, handle string, strategy Strategy, properties Properties, isPrivileged bool) (Volume, error) { 146 logger := lagerctx.FromContext(ctx).Session("create-volume", lager.Data{"handle": handle}) 147 148 // only the import strategy uses the gzip streamer as, 149 // base resource type rootfs' are available locally as .tgz 150 initVolume, err := strategy.Materialize(logger, handle, repo.filesystem, repo.gzipStreamer) 151 if err != nil { 152 logger.Error("failed-to-materialize-strategy", err) 153 return Volume{}, err 154 } 155 156 var initialized bool 157 defer func() { 158 if !initialized { 159 initVolume.Destroy() 160 } 161 }() 162 163 err = initVolume.StoreProperties(properties) 164 if err != nil { 165 logger.Error("failed-to-set-properties", err) 166 return Volume{}, err 167 } 168 169 err = initVolume.StorePrivileged(isPrivileged) 170 if err != nil { 171 logger.Error("failed-to-set-privileged", err) 172 return Volume{}, err 173 } 174 175 err = repo.namespacer(isPrivileged).NamespacePath(logger, initVolume.DataPath()) 176 if err != nil { 177 logger.Error("failed-to-namespace-data", err) 178 return Volume{}, err 179 } 180 181 liveVolume, err := initVolume.Initialize() 182 if err != nil { 183 logger.Error("failed-to-initialize-volume", err) 184 return Volume{}, err 185 } 186 187 initialized = true 188 189 return Volume{ 190 Handle: liveVolume.Handle(), 191 Path: liveVolume.DataPath(), 192 Properties: properties, 193 }, nil 194} 195 196func (repo *repository) ListVolumes(ctx context.Context, queryProperties Properties) (Volumes, []string, error) { 197 logger := lagerctx.FromContext(ctx).Session("list-volumes") 198 199 liveVolumes, err := repo.filesystem.ListVolumes() 200 if err != nil { 201 logger.Error("failed-to-list-volumes", err) 202 return nil, nil, err 203 } 204 205 healthyVolumes := make(Volumes, 0, len(liveVolumes)) 206 corruptedVolumeHandles := []string{} 207 208 for _, liveVolume := range liveVolumes { 209 volume, err := repo.volumeFrom(liveVolume) 210 if err == ErrVolumeDoesNotExist { 211 continue 212 } 213 214 if err != nil { 215 corruptedVolumeHandles = append(corruptedVolumeHandles, liveVolume.Handle()) 216 logger.Error("failed-hydrating-volume", err) 217 continue 218 } 219 220 if volume.Properties.HasProperties(queryProperties) { 221 healthyVolumes = append(healthyVolumes, volume) 222 } 223 } 224 225 return healthyVolumes, corruptedVolumeHandles, nil 226} 227 228func (repo *repository) GetVolume(ctx context.Context, handle string) (Volume, bool, error) { 229 logger := lagerctx.FromContext(ctx).Session("get-volume", lager.Data{ 230 "volume": handle, 231 }) 232 233 liveVolume, found, err := repo.filesystem.LookupVolume(handle) 234 if err != nil { 235 logger.Error("failed-to-lookup-volume", err) 236 return Volume{}, false, err 237 } 238 239 if !found { 240 logger.Info("volume-not-found") 241 return Volume{}, false, nil 242 } 243 244 volume, err := repo.volumeFrom(liveVolume) 245 if err == ErrVolumeDoesNotExist { 246 return Volume{}, false, nil 247 } 248 249 if err != nil { 250 logger.Error("failed-to-hydrate-volume", err) 251 return Volume{}, false, err 252 } 253 254 return volume, true, nil 255} 256 257func (repo *repository) SetProperty(ctx context.Context, handle string, propertyName string, propertyValue string) error { 258 repo.locker.Lock(handle) 259 defer repo.locker.Unlock(handle) 260 261 logger := lagerctx.FromContext(ctx).Session("set-property", lager.Data{ 262 "volume": handle, 263 "property": propertyName, 264 }) 265 266 volume, found, err := repo.filesystem.LookupVolume(handle) 267 if err != nil { 268 logger.Error("failed-to-lookup-volume", err) 269 return err 270 } 271 272 if !found { 273 logger.Info("volume-not-found") 274 return ErrVolumeDoesNotExist 275 } 276 277 properties, err := volume.LoadProperties() 278 if err != nil { 279 logger.Error("failed-to-read-properties", err, lager.Data{ 280 "volume": handle, 281 }) 282 return err 283 } 284 285 properties = properties.UpdateProperty(propertyName, propertyValue) 286 287 err = volume.StoreProperties(properties) 288 if err != nil { 289 logger.Error("failed-to-store-properties", err) 290 return err 291 } 292 293 return nil 294} 295 296func (repo *repository) GetPrivileged(ctx context.Context, handle string) (bool, error) { 297 repo.locker.Lock(handle) 298 defer repo.locker.Unlock(handle) 299 300 logger := lagerctx.FromContext(ctx).Session("get-privileged", lager.Data{ 301 "volume": handle, 302 }) 303 304 volume, found, err := repo.filesystem.LookupVolume(handle) 305 if err != nil { 306 logger.Error("failed-to-lookup-volume", err) 307 return false, err 308 } 309 310 if !found { 311 logger.Info("volume-not-found") 312 return false, ErrVolumeDoesNotExist 313 } 314 315 privileged, err := volume.LoadPrivileged() 316 if err != nil { 317 logger.Error("failed-to-load-privileged", err) 318 return false, err 319 } 320 321 return privileged, nil 322} 323 324func (repo *repository) SetPrivileged(ctx context.Context, handle string, privileged bool) error { 325 repo.locker.Lock(handle) 326 defer repo.locker.Unlock(handle) 327 328 logger := lagerctx.FromContext(ctx).Session("set-privileged", lager.Data{ 329 "volume": handle, 330 }) 331 332 volume, found, err := repo.filesystem.LookupVolume(handle) 333 if err != nil { 334 logger.Error("failed-to-lookup-volume", err) 335 return err 336 } 337 338 if !found { 339 logger.Info("volume-not-found") 340 return ErrVolumeDoesNotExist 341 } 342 343 err = repo.namespacer(privileged).NamespacePath(logger, volume.DataPath()) 344 if err != nil { 345 logger.Error("failed-to-namespace-volume", err) 346 return err 347 } 348 349 err = volume.StorePrivileged(privileged) 350 if err != nil { 351 logger.Error("failed-to-store-privileged", err) 352 return err 353 } 354 355 return nil 356} 357 358func (repo *repository) StreamIn(ctx context.Context, handle string, path string, encoding string, stream io.Reader) (bool, error) { 359 logger := lagerctx.FromContext(ctx).Session("stream-in", lager.Data{ 360 "volume": handle, 361 "sub-path": path, 362 }) 363 364 volume, found, err := repo.filesystem.LookupVolume(handle) 365 if err != nil { 366 logger.Error("failed-to-lookup-volume", err) 367 return false, err 368 } 369 370 if !found { 371 logger.Info("volume-not-found") 372 return false, ErrVolumeDoesNotExist 373 } 374 375 destinationPath := filepath.Join(volume.DataPath(), path) 376 377 logger = logger.WithData(lager.Data{ 378 "full-path": destinationPath, 379 }) 380 381 err = os.MkdirAll(destinationPath, 0755) 382 if err != nil { 383 logger.Error("failed-to-create-destination-path", err) 384 return false, err 385 } 386 387 privileged, err := volume.LoadPrivileged() 388 if err != nil { 389 logger.Error("failed-to-check-if-volume-is-privileged", err) 390 return false, err 391 } 392 393 err = repo.namespacer(privileged).NamespacePath(logger, volume.DataPath()) 394 if err != nil { 395 logger.Error("failed-to-namespace-path", err) 396 return false, err 397 } 398 399 switch encoding { 400 case ZstdEncoding: 401 return repo.zstdStreamer.In(stream, destinationPath, privileged) 402 case GzipEncoding: 403 return repo.gzipStreamer.In(stream, destinationPath, privileged) 404 } 405 406 return false, ErrUnsupportedStreamEncoding 407} 408 409func (repo *repository) StreamOut(ctx context.Context, handle string, path string, encoding string, dest io.Writer) error { 410 logger := lagerctx.FromContext(ctx).Session("stream-in", lager.Data{ 411 "volume": handle, 412 "sub-path": path, 413 }) 414 415 volume, found, err := repo.filesystem.LookupVolume(handle) 416 if err != nil { 417 logger.Error("failed-to-lookup-volume", err) 418 return err 419 } 420 421 if !found { 422 logger.Info("volume-not-found") 423 return ErrVolumeDoesNotExist 424 } 425 426 srcPath := filepath.Join(volume.DataPath(), path) 427 428 logger = logger.WithData(lager.Data{ 429 "full-path": srcPath, 430 }) 431 432 isPrivileged, err := volume.LoadPrivileged() 433 if err != nil { 434 logger.Error("failed-to-check-if-volume-is-privileged", err) 435 return err 436 } 437 438 switch encoding { 439 case ZstdEncoding: 440 return repo.zstdStreamer.Out(dest, srcPath, isPrivileged) 441 case GzipEncoding: 442 return repo.gzipStreamer.Out(dest, srcPath, isPrivileged) 443 } 444 445 return ErrUnsupportedStreamEncoding 446} 447 448func (repo *repository) VolumeParent(ctx context.Context, handle string) (Volume, bool, error) { 449 logger := lagerctx.FromContext(ctx).Session("volume-parent") 450 451 liveVolume, found, err := repo.filesystem.LookupVolume(handle) 452 if err != nil { 453 logger.Error("failed-to-lookup-volume", err) 454 return Volume{}, false, err 455 } 456 457 if !found { 458 logger.Info("volume-not-found") 459 return Volume{}, false, ErrVolumeDoesNotExist 460 } 461 462 parentVolume, found, err := liveVolume.Parent() 463 if err != nil { 464 logger.Error("failed-to-get-parent-volume", err) 465 return Volume{}, false, err 466 } 467 468 if !found { 469 return Volume{}, false, nil 470 } 471 472 volume, err := repo.volumeFrom(parentVolume) 473 if err != nil { 474 logger.Error("failed-to-hydrate-parent-volume", err) 475 return Volume{}, true, ErrVolumeIsCorrupted 476 } 477 478 return volume, true, nil 479} 480 481func (repo *repository) volumeFrom(liveVolume FilesystemLiveVolume) (Volume, error) { 482 properties, err := liveVolume.LoadProperties() 483 if err != nil { 484 return Volume{}, err 485 } 486 487 isPrivileged, err := liveVolume.LoadPrivileged() 488 if err != nil { 489 return Volume{}, err 490 } 491 492 return Volume{ 493 Handle: liveVolume.Handle(), 494 Path: liveVolume.DataPath(), 495 Properties: properties, 496 Privileged: isPrivileged, 497 }, nil 498} 499