1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package local
18
19import (
20	"context"
21	"fmt"
22	"io"
23	"io/ioutil"
24	"math/rand"
25	"os"
26	"path/filepath"
27	"strconv"
28	"strings"
29	"sync"
30	"time"
31
32	"github.com/containerd/containerd/content"
33	"github.com/containerd/containerd/errdefs"
34	"github.com/containerd/containerd/filters"
35	"github.com/containerd/containerd/log"
36
37	"github.com/containerd/continuity"
38	digest "github.com/opencontainers/go-digest"
39	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
40	"github.com/pkg/errors"
41)
42
// bufPool hands out reusable 1 MiB scratch buffers. The pool stores pointers
// to the byte slices, so callers dereference the value obtained from Get and
// return the same pointer with Put.
var bufPool = sync.Pool{
	New: func() interface{} {
		scratch := make([]byte, 1024*1024)
		return &scratch
	},
}
49
50// LabelStore is used to store mutable labels for digests
51type LabelStore interface {
52	// Get returns all the labels for the given digest
53	Get(digest.Digest) (map[string]string, error)
54
55	// Set sets all the labels for a given digest
56	Set(digest.Digest, map[string]string) error
57
58	// Update replaces the given labels for a digest,
59	// a key with an empty value removes a label.
60	Update(digest.Digest, map[string]string) (map[string]string, error)
61}
62
63// Store is digest-keyed store for content. All data written into the store is
64// stored under a verifiable digest.
65//
66// Store can generally support multi-reader, single-writer ingest of data,
67// including resumable ingest.
68type store struct {
69	root string
70	ls   LabelStore
71}
72
73// NewStore returns a local content store
74func NewStore(root string) (content.Store, error) {
75	return NewLabeledStore(root, nil)
76}
77
78// NewLabeledStore returns a new content store using the provided label store
79//
80// Note: content stores which are used underneath a metadata store may not
81// require labels and should use `NewStore`. `NewLabeledStore` is primarily
82// useful for tests or standalone implementations.
83func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
84	if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil {
85		return nil, err
86	}
87
88	return &store{
89		root: root,
90		ls:   ls,
91	}, nil
92}
93
94func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
95	p := s.blobPath(dgst)
96	fi, err := os.Stat(p)
97	if err != nil {
98		if os.IsNotExist(err) {
99			err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
100		}
101
102		return content.Info{}, err
103	}
104	var labels map[string]string
105	if s.ls != nil {
106		labels, err = s.ls.Get(dgst)
107		if err != nil {
108			return content.Info{}, err
109		}
110	}
111	return s.info(dgst, fi, labels), nil
112}
113
114func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]string) content.Info {
115	return content.Info{
116		Digest:    dgst,
117		Size:      fi.Size(),
118		CreatedAt: fi.ModTime(),
119		UpdatedAt: getATime(fi),
120		Labels:    labels,
121	}
122}
123
124// ReaderAt returns an io.ReaderAt for the blob.
125func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
126	p := s.blobPath(desc.Digest)
127	fi, err := os.Stat(p)
128	if err != nil {
129		if !os.IsNotExist(err) {
130			return nil, err
131		}
132
133		return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
134	}
135
136	fp, err := os.Open(p)
137	if err != nil {
138		if !os.IsNotExist(err) {
139			return nil, err
140		}
141
142		return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", desc.Digest, p)
143	}
144
145	return sizeReaderAt{size: fi.Size(), fp: fp}, nil
146}
147
148// Delete removes a blob by its digest.
149//
150// While this is safe to do concurrently, safe exist-removal logic must hold
151// some global lock on the store.
152func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
153	if err := os.RemoveAll(s.blobPath(dgst)); err != nil {
154		if !os.IsNotExist(err) {
155			return err
156		}
157
158		return errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
159	}
160
161	return nil
162}
163
164func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
165	if s.ls == nil {
166		return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
167	}
168
169	p := s.blobPath(info.Digest)
170	fi, err := os.Stat(p)
171	if err != nil {
172		if os.IsNotExist(err) {
173			err = errors.Wrapf(errdefs.ErrNotFound, "content %v", info.Digest)
174		}
175
176		return content.Info{}, err
177	}
178
179	var (
180		all    bool
181		labels map[string]string
182	)
183	if len(fieldpaths) > 0 {
184		for _, path := range fieldpaths {
185			if strings.HasPrefix(path, "labels.") {
186				if labels == nil {
187					labels = map[string]string{}
188				}
189
190				key := strings.TrimPrefix(path, "labels.")
191				labels[key] = info.Labels[key]
192				continue
193			}
194
195			switch path {
196			case "labels":
197				all = true
198				labels = info.Labels
199			default:
200				return content.Info{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest)
201			}
202		}
203	} else {
204		all = true
205		labels = info.Labels
206	}
207
208	if all {
209		err = s.ls.Set(info.Digest, labels)
210	} else {
211		labels, err = s.ls.Update(info.Digest, labels)
212	}
213	if err != nil {
214		return content.Info{}, err
215	}
216
217	info = s.info(info.Digest, fi, labels)
218	info.UpdatedAt = time.Now()
219
220	if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil {
221		log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest)
222	}
223
224	return info, nil
225}
226
227func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
228	// TODO: Support filters
229	root := filepath.Join(s.root, "blobs")
230	var alg digest.Algorithm
231	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
232		if err != nil {
233			return err
234		}
235		if !fi.IsDir() && !alg.Available() {
236			return nil
237		}
238
239		// TODO(stevvooe): There are few more cases with subdirs that should be
240		// handled in case the layout gets corrupted. This isn't strict enough
241		// and may spew bad data.
242
243		if path == root {
244			return nil
245		}
246		if filepath.Dir(path) == root {
247			alg = digest.Algorithm(filepath.Base(path))
248
249			if !alg.Available() {
250				alg = ""
251				return filepath.SkipDir
252			}
253
254			// descending into a hash directory
255			return nil
256		}
257
258		dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path))
259		if err := dgst.Validate(); err != nil {
260			// log error but don't report
261			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
262			// if we see this, it could mean some sort of corruption of the
263			// store or extra paths not expected previously.
264		}
265
266		var labels map[string]string
267		if s.ls != nil {
268			labels, err = s.ls.Get(dgst)
269			if err != nil {
270				return err
271			}
272		}
273		return fn(s.info(dgst, fi, labels))
274	})
275}
276
277func (s *store) Status(ctx context.Context, ref string) (content.Status, error) {
278	return s.status(s.ingestRoot(ref))
279}
280
281func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
282	fp, err := os.Open(filepath.Join(s.root, "ingest"))
283	if err != nil {
284		return nil, err
285	}
286
287	defer fp.Close()
288
289	fis, err := fp.Readdir(-1)
290	if err != nil {
291		return nil, err
292	}
293
294	filter, err := filters.ParseAll(fs...)
295	if err != nil {
296		return nil, err
297	}
298
299	var active []content.Status
300	for _, fi := range fis {
301		p := filepath.Join(s.root, "ingest", fi.Name())
302		stat, err := s.status(p)
303		if err != nil {
304			if !os.IsNotExist(err) {
305				return nil, err
306			}
307
308			// TODO(stevvooe): This is a common error if uploads are being
309			// completed while making this listing. Need to consider taking a
310			// lock on the whole store to coordinate this aspect.
311			//
312			// Another option is to cleanup downloads asynchronously and
313			// coordinate this method with the cleanup process.
314			//
315			// For now, we just skip them, as they really don't exist.
316			continue
317		}
318
319		if filter.Match(adaptStatus(stat)) {
320			active = append(active, stat)
321		}
322	}
323
324	return active, nil
325}
326
327// WalkStatusRefs is used to walk all status references
328// Failed status reads will be logged and ignored, if
329// this function is called while references are being altered,
330// these error messages may be produced.
331func (s *store) WalkStatusRefs(ctx context.Context, fn func(string) error) error {
332	fp, err := os.Open(filepath.Join(s.root, "ingest"))
333	if err != nil {
334		return err
335	}
336
337	defer fp.Close()
338
339	fis, err := fp.Readdir(-1)
340	if err != nil {
341		return err
342	}
343
344	for _, fi := range fis {
345		rf := filepath.Join(s.root, "ingest", fi.Name(), "ref")
346
347		ref, err := readFileString(rf)
348		if err != nil {
349			log.G(ctx).WithError(err).WithField("path", rf).Error("failed to read ingest ref")
350			continue
351		}
352
353		if err := fn(ref); err != nil {
354			return err
355		}
356	}
357
358	return nil
359}
360
361// status works like stat above except uses the path to the ingest.
362func (s *store) status(ingestPath string) (content.Status, error) {
363	dp := filepath.Join(ingestPath, "data")
364	fi, err := os.Stat(dp)
365	if err != nil {
366		if os.IsNotExist(err) {
367			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
368		}
369		return content.Status{}, err
370	}
371
372	ref, err := readFileString(filepath.Join(ingestPath, "ref"))
373	if err != nil {
374		if os.IsNotExist(err) {
375			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
376		}
377		return content.Status{}, err
378	}
379
380	startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat"))
381	if err != nil {
382		return content.Status{}, errors.Wrapf(err, "could not read startedat")
383	}
384
385	updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat"))
386	if err != nil {
387		return content.Status{}, errors.Wrapf(err, "could not read updatedat")
388	}
389
390	// because we don't write updatedat on every write, the mod time may
391	// actually be more up to date.
392	if fi.ModTime().After(updatedAt) {
393		updatedAt = fi.ModTime()
394	}
395
396	return content.Status{
397		Ref:       ref,
398		Offset:    fi.Size(),
399		Total:     s.total(ingestPath),
400		UpdatedAt: updatedAt,
401		StartedAt: startedAt,
402	}, nil
403}
404
405func adaptStatus(status content.Status) filters.Adaptor {
406	return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
407		if len(fieldpath) == 0 {
408			return "", false
409		}
410		switch fieldpath[0] {
411		case "ref":
412			return status.Ref, true
413		}
414
415		return "", false
416	})
417}
418
419// total attempts to resolve the total expected size for the write.
420func (s *store) total(ingestPath string) int64 {
421	totalS, err := readFileString(filepath.Join(ingestPath, "total"))
422	if err != nil {
423		return 0
424	}
425
426	total, err := strconv.ParseInt(totalS, 10, 64)
427	if err != nil {
428		// represents a corrupted file, should probably remove.
429		return 0
430	}
431
432	return total
433}
434
435// Writer begins or resumes the active writer identified by ref. If the writer
436// is already in use, an error is returned. Only one writer may be in use per
437// ref at a time.
438//
439// The argument `ref` is used to uniquely identify a long-lived writer transaction.
440func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
441	var wOpts content.WriterOpts
442	for _, opt := range opts {
443		if err := opt(&wOpts); err != nil {
444			return nil, err
445		}
446	}
447	// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
448	// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
449	if wOpts.Ref == "" {
450		return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
451	}
452	var lockErr error
453	for count := uint64(0); count < 10; count++ {
454		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count)))
455		if err := tryLock(wOpts.Ref); err != nil {
456			if !errdefs.IsUnavailable(err) {
457				return nil, err
458			}
459
460			lockErr = err
461		} else {
462			lockErr = nil
463			break
464		}
465	}
466
467	if lockErr != nil {
468		return nil, lockErr
469	}
470
471	w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
472	if err != nil {
473		unlock(wOpts.Ref)
474		return nil, err
475	}
476
477	return w, nil // lock is now held by w.
478}
479
480// writer provides the main implementation of the Writer method. The caller
481// must hold the lock correctly and release on error if there is a problem.
482func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
483	// TODO(stevvooe): Need to actually store expected here. We have
484	// code in the service that shouldn't be dealing with this.
485	if expected != "" {
486		p := s.blobPath(expected)
487		if _, err := os.Stat(p); err == nil {
488			return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
489		}
490	}
491
492	path, refp, data := s.ingestPaths(ref)
493
494	var (
495		digester  = digest.Canonical.Digester()
496		offset    int64
497		startedAt time.Time
498		updatedAt time.Time
499	)
500
501	// ensure that the ingest path has been created.
502	if err := os.Mkdir(path, 0755); err != nil {
503		if !os.IsExist(err) {
504			return nil, err
505		}
506
507		status, err := s.status(path)
508		if err != nil {
509			return nil, errors.Wrap(err, "failed reading status of resume write")
510		}
511
512		if ref != status.Ref {
513			// NOTE(stevvooe): This is fairly catastrophic. Either we have some
514			// layout corruption or a hash collision for the ref key.
515			return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref)
516		}
517
518		if total > 0 && status.Total > 0 && total != status.Total {
519			return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
520		}
521
522		// TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes
523		fp, err := os.Open(data)
524		if err != nil {
525			return nil, err
526		}
527
528		p := bufPool.Get().(*[]byte)
529		offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
530		bufPool.Put(p)
531		fp.Close()
532		if err != nil {
533			return nil, err
534		}
535
536		updatedAt = status.UpdatedAt
537		startedAt = status.StartedAt
538		total = status.Total
539	} else {
540		startedAt = time.Now()
541		updatedAt = startedAt
542
543		// the ingest is new, we need to setup the target location.
544		// write the ref to a file for later use
545		if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
546			return nil, err
547		}
548
549		if writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
550			return nil, err
551		}
552
553		if writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
554			return nil, err
555		}
556
557		if total > 0 {
558			if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
559				return nil, err
560			}
561		}
562	}
563
564	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
565	if err != nil {
566		return nil, errors.Wrap(err, "failed to open data file")
567	}
568
569	if _, err := fp.Seek(offset, io.SeekStart); err != nil {
570		return nil, errors.Wrap(err, "could not seek to current write offset")
571	}
572
573	return &writer{
574		s:         s,
575		fp:        fp,
576		ref:       ref,
577		path:      path,
578		offset:    offset,
579		total:     total,
580		digester:  digester,
581		startedAt: startedAt,
582		updatedAt: updatedAt,
583	}, nil
584}
585
586// Abort an active transaction keyed by ref. If the ingest is active, it will
587// be cancelled. Any resources associated with the ingest will be cleaned.
588func (s *store) Abort(ctx context.Context, ref string) error {
589	root := s.ingestRoot(ref)
590	if err := os.RemoveAll(root); err != nil {
591		if os.IsNotExist(err) {
592			return errors.Wrapf(errdefs.ErrNotFound, "ingest ref %q", ref)
593		}
594
595		return err
596	}
597
598	return nil
599}
600
601func (s *store) blobPath(dgst digest.Digest) string {
602	return filepath.Join(s.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
603}
604
605func (s *store) ingestRoot(ref string) string {
606	dgst := digest.FromString(ref)
607	return filepath.Join(s.root, "ingest", dgst.Hex())
608}
609
610// ingestPaths are returned. The paths are the following:
611//
612// - root: entire ingest directory
613// - ref: name of the starting ref, must be unique
614// - data: file where data is written
615//
616func (s *store) ingestPaths(ref string) (string, string, string) {
617	var (
618		fp = s.ingestRoot(ref)
619		rp = filepath.Join(fp, "ref")
620		dp = filepath.Join(fp, "data")
621	)
622
623	return fp, rp, dp
624}
625
// readFileString reads the whole file at path and returns its contents as a
// string, together with any error reported by the read.
func readFileString(path string) (string, error) {
	raw, err := ioutil.ReadFile(path)
	contents := string(raw)
	return contents, err
}
630
631// readFileTimestamp reads a file with just a timestamp present.
632func readFileTimestamp(p string) (time.Time, error) {
633	b, err := ioutil.ReadFile(p)
634	if err != nil {
635		if os.IsNotExist(err) {
636			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
637		}
638		return time.Time{}, err
639	}
640
641	var t time.Time
642	if err := t.UnmarshalText(b); err != nil {
643		return time.Time{}, errors.Wrapf(err, "could not parse timestamp file %v", p)
644	}
645
646	return t, nil
647}
648
649func writeTimestampFile(p string, t time.Time) error {
650	b, err := t.MarshalText()
651	if err != nil {
652		return err
653	}
654
655	return continuity.AtomicWriteFile(p, b, 0666)
656}
657