1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package local
18
19import (
20	"context"
21	"fmt"
22	"io"
23	"io/ioutil"
24	"math/rand"
25	"os"
26	"path/filepath"
27	"strconv"
28	"strings"
29	"sync"
30	"time"
31
32	"github.com/containerd/containerd/content"
33	"github.com/containerd/containerd/errdefs"
34	"github.com/containerd/containerd/filters"
35	"github.com/containerd/containerd/log"
36	"github.com/sirupsen/logrus"
37
38	digest "github.com/opencontainers/go-digest"
39	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
40	"github.com/pkg/errors"
41)
42
// bufPool recycles 1 MiB scratch buffers used as the copy buffer for
// io.CopyBuffer. The pool holds *[]byte values, so callers dereference the
// pointer obtained from Get and hand the same pointer back to Put.
var bufPool = sync.Pool{
	New: func() interface{} {
		const size = 1 << 20
		scratch := make([]byte, size)
		return &scratch
	},
}
49
50// LabelStore is used to store mutable labels for digests
51type LabelStore interface {
52	// Get returns all the labels for the given digest
53	Get(digest.Digest) (map[string]string, error)
54
55	// Set sets all the labels for a given digest
56	Set(digest.Digest, map[string]string) error
57
58	// Update replaces the given labels for a digest,
59	// a key with an empty value removes a label.
60	Update(digest.Digest, map[string]string) (map[string]string, error)
61}
62
// store is a digest-keyed store for content. All data written into the store
// is stored under a verifiable digest.
//
// store can generally support multi-reader, single-writer ingest of data,
// including resumable ingest.
//
// root is the base directory: committed blobs are looked up under
// root/blobs/<algorithm>/<hex> and in-progress ingests under root/ingest.
// ls holds the mutable labels for blobs; it may be nil, in which case no
// labels are reported and Update is rejected.
type store struct {
	root string
	ls   LabelStore
}
72
73// NewStore returns a local content store
74func NewStore(root string) (content.Store, error) {
75	return NewLabeledStore(root, nil)
76}
77
78// NewLabeledStore returns a new content store using the provided label store
79//
80// Note: content stores which are used underneath a metadata store may not
81// require labels and should use `NewStore`. `NewLabeledStore` is primarily
82// useful for tests or standalone implementations.
83func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
84	if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil {
85		return nil, err
86	}
87
88	return &store{
89		root: root,
90		ls:   ls,
91	}, nil
92}
93
94func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
95	p, err := s.blobPath(dgst)
96	if err != nil {
97		return content.Info{}, errors.Wrapf(err, "calculating blob info path")
98	}
99
100	fi, err := os.Stat(p)
101	if err != nil {
102		if os.IsNotExist(err) {
103			err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
104		}
105
106		return content.Info{}, err
107	}
108	var labels map[string]string
109	if s.ls != nil {
110		labels, err = s.ls.Get(dgst)
111		if err != nil {
112			return content.Info{}, err
113		}
114	}
115	return s.info(dgst, fi, labels), nil
116}
117
118func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]string) content.Info {
119	return content.Info{
120		Digest:    dgst,
121		Size:      fi.Size(),
122		CreatedAt: fi.ModTime(),
123		UpdatedAt: getATime(fi),
124		Labels:    labels,
125	}
126}
127
128// ReaderAt returns an io.ReaderAt for the blob.
129func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
130	p, err := s.blobPath(desc.Digest)
131	if err != nil {
132		return nil, errors.Wrapf(err, "calculating blob path for ReaderAt")
133	}
134
135	reader, err := OpenReader(p)
136	if err != nil {
137		return nil, errors.Wrapf(err, "blob %s expected at %s", desc.Digest, p)
138	}
139
140	return reader, nil
141}
142
143// Delete removes a blob by its digest.
144//
145// While this is safe to do concurrently, safe exist-removal logic must hold
146// some global lock on the store.
147func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
148	bp, err := s.blobPath(dgst)
149	if err != nil {
150		return errors.Wrapf(err, "calculating blob path for delete")
151	}
152
153	if err := os.RemoveAll(bp); err != nil {
154		if !os.IsNotExist(err) {
155			return err
156		}
157
158		return errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
159	}
160
161	return nil
162}
163
164func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
165	if s.ls == nil {
166		return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
167	}
168
169	p, err := s.blobPath(info.Digest)
170	if err != nil {
171		return content.Info{}, errors.Wrapf(err, "calculating blob path for update")
172	}
173
174	fi, err := os.Stat(p)
175	if err != nil {
176		if os.IsNotExist(err) {
177			err = errors.Wrapf(errdefs.ErrNotFound, "content %v", info.Digest)
178		}
179
180		return content.Info{}, err
181	}
182
183	var (
184		all    bool
185		labels map[string]string
186	)
187	if len(fieldpaths) > 0 {
188		for _, path := range fieldpaths {
189			if strings.HasPrefix(path, "labels.") {
190				if labels == nil {
191					labels = map[string]string{}
192				}
193
194				key := strings.TrimPrefix(path, "labels.")
195				labels[key] = info.Labels[key]
196				continue
197			}
198
199			switch path {
200			case "labels":
201				all = true
202				labels = info.Labels
203			default:
204				return content.Info{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest)
205			}
206		}
207	} else {
208		all = true
209		labels = info.Labels
210	}
211
212	if all {
213		err = s.ls.Set(info.Digest, labels)
214	} else {
215		labels, err = s.ls.Update(info.Digest, labels)
216	}
217	if err != nil {
218		return content.Info{}, err
219	}
220
221	info = s.info(info.Digest, fi, labels)
222	info.UpdatedAt = time.Now()
223
224	if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil {
225		log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest)
226	}
227
228	return info, nil
229}
230
231func (s *store) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error {
232	root := filepath.Join(s.root, "blobs")
233
234	filter, err := filters.ParseAll(fs...)
235	if err != nil {
236		return err
237	}
238
239	var alg digest.Algorithm
240	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
241		if err != nil {
242			return err
243		}
244		if !fi.IsDir() && !alg.Available() {
245			return nil
246		}
247
248		// TODO(stevvooe): There are few more cases with subdirs that should be
249		// handled in case the layout gets corrupted. This isn't strict enough
250		// and may spew bad data.
251
252		if path == root {
253			return nil
254		}
255		if filepath.Dir(path) == root {
256			alg = digest.Algorithm(filepath.Base(path))
257
258			if !alg.Available() {
259				alg = ""
260				return filepath.SkipDir
261			}
262
263			// descending into a hash directory
264			return nil
265		}
266
267		dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path))
268		if err := dgst.Validate(); err != nil {
269			// log error but don't report
270			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
271			// if we see this, it could mean some sort of corruption of the
272			// store or extra paths not expected previously.
273		}
274
275		var labels map[string]string
276		if s.ls != nil {
277			labels, err = s.ls.Get(dgst)
278			if err != nil {
279				return err
280			}
281		}
282
283		info := s.info(dgst, fi, labels)
284		if !filter.Match(content.AdaptInfo(info)) {
285			return nil
286		}
287		return fn(info)
288	})
289}
290
291func (s *store) Status(ctx context.Context, ref string) (content.Status, error) {
292	return s.status(s.ingestRoot(ref))
293}
294
295func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
296	fp, err := os.Open(filepath.Join(s.root, "ingest"))
297	if err != nil {
298		return nil, err
299	}
300
301	defer fp.Close()
302
303	fis, err := fp.Readdir(-1)
304	if err != nil {
305		return nil, err
306	}
307
308	filter, err := filters.ParseAll(fs...)
309	if err != nil {
310		return nil, err
311	}
312
313	var active []content.Status
314	for _, fi := range fis {
315		p := filepath.Join(s.root, "ingest", fi.Name())
316		stat, err := s.status(p)
317		if err != nil {
318			if !os.IsNotExist(err) {
319				return nil, err
320			}
321
322			// TODO(stevvooe): This is a common error if uploads are being
323			// completed while making this listing. Need to consider taking a
324			// lock on the whole store to coordinate this aspect.
325			//
326			// Another option is to cleanup downloads asynchronously and
327			// coordinate this method with the cleanup process.
328			//
329			// For now, we just skip them, as they really don't exist.
330			continue
331		}
332
333		if filter.Match(adaptStatus(stat)) {
334			active = append(active, stat)
335		}
336	}
337
338	return active, nil
339}
340
341// WalkStatusRefs is used to walk all status references
342// Failed status reads will be logged and ignored, if
343// this function is called while references are being altered,
344// these error messages may be produced.
345func (s *store) WalkStatusRefs(ctx context.Context, fn func(string) error) error {
346	fp, err := os.Open(filepath.Join(s.root, "ingest"))
347	if err != nil {
348		return err
349	}
350
351	defer fp.Close()
352
353	fis, err := fp.Readdir(-1)
354	if err != nil {
355		return err
356	}
357
358	for _, fi := range fis {
359		rf := filepath.Join(s.root, "ingest", fi.Name(), "ref")
360
361		ref, err := readFileString(rf)
362		if err != nil {
363			log.G(ctx).WithError(err).WithField("path", rf).Error("failed to read ingest ref")
364			continue
365		}
366
367		if err := fn(ref); err != nil {
368			return err
369		}
370	}
371
372	return nil
373}
374
375// status works like stat above except uses the path to the ingest.
376func (s *store) status(ingestPath string) (content.Status, error) {
377	dp := filepath.Join(ingestPath, "data")
378	fi, err := os.Stat(dp)
379	if err != nil {
380		if os.IsNotExist(err) {
381			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
382		}
383		return content.Status{}, err
384	}
385
386	ref, err := readFileString(filepath.Join(ingestPath, "ref"))
387	if err != nil {
388		if os.IsNotExist(err) {
389			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
390		}
391		return content.Status{}, err
392	}
393
394	startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat"))
395	if err != nil {
396		return content.Status{}, errors.Wrapf(err, "could not read startedat")
397	}
398
399	updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat"))
400	if err != nil {
401		return content.Status{}, errors.Wrapf(err, "could not read updatedat")
402	}
403
404	// because we don't write updatedat on every write, the mod time may
405	// actually be more up to date.
406	if fi.ModTime().After(updatedAt) {
407		updatedAt = fi.ModTime()
408	}
409
410	return content.Status{
411		Ref:       ref,
412		Offset:    fi.Size(),
413		Total:     s.total(ingestPath),
414		UpdatedAt: updatedAt,
415		StartedAt: startedAt,
416	}, nil
417}
418
419func adaptStatus(status content.Status) filters.Adaptor {
420	return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
421		if len(fieldpath) == 0 {
422			return "", false
423		}
424		switch fieldpath[0] {
425		case "ref":
426			return status.Ref, true
427		}
428
429		return "", false
430	})
431}
432
433// total attempts to resolve the total expected size for the write.
434func (s *store) total(ingestPath string) int64 {
435	totalS, err := readFileString(filepath.Join(ingestPath, "total"))
436	if err != nil {
437		return 0
438	}
439
440	total, err := strconv.ParseInt(totalS, 10, 64)
441	if err != nil {
442		// represents a corrupted file, should probably remove.
443		return 0
444	}
445
446	return total
447}
448
449// Writer begins or resumes the active writer identified by ref. If the writer
450// is already in use, an error is returned. Only one writer may be in use per
451// ref at a time.
452//
453// The argument `ref` is used to uniquely identify a long-lived writer transaction.
454func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
455	var wOpts content.WriterOpts
456	for _, opt := range opts {
457		if err := opt(&wOpts); err != nil {
458			return nil, err
459		}
460	}
461	// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
462	// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
463	if wOpts.Ref == "" {
464		return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
465	}
466	var lockErr error
467	for count := uint64(0); count < 10; count++ {
468		if err := tryLock(wOpts.Ref); err != nil {
469			if !errdefs.IsUnavailable(err) {
470				return nil, err
471			}
472
473			lockErr = err
474		} else {
475			lockErr = nil
476			break
477		}
478		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count)))
479	}
480
481	if lockErr != nil {
482		return nil, lockErr
483	}
484
485	w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
486	if err != nil {
487		unlock(wOpts.Ref)
488		return nil, err
489	}
490
491	return w, nil // lock is now held by w.
492}
493
494func (s *store) resumeStatus(ref string, total int64, digester digest.Digester) (content.Status, error) {
495	path, _, data := s.ingestPaths(ref)
496	status, err := s.status(path)
497	if err != nil {
498		return status, errors.Wrap(err, "failed reading status of resume write")
499	}
500	if ref != status.Ref {
501		// NOTE(stevvooe): This is fairly catastrophic. Either we have some
502		// layout corruption or a hash collision for the ref key.
503		return status, errors.Errorf("ref key does not match: %v != %v", ref, status.Ref)
504	}
505
506	if total > 0 && status.Total > 0 && total != status.Total {
507		return status, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
508	}
509
510	// TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes
511	fp, err := os.Open(data)
512	if err != nil {
513		return status, err
514	}
515
516	p := bufPool.Get().(*[]byte)
517	status.Offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
518	bufPool.Put(p)
519	fp.Close()
520	return status, err
521}
522
523// writer provides the main implementation of the Writer method. The caller
524// must hold the lock correctly and release on error if there is a problem.
525func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
526	// TODO(stevvooe): Need to actually store expected here. We have
527	// code in the service that shouldn't be dealing with this.
528	if expected != "" {
529		p, err := s.blobPath(expected)
530		if err != nil {
531			return nil, errors.Wrap(err, "calculating expected blob path for writer")
532		}
533		if _, err := os.Stat(p); err == nil {
534			return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
535		}
536	}
537
538	path, refp, data := s.ingestPaths(ref)
539
540	var (
541		digester  = digest.Canonical.Digester()
542		offset    int64
543		startedAt time.Time
544		updatedAt time.Time
545	)
546
547	foundValidIngest := false
548	// ensure that the ingest path has been created.
549	if err := os.Mkdir(path, 0755); err != nil {
550		if !os.IsExist(err) {
551			return nil, err
552		}
553		status, err := s.resumeStatus(ref, total, digester)
554		if err == nil {
555			foundValidIngest = true
556			updatedAt = status.UpdatedAt
557			startedAt = status.StartedAt
558			total = status.Total
559			offset = status.Offset
560		} else {
561			logrus.Infof("failed to resume the status from path %s: %s. will recreate them", path, err.Error())
562		}
563	}
564
565	if !foundValidIngest {
566		startedAt = time.Now()
567		updatedAt = startedAt
568
569		// the ingest is new, we need to setup the target location.
570		// write the ref to a file for later use
571		if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
572			return nil, err
573		}
574
575		if err := writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
576			return nil, err
577		}
578
579		if err := writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
580			return nil, err
581		}
582
583		if total > 0 {
584			if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
585				return nil, err
586			}
587		}
588	}
589
590	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
591	if err != nil {
592		return nil, errors.Wrap(err, "failed to open data file")
593	}
594
595	if _, err := fp.Seek(offset, io.SeekStart); err != nil {
596		return nil, errors.Wrap(err, "could not seek to current write offset")
597	}
598
599	return &writer{
600		s:         s,
601		fp:        fp,
602		ref:       ref,
603		path:      path,
604		offset:    offset,
605		total:     total,
606		digester:  digester,
607		startedAt: startedAt,
608		updatedAt: updatedAt,
609	}, nil
610}
611
612// Abort an active transaction keyed by ref. If the ingest is active, it will
613// be cancelled. Any resources associated with the ingest will be cleaned.
614func (s *store) Abort(ctx context.Context, ref string) error {
615	root := s.ingestRoot(ref)
616	if err := os.RemoveAll(root); err != nil {
617		if os.IsNotExist(err) {
618			return errors.Wrapf(errdefs.ErrNotFound, "ingest ref %q", ref)
619		}
620
621		return err
622	}
623
624	return nil
625}
626
627func (s *store) blobPath(dgst digest.Digest) (string, error) {
628	if err := dgst.Validate(); err != nil {
629		return "", errors.Wrapf(errdefs.ErrInvalidArgument, "cannot calculate blob path from invalid digest: %v", err)
630	}
631
632	return filepath.Join(s.root, "blobs", dgst.Algorithm().String(), dgst.Hex()), nil
633}
634
635func (s *store) ingestRoot(ref string) string {
636	// we take a digest of the ref to keep the ingest paths constant length.
637	// Note that this is not the current or potential digest of incoming content.
638	dgst := digest.FromString(ref)
639	return filepath.Join(s.root, "ingest", dgst.Hex())
640}
641
642// ingestPaths are returned. The paths are the following:
643//
644// - root: entire ingest directory
645// - ref: name of the starting ref, must be unique
646// - data: file where data is written
647//
648func (s *store) ingestPaths(ref string) (string, string, string) {
649	var (
650		fp = s.ingestRoot(ref)
651		rp = filepath.Join(fp, "ref")
652		dp = filepath.Join(fp, "data")
653	)
654
655	return fp, rp, dp
656}
657
// readFileString returns the entire contents of the file at path as a
// string, together with any error from reading it.
func readFileString(path string) (string, error) {
	raw, err := ioutil.ReadFile(path)
	contents := string(raw)
	return contents, err
}
662
663// readFileTimestamp reads a file with just a timestamp present.
664func readFileTimestamp(p string) (time.Time, error) {
665	b, err := ioutil.ReadFile(p)
666	if err != nil {
667		if os.IsNotExist(err) {
668			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
669		}
670		return time.Time{}, err
671	}
672
673	var t time.Time
674	if err := t.UnmarshalText(b); err != nil {
675		return time.Time{}, errors.Wrapf(err, "could not parse timestamp file %v", p)
676	}
677
678	return t, nil
679}
680
681func writeTimestampFile(p string, t time.Time) error {
682	b, err := t.MarshalText()
683	if err != nil {
684		return err
685	}
686	return atomicWrite(p, b, 0666)
687}
688
689func atomicWrite(path string, data []byte, mode os.FileMode) error {
690	tmp := fmt.Sprintf("%s.tmp", path)
691	f, err := os.OpenFile(tmp, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, mode)
692	if err != nil {
693		return errors.Wrap(err, "create tmp file")
694	}
695	_, err = f.Write(data)
696	f.Close()
697	if err != nil {
698		return errors.Wrap(err, "write atomic data")
699	}
700	return os.Rename(tmp, path)
701}
702