1package local
2
3import (
4	"context"
5	"fmt"
6	"io"
7	"io/ioutil"
8	"math/rand"
9	"os"
10	"path/filepath"
11	"strconv"
12	"strings"
13	"sync"
14	"time"
15
16	"github.com/containerd/containerd/content"
17	"github.com/containerd/containerd/errdefs"
18	"github.com/containerd/containerd/filters"
19	"github.com/containerd/containerd/log"
20	digest "github.com/opencontainers/go-digest"
21	"github.com/pkg/errors"
22)
23
// bufPool hands out reusable 1 MiB scratch buffers, used when re-hashing
// partially written ingest data, to avoid a large allocation per resume.
var bufPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 1<<20)
		return &b
	},
}
30
// LabelStore is used to store mutable labels for digests.
//
// It backs the optional label support of the local content store; when the
// store is constructed without one (nil), label updates are rejected with a
// failed-precondition error.
type LabelStore interface {
	// Get returns all the labels for the given digest.
	Get(digest.Digest) (map[string]string, error)

	// Set sets all the labels for a given digest, replacing any
	// previously stored set.
	Set(digest.Digest, map[string]string) error

	// Update replaces the given labels for a digest and returns the
	// resulting label set; a key with an empty value removes a label.
	Update(digest.Digest, map[string]string) (map[string]string, error)
}
43
// Store is digest-keyed store for content. All data written into the store is
// stored under a verifiable digest.
//
// Store can generally support multi-reader, single-writer ingest of data,
// including resumable ingest.
//
// On-disk layout under root:
//
//	blobs/<algorithm>/<hex>    committed content, keyed by digest
//	ingest/<hex(sha256(ref))>  in-progress ingests (data, ref, startedat,
//	                           updatedat and optional total files)
type store struct {
	root string // base directory holding the blobs and ingest trees
	ls   LabelStore // optional mutable label storage; nil disables Update
}
53
54// NewStore returns a local content store
55func NewStore(root string) (content.Store, error) {
56	return NewLabeledStore(root, nil)
57}
58
59// NewLabeledStore returns a new content store using the provided label store
60//
61// Note: content stores which are used underneath a metadata store may not
62// require labels and should use `NewStore`. `NewLabeledStore` is primarily
63// useful for tests or standalone implementations.
64func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
65	if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil {
66		return nil, err
67	}
68
69	return &store{
70		root: root,
71		ls:   ls,
72	}, nil
73}
74
75func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
76	p := s.blobPath(dgst)
77	fi, err := os.Stat(p)
78	if err != nil {
79		if os.IsNotExist(err) {
80			err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
81		}
82
83		return content.Info{}, err
84	}
85	var labels map[string]string
86	if s.ls != nil {
87		labels, err = s.ls.Get(dgst)
88		if err != nil {
89			return content.Info{}, err
90		}
91	}
92	return s.info(dgst, fi, labels), nil
93}
94
95func (s *store) info(dgst digest.Digest, fi os.FileInfo, labels map[string]string) content.Info {
96	return content.Info{
97		Digest:    dgst,
98		Size:      fi.Size(),
99		CreatedAt: fi.ModTime(),
100		UpdatedAt: getATime(fi),
101		Labels:    labels,
102	}
103}
104
105// ReaderAt returns an io.ReaderAt for the blob.
106func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
107	p := s.blobPath(dgst)
108	fi, err := os.Stat(p)
109	if err != nil {
110		if !os.IsNotExist(err) {
111			return nil, err
112		}
113
114		return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p)
115	}
116
117	fp, err := os.Open(p)
118	if err != nil {
119		if !os.IsNotExist(err) {
120			return nil, err
121		}
122
123		return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p)
124	}
125
126	return sizeReaderAt{size: fi.Size(), fp: fp}, nil
127}
128
129// Delete removes a blob by its digest.
130//
131// While this is safe to do concurrently, safe exist-removal logic must hold
132// some global lock on the store.
133func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
134	if err := os.RemoveAll(s.blobPath(dgst)); err != nil {
135		if !os.IsNotExist(err) {
136			return err
137		}
138
139		return errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
140	}
141
142	return nil
143}
144
145func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
146	if s.ls == nil {
147		return content.Info{}, errors.Wrapf(errdefs.ErrFailedPrecondition, "update not supported on immutable content store")
148	}
149
150	p := s.blobPath(info.Digest)
151	fi, err := os.Stat(p)
152	if err != nil {
153		if os.IsNotExist(err) {
154			err = errors.Wrapf(errdefs.ErrNotFound, "content %v", info.Digest)
155		}
156
157		return content.Info{}, err
158	}
159
160	var (
161		all    bool
162		labels map[string]string
163	)
164	if len(fieldpaths) > 0 {
165		for _, path := range fieldpaths {
166			if strings.HasPrefix(path, "labels.") {
167				if labels == nil {
168					labels = map[string]string{}
169				}
170
171				key := strings.TrimPrefix(path, "labels.")
172				labels[key] = info.Labels[key]
173				continue
174			}
175
176			switch path {
177			case "labels":
178				all = true
179				labels = info.Labels
180			default:
181				return content.Info{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest)
182			}
183		}
184	} else {
185		all = true
186		labels = info.Labels
187	}
188
189	if all {
190		err = s.ls.Set(info.Digest, labels)
191	} else {
192		labels, err = s.ls.Update(info.Digest, labels)
193	}
194	if err != nil {
195		return content.Info{}, err
196	}
197
198	info = s.info(info.Digest, fi, labels)
199	info.UpdatedAt = time.Now()
200
201	if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil {
202		log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest)
203	}
204
205	return info, nil
206}
207
// Walk calls fn for each committed blob found under <root>/blobs, walking
// the two-level blobs/<algorithm>/<hex> layout. Directories for algorithms
// that are not registered/available are skipped entirely.
func (s *store) Walk(ctx context.Context, fn content.WalkFunc, filters ...string) error {
	// TODO: Support filters
	root := filepath.Join(s.root, "blobs")
	// alg holds the algorithm of the directory currently being descended
	// into; it is only meaningful below a recognized algorithm directory.
	var alg digest.Algorithm
	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		// Skip stray files outside a recognized algorithm directory
		// (alg is unset/unavailable at the top level).
		if !fi.IsDir() && !alg.Available() {
			return nil
		}

		// TODO(stevvooe): There are few more cases with subdirs that should be
		// handled in case the layout gets corrupted. This isn't strict enough
		// and may spew bad data.

		if path == root {
			return nil
		}
		if filepath.Dir(path) == root {
			// First level below root is an algorithm directory.
			alg = digest.Algorithm(filepath.Base(path))

			if !alg.Available() {
				alg = ""
				return filepath.SkipDir
			}

			// descending into a hash directory
			return nil
		}

		// Second level: the file name is the hex-encoded digest.
		dgst := digest.NewDigestFromHex(alg.String(), filepath.Base(path))
		if err := dgst.Validate(); err != nil {
			// log error but don't report
			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
			// if we see this, it could mean some sort of corruption of the
			// store or extra paths not expected previously.
		}

		var labels map[string]string
		if s.ls != nil {
			labels, err = s.ls.Get(dgst)
			if err != nil {
				return err
			}
		}
		return fn(s.info(dgst, fi, labels))
	})
}
257
258func (s *store) Status(ctx context.Context, ref string) (content.Status, error) {
259	return s.status(s.ingestRoot(ref))
260}
261
262func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
263	fp, err := os.Open(filepath.Join(s.root, "ingest"))
264	if err != nil {
265		return nil, err
266	}
267
268	defer fp.Close()
269
270	fis, err := fp.Readdir(-1)
271	if err != nil {
272		return nil, err
273	}
274
275	filter, err := filters.ParseAll(fs...)
276	if err != nil {
277		return nil, err
278	}
279
280	var active []content.Status
281	for _, fi := range fis {
282		p := filepath.Join(s.root, "ingest", fi.Name())
283		stat, err := s.status(p)
284		if err != nil {
285			if !os.IsNotExist(err) {
286				return nil, err
287			}
288
289			// TODO(stevvooe): This is a common error if uploads are being
290			// completed while making this listing. Need to consider taking a
291			// lock on the whole store to coordinate this aspect.
292			//
293			// Another option is to cleanup downloads asynchronously and
294			// coordinate this method with the cleanup process.
295			//
296			// For now, we just skip them, as they really don't exist.
297			continue
298		}
299
300		if filter.Match(adaptStatus(stat)) {
301			active = append(active, stat)
302		}
303	}
304
305	return active, nil
306}
307
308// status works like stat above except uses the path to the ingest.
309func (s *store) status(ingestPath string) (content.Status, error) {
310	dp := filepath.Join(ingestPath, "data")
311	fi, err := os.Stat(dp)
312	if err != nil {
313		if os.IsNotExist(err) {
314			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
315		}
316		return content.Status{}, err
317	}
318
319	ref, err := readFileString(filepath.Join(ingestPath, "ref"))
320	if err != nil {
321		if os.IsNotExist(err) {
322			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
323		}
324		return content.Status{}, err
325	}
326
327	startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat"))
328	if err != nil {
329		return content.Status{}, errors.Wrapf(err, "could not read startedat")
330	}
331
332	updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat"))
333	if err != nil {
334		return content.Status{}, errors.Wrapf(err, "could not read updatedat")
335	}
336
337	// because we don't write updatedat on every write, the mod time may
338	// actually be more up to date.
339	if fi.ModTime().After(updatedAt) {
340		updatedAt = fi.ModTime()
341	}
342
343	return content.Status{
344		Ref:       ref,
345		Offset:    fi.Size(),
346		Total:     s.total(ingestPath),
347		UpdatedAt: updatedAt,
348		StartedAt: startedAt,
349	}, nil
350}
351
352func adaptStatus(status content.Status) filters.Adaptor {
353	return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
354		if len(fieldpath) == 0 {
355			return "", false
356		}
357		switch fieldpath[0] {
358		case "ref":
359			return status.Ref, true
360		}
361
362		return "", false
363	})
364}
365
366// total attempts to resolve the total expected size for the write.
367func (s *store) total(ingestPath string) int64 {
368	totalS, err := readFileString(filepath.Join(ingestPath, "total"))
369	if err != nil {
370		return 0
371	}
372
373	total, err := strconv.ParseInt(totalS, 10, 64)
374	if err != nil {
375		// represents a corrupted file, should probably remove.
376		return 0
377	}
378
379	return total
380}
381
382// Writer begins or resumes the active writer identified by ref. If the writer
383// is already in use, an error is returned. Only one writer may be in use per
384// ref at a time.
385//
386// The argument `ref` is used to uniquely identify a long-lived writer transaction.
387func (s *store) Writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
388	var lockErr error
389	for count := uint64(0); count < 10; count++ {
390		time.Sleep(time.Millisecond * time.Duration(rand.Intn(1<<count)))
391		if err := tryLock(ref); err != nil {
392			if !errdefs.IsUnavailable(err) {
393				return nil, err
394			}
395
396			lockErr = err
397		} else {
398			lockErr = nil
399			break
400		}
401	}
402
403	if lockErr != nil {
404		return nil, lockErr
405	}
406
407	w, err := s.writer(ctx, ref, total, expected)
408	if err != nil {
409		unlock(ref)
410		return nil, err
411	}
412
413	return w, nil // lock is now held by w.
414}
415
416// writer provides the main implementation of the Writer method. The caller
417// must hold the lock correctly and release on error if there is a problem.
418func (s *store) writer(ctx context.Context, ref string, total int64, expected digest.Digest) (content.Writer, error) {
419	// TODO(stevvooe): Need to actually store expected here. We have
420	// code in the service that shouldn't be dealing with this.
421	if expected != "" {
422		p := s.blobPath(expected)
423		if _, err := os.Stat(p); err == nil {
424			return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
425		}
426	}
427
428	path, refp, data := s.ingestPaths(ref)
429
430	var (
431		digester  = digest.Canonical.Digester()
432		offset    int64
433		startedAt time.Time
434		updatedAt time.Time
435	)
436
437	// ensure that the ingest path has been created.
438	if err := os.Mkdir(path, 0755); err != nil {
439		if !os.IsExist(err) {
440			return nil, err
441		}
442
443		status, err := s.status(path)
444		if err != nil {
445			return nil, errors.Wrap(err, "failed reading status of resume write")
446		}
447
448		if ref != status.Ref {
449			// NOTE(stevvooe): This is fairly catastrophic. Either we have some
450			// layout corruption or a hash collision for the ref key.
451			return nil, errors.Wrapf(err, "ref key does not match: %v != %v", ref, status.Ref)
452		}
453
454		if total > 0 && status.Total > 0 && total != status.Total {
455			return nil, errors.Errorf("provided total differs from status: %v != %v", total, status.Total)
456		}
457
458		// TODO(stevvooe): slow slow slow!!, send to goroutine or use resumable hashes
459		fp, err := os.Open(data)
460		if err != nil {
461			return nil, err
462		}
463		defer fp.Close()
464
465		p := bufPool.Get().(*[]byte)
466		defer bufPool.Put(p)
467
468		offset, err = io.CopyBuffer(digester.Hash(), fp, *p)
469		if err != nil {
470			return nil, err
471		}
472
473		updatedAt = status.UpdatedAt
474		startedAt = status.StartedAt
475		total = status.Total
476	} else {
477		startedAt = time.Now()
478		updatedAt = startedAt
479
480		// the ingest is new, we need to setup the target location.
481		// write the ref to a file for later use
482		if err := ioutil.WriteFile(refp, []byte(ref), 0666); err != nil {
483			return nil, err
484		}
485
486		if writeTimestampFile(filepath.Join(path, "startedat"), startedAt); err != nil {
487			return nil, err
488		}
489
490		if writeTimestampFile(filepath.Join(path, "updatedat"), startedAt); err != nil {
491			return nil, err
492		}
493
494		if total > 0 {
495			if err := ioutil.WriteFile(filepath.Join(path, "total"), []byte(fmt.Sprint(total)), 0666); err != nil {
496				return nil, err
497			}
498		}
499	}
500
501	fp, err := os.OpenFile(data, os.O_WRONLY|os.O_CREATE, 0666)
502	if err != nil {
503		return nil, errors.Wrap(err, "failed to open data file")
504	}
505
506	if _, err := fp.Seek(offset, io.SeekStart); err != nil {
507		return nil, errors.Wrap(err, "could not seek to current write offset")
508	}
509
510	return &writer{
511		s:         s,
512		fp:        fp,
513		ref:       ref,
514		path:      path,
515		offset:    offset,
516		total:     total,
517		digester:  digester,
518		startedAt: startedAt,
519		updatedAt: updatedAt,
520	}, nil
521}
522
523// Abort an active transaction keyed by ref. If the ingest is active, it will
524// be cancelled. Any resources associated with the ingest will be cleaned.
525func (s *store) Abort(ctx context.Context, ref string) error {
526	root := s.ingestRoot(ref)
527	if err := os.RemoveAll(root); err != nil {
528		if os.IsNotExist(err) {
529			return errors.Wrapf(errdefs.ErrNotFound, "ingest ref %q", ref)
530		}
531
532		return err
533	}
534
535	return nil
536}
537
538func (s *store) blobPath(dgst digest.Digest) string {
539	return filepath.Join(s.root, "blobs", dgst.Algorithm().String(), dgst.Hex())
540}
541
542func (s *store) ingestRoot(ref string) string {
543	dgst := digest.FromString(ref)
544	return filepath.Join(s.root, "ingest", dgst.Hex())
545}
546
547// ingestPaths are returned. The paths are the following:
548//
549// - root: entire ingest directory
550// - ref: name of the starting ref, must be unique
551// - data: file where data is written
552//
553func (s *store) ingestPaths(ref string) (string, string, string) {
554	var (
555		fp = s.ingestRoot(ref)
556		rp = filepath.Join(fp, "ref")
557		dp = filepath.Join(fp, "data")
558	)
559
560	return fp, rp, dp
561}
562
563func readFileString(path string) (string, error) {
564	p, err := ioutil.ReadFile(path)
565	return string(p), err
566}
567
568// readFileTimestamp reads a file with just a timestamp present.
569func readFileTimestamp(p string) (time.Time, error) {
570	b, err := ioutil.ReadFile(p)
571	if err != nil {
572		if os.IsNotExist(err) {
573			err = errors.Wrap(errdefs.ErrNotFound, err.Error())
574		}
575		return time.Time{}, err
576	}
577
578	var t time.Time
579	if err := t.UnmarshalText(b); err != nil {
580		return time.Time{}, errors.Wrapf(err, "could not parse timestamp file %v", p)
581	}
582
583	return t, nil
584}
585
586func writeTimestampFile(p string, t time.Time) error {
587	b, err := t.MarshalText()
588	if err != nil {
589		return err
590	}
591
592	return ioutil.WriteFile(p, b, 0666)
593}
594