1package volume
2
3import (
4	"context"
5	"errors"
6	"io"
7	"os"
8	"path/filepath"
9
10	"code.cloudfoundry.org/lager"
11	"code.cloudfoundry.org/lager/lagerctx"
12	"github.com/concourse/baggageclaim/uidgid"
13)
14
// Sentinel errors returned by the repository.
var (
	// ErrVolumeDoesNotExist is returned when no volume with the requested
	// handle can be found.
	ErrVolumeDoesNotExist = errors.New("volume does not exist")

	// ErrVolumeIsCorrupted is returned by VolumeParent when the parent volume
	// exists but its metadata cannot be loaded.
	ErrVolumeIsCorrupted = errors.New("volume is corrupted")

	// ErrUnsupportedStreamEncoding is returned by StreamIn and StreamOut when
	// the encoding is neither GzipEncoding nor ZstdEncoding.
	ErrUnsupportedStreamEncoding = errors.New("unsupported stream encoding")
)

// Stream encodings understood by StreamIn and StreamOut.
const (
	GzipEncoding string = "gzip"
	ZstdEncoding string = "zstd"
)
21
22//go:generate counterfeiter . Repository
23
24type Repository interface {
25	ListVolumes(ctx context.Context, queryProperties Properties) (Volumes, []string, error)
26	GetVolume(ctx context.Context, handle string) (Volume, bool, error)
27	CreateVolume(ctx context.Context, handle string, strategy Strategy, properties Properties, isPrivileged bool) (Volume, error)
28	DestroyVolume(ctx context.Context, handle string) error
29	DestroyVolumeAndDescendants(ctx context.Context, handle string) error
30
31	SetProperty(ctx context.Context, handle string, propertyName string, propertyValue string) error
32	GetPrivileged(ctx context.Context, handle string) (bool, error)
33	SetPrivileged(ctx context.Context, handle string, privileged bool) error
34
35	StreamIn(ctx context.Context, handle string, path string, encoding string, stream io.Reader) (bool, error)
36	StreamOut(ctx context.Context, handle string, path string, encoding string, dest io.Writer) error
37
38	VolumeParent(ctx context.Context, handle string) (Volume, bool, error)
39}
40
41type repository struct {
42	filesystem Filesystem
43
44	locker LockManager
45
46	gzipStreamer Streamer
47	zstdStreamer Streamer
48	namespacer   func(bool) uidgid.Namespacer
49}
50
51func NewRepository(
52	filesystem Filesystem,
53	locker LockManager,
54	privilegedNamespacer uidgid.Namespacer,
55	unprivilegedNamespacer uidgid.Namespacer,
56) Repository {
57	return &repository{
58		filesystem: filesystem,
59		locker:     locker,
60
61		gzipStreamer: &tarGzipStreamer{
62			namespacer: unprivilegedNamespacer,
63		},
64
65		zstdStreamer: &tarZstdStreamer{
66			namespacer: unprivilegedNamespacer,
67		},
68
69		namespacer: func(privileged bool) uidgid.Namespacer {
70			if privileged {
71				return privilegedNamespacer
72			} else {
73				return unprivilegedNamespacer
74			}
75		},
76	}
77}
78
79func (repo *repository) DestroyVolume(ctx context.Context, handle string) error {
80	repo.locker.Lock(handle)
81	defer repo.locker.Unlock(handle)
82
83	logger := lagerctx.FromContext(ctx).Session("destroy-volume", lager.Data{
84		"volume": handle,
85	})
86
87	volume, found, err := repo.filesystem.LookupVolume(handle)
88	if err != nil {
89		logger.Error("failed-to-lookup-volume", err)
90		return err
91	}
92
93	if !found {
94		logger.Info("volume-not-found")
95		return ErrVolumeDoesNotExist
96	}
97
98	err = volume.Destroy()
99	if err != nil {
100		logger.Error("failed-to-destroy", err)
101		return err
102	}
103
104	logger.Info("destroyed")
105
106	return nil
107}
108
109func (repo *repository) DestroyVolumeAndDescendants(ctx context.Context, handle string) error {
110	allVolumes, err := repo.filesystem.ListVolumes()
111	if err != nil {
112		return err
113	}
114
115	found := false
116	for _, candidate := range allVolumes {
117		if candidate.Handle() == handle {
118			found = true
119		}
120	}
121	if !found {
122		return ErrVolumeDoesNotExist
123	}
124
125	for _, candidate := range allVolumes {
126		candidateParent, found, err := candidate.Parent()
127		if err != nil {
128			continue
129		}
130		if !found {
131			continue
132		}
133
134		if candidateParent.Handle() == handle {
135			err = repo.DestroyVolumeAndDescendants(ctx, candidate.Handle())
136			if err != nil {
137				return err
138			}
139		}
140	}
141
142	return repo.DestroyVolume(ctx, handle)
143}
144
145func (repo *repository) CreateVolume(ctx context.Context, handle string, strategy Strategy, properties Properties, isPrivileged bool) (Volume, error) {
146	logger := lagerctx.FromContext(ctx).Session("create-volume", lager.Data{"handle": handle})
147
148	// only the import strategy uses the gzip streamer as,
149	// base resource type rootfs' are available locally as .tgz
150	initVolume, err := strategy.Materialize(logger, handle, repo.filesystem, repo.gzipStreamer)
151	if err != nil {
152		logger.Error("failed-to-materialize-strategy", err)
153		return Volume{}, err
154	}
155
156	var initialized bool
157	defer func() {
158		if !initialized {
159			initVolume.Destroy()
160		}
161	}()
162
163	err = initVolume.StoreProperties(properties)
164	if err != nil {
165		logger.Error("failed-to-set-properties", err)
166		return Volume{}, err
167	}
168
169	err = initVolume.StorePrivileged(isPrivileged)
170	if err != nil {
171		logger.Error("failed-to-set-privileged", err)
172		return Volume{}, err
173	}
174
175	err = repo.namespacer(isPrivileged).NamespacePath(logger, initVolume.DataPath())
176	if err != nil {
177		logger.Error("failed-to-namespace-data", err)
178		return Volume{}, err
179	}
180
181	liveVolume, err := initVolume.Initialize()
182	if err != nil {
183		logger.Error("failed-to-initialize-volume", err)
184		return Volume{}, err
185	}
186
187	initialized = true
188
189	return Volume{
190		Handle:     liveVolume.Handle(),
191		Path:       liveVolume.DataPath(),
192		Properties: properties,
193	}, nil
194}
195
196func (repo *repository) ListVolumes(ctx context.Context, queryProperties Properties) (Volumes, []string, error) {
197	logger := lagerctx.FromContext(ctx).Session("list-volumes")
198
199	liveVolumes, err := repo.filesystem.ListVolumes()
200	if err != nil {
201		logger.Error("failed-to-list-volumes", err)
202		return nil, nil, err
203	}
204
205	healthyVolumes := make(Volumes, 0, len(liveVolumes))
206	corruptedVolumeHandles := []string{}
207
208	for _, liveVolume := range liveVolumes {
209		volume, err := repo.volumeFrom(liveVolume)
210		if err == ErrVolumeDoesNotExist {
211			continue
212		}
213
214		if err != nil {
215			corruptedVolumeHandles = append(corruptedVolumeHandles, liveVolume.Handle())
216			logger.Error("failed-hydrating-volume", err)
217			continue
218		}
219
220		if volume.Properties.HasProperties(queryProperties) {
221			healthyVolumes = append(healthyVolumes, volume)
222		}
223	}
224
225	return healthyVolumes, corruptedVolumeHandles, nil
226}
227
228func (repo *repository) GetVolume(ctx context.Context, handle string) (Volume, bool, error) {
229	logger := lagerctx.FromContext(ctx).Session("get-volume", lager.Data{
230		"volume": handle,
231	})
232
233	liveVolume, found, err := repo.filesystem.LookupVolume(handle)
234	if err != nil {
235		logger.Error("failed-to-lookup-volume", err)
236		return Volume{}, false, err
237	}
238
239	if !found {
240		logger.Info("volume-not-found")
241		return Volume{}, false, nil
242	}
243
244	volume, err := repo.volumeFrom(liveVolume)
245	if err == ErrVolumeDoesNotExist {
246		return Volume{}, false, nil
247	}
248
249	if err != nil {
250		logger.Error("failed-to-hydrate-volume", err)
251		return Volume{}, false, err
252	}
253
254	return volume, true, nil
255}
256
257func (repo *repository) SetProperty(ctx context.Context, handle string, propertyName string, propertyValue string) error {
258	repo.locker.Lock(handle)
259	defer repo.locker.Unlock(handle)
260
261	logger := lagerctx.FromContext(ctx).Session("set-property", lager.Data{
262		"volume":   handle,
263		"property": propertyName,
264	})
265
266	volume, found, err := repo.filesystem.LookupVolume(handle)
267	if err != nil {
268		logger.Error("failed-to-lookup-volume", err)
269		return err
270	}
271
272	if !found {
273		logger.Info("volume-not-found")
274		return ErrVolumeDoesNotExist
275	}
276
277	properties, err := volume.LoadProperties()
278	if err != nil {
279		logger.Error("failed-to-read-properties", err, lager.Data{
280			"volume": handle,
281		})
282		return err
283	}
284
285	properties = properties.UpdateProperty(propertyName, propertyValue)
286
287	err = volume.StoreProperties(properties)
288	if err != nil {
289		logger.Error("failed-to-store-properties", err)
290		return err
291	}
292
293	return nil
294}
295
296func (repo *repository) GetPrivileged(ctx context.Context, handle string) (bool, error) {
297	repo.locker.Lock(handle)
298	defer repo.locker.Unlock(handle)
299
300	logger := lagerctx.FromContext(ctx).Session("get-privileged", lager.Data{
301		"volume": handle,
302	})
303
304	volume, found, err := repo.filesystem.LookupVolume(handle)
305	if err != nil {
306		logger.Error("failed-to-lookup-volume", err)
307		return false, err
308	}
309
310	if !found {
311		logger.Info("volume-not-found")
312		return false, ErrVolumeDoesNotExist
313	}
314
315	privileged, err := volume.LoadPrivileged()
316	if err != nil {
317		logger.Error("failed-to-load-privileged", err)
318		return false, err
319	}
320
321	return privileged, nil
322}
323
324func (repo *repository) SetPrivileged(ctx context.Context, handle string, privileged bool) error {
325	repo.locker.Lock(handle)
326	defer repo.locker.Unlock(handle)
327
328	logger := lagerctx.FromContext(ctx).Session("set-privileged", lager.Data{
329		"volume": handle,
330	})
331
332	volume, found, err := repo.filesystem.LookupVolume(handle)
333	if err != nil {
334		logger.Error("failed-to-lookup-volume", err)
335		return err
336	}
337
338	if !found {
339		logger.Info("volume-not-found")
340		return ErrVolumeDoesNotExist
341	}
342
343	err = repo.namespacer(privileged).NamespacePath(logger, volume.DataPath())
344	if err != nil {
345		logger.Error("failed-to-namespace-volume", err)
346		return err
347	}
348
349	err = volume.StorePrivileged(privileged)
350	if err != nil {
351		logger.Error("failed-to-store-privileged", err)
352		return err
353	}
354
355	return nil
356}
357
358func (repo *repository) StreamIn(ctx context.Context, handle string, path string, encoding string, stream io.Reader) (bool, error) {
359	logger := lagerctx.FromContext(ctx).Session("stream-in", lager.Data{
360		"volume":   handle,
361		"sub-path": path,
362	})
363
364	volume, found, err := repo.filesystem.LookupVolume(handle)
365	if err != nil {
366		logger.Error("failed-to-lookup-volume", err)
367		return false, err
368	}
369
370	if !found {
371		logger.Info("volume-not-found")
372		return false, ErrVolumeDoesNotExist
373	}
374
375	destinationPath := filepath.Join(volume.DataPath(), path)
376
377	logger = logger.WithData(lager.Data{
378		"full-path": destinationPath,
379	})
380
381	err = os.MkdirAll(destinationPath, 0755)
382	if err != nil {
383		logger.Error("failed-to-create-destination-path", err)
384		return false, err
385	}
386
387	privileged, err := volume.LoadPrivileged()
388	if err != nil {
389		logger.Error("failed-to-check-if-volume-is-privileged", err)
390		return false, err
391	}
392
393	err = repo.namespacer(privileged).NamespacePath(logger, volume.DataPath())
394	if err != nil {
395		logger.Error("failed-to-namespace-path", err)
396		return false, err
397	}
398
399	switch encoding {
400	case ZstdEncoding:
401		return repo.zstdStreamer.In(stream, destinationPath, privileged)
402	case GzipEncoding:
403		return repo.gzipStreamer.In(stream, destinationPath, privileged)
404	}
405
406	return false, ErrUnsupportedStreamEncoding
407}
408
409func (repo *repository) StreamOut(ctx context.Context, handle string, path string, encoding string, dest io.Writer) error {
410	logger := lagerctx.FromContext(ctx).Session("stream-in", lager.Data{
411		"volume":   handle,
412		"sub-path": path,
413	})
414
415	volume, found, err := repo.filesystem.LookupVolume(handle)
416	if err != nil {
417		logger.Error("failed-to-lookup-volume", err)
418		return err
419	}
420
421	if !found {
422		logger.Info("volume-not-found")
423		return ErrVolumeDoesNotExist
424	}
425
426	srcPath := filepath.Join(volume.DataPath(), path)
427
428	logger = logger.WithData(lager.Data{
429		"full-path": srcPath,
430	})
431
432	isPrivileged, err := volume.LoadPrivileged()
433	if err != nil {
434		logger.Error("failed-to-check-if-volume-is-privileged", err)
435		return err
436	}
437
438	switch encoding {
439	case ZstdEncoding:
440		return repo.zstdStreamer.Out(dest, srcPath, isPrivileged)
441	case GzipEncoding:
442		return repo.gzipStreamer.Out(dest, srcPath, isPrivileged)
443	}
444
445	return ErrUnsupportedStreamEncoding
446}
447
448func (repo *repository) VolumeParent(ctx context.Context, handle string) (Volume, bool, error) {
449	logger := lagerctx.FromContext(ctx).Session("volume-parent")
450
451	liveVolume, found, err := repo.filesystem.LookupVolume(handle)
452	if err != nil {
453		logger.Error("failed-to-lookup-volume", err)
454		return Volume{}, false, err
455	}
456
457	if !found {
458		logger.Info("volume-not-found")
459		return Volume{}, false, ErrVolumeDoesNotExist
460	}
461
462	parentVolume, found, err := liveVolume.Parent()
463	if err != nil {
464		logger.Error("failed-to-get-parent-volume", err)
465		return Volume{}, false, err
466	}
467
468	if !found {
469		return Volume{}, false, nil
470	}
471
472	volume, err := repo.volumeFrom(parentVolume)
473	if err != nil {
474		logger.Error("failed-to-hydrate-parent-volume", err)
475		return Volume{}, true, ErrVolumeIsCorrupted
476	}
477
478	return volume, true, nil
479}
480
481func (repo *repository) volumeFrom(liveVolume FilesystemLiveVolume) (Volume, error) {
482	properties, err := liveVolume.LoadProperties()
483	if err != nil {
484		return Volume{}, err
485	}
486
487	isPrivileged, err := liveVolume.LoadPrivileged()
488	if err != nil {
489		return Volume{}, err
490	}
491
492	return Volume{
493		Handle:     liveVolume.Handle(),
494		Path:       liveVolume.DataPath(),
495		Properties: properties,
496		Privileged: isPrivileged,
497	}, nil
498}
499