1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package content
18
19import (
20	"context"
21	"io"
22	"io/ioutil"
23	"math/rand"
24	"sync"
25	"time"
26
27	"github.com/containerd/containerd/errdefs"
28	"github.com/opencontainers/go-digest"
29	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
30	"github.com/pkg/errors"
31)
32
// bufPool hands out reusable 1 MiB scratch buffers for copyWithBuffer.
// Entries are stored as pointers to byte slices (*[]byte).
var bufPool = sync.Pool{
	New: func() interface{} {
		scratch := make([]byte, 1024*1024)
		return &scratch
	},
}
39
40// NewReader returns a io.Reader from a ReaderAt
41func NewReader(ra ReaderAt) io.Reader {
42	rd := io.NewSectionReader(ra, 0, ra.Size())
43	return rd
44}
45
46// ReadBlob retrieves the entire contents of the blob from the provider.
47//
48// Avoid using this for large blobs, such as layers.
49func ReadBlob(ctx context.Context, provider Provider, desc ocispec.Descriptor) ([]byte, error) {
50	ra, err := provider.ReaderAt(ctx, desc)
51	if err != nil {
52		return nil, err
53	}
54	defer ra.Close()
55
56	p := make([]byte, ra.Size())
57
58	_, err = ra.ReadAt(p, 0)
59	return p, err
60}
61
62// WriteBlob writes data with the expected digest into the content store. If
63// expected already exists, the method returns immediately and the reader will
64// not be consumed.
65//
66// This is useful when the digest and size are known beforehand.
67//
68// Copy is buffered, so no need to wrap reader in buffered io.
69func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc ocispec.Descriptor, opts ...Opt) error {
70	cw, err := OpenWriter(ctx, cs, WithRef(ref), WithDescriptor(desc))
71	if err != nil {
72		if !errdefs.IsAlreadyExists(err) {
73			return errors.Wrap(err, "failed to open writer")
74		}
75
76		return nil // all ready present
77	}
78	defer cw.Close()
79
80	return Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
81}
82
83// OpenWriter opens a new writer for the given reference, retrying if the writer
84// is locked until the reference is available or returns an error.
85func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, error) {
86	var (
87		cw    Writer
88		err   error
89		retry = 16
90	)
91	for {
92		cw, err = cs.Writer(ctx, opts...)
93		if err != nil {
94			if !errdefs.IsUnavailable(err) {
95				return nil, err
96			}
97
98			// TODO: Check status to determine if the writer is active,
99			// continue waiting while active, otherwise return lock
100			// error or abort. Requires asserting for an ingest manager
101
102			select {
103			case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))):
104				if retry < 2048 {
105					retry = retry << 1
106				}
107				continue
108			case <-ctx.Done():
109				// Propagate lock error
110				return nil, err
111			}
112
113		}
114		break
115	}
116
117	return cw, err
118}
119
120// Copy copies data with the expected digest from the reader into the
121// provided content store writer. This copy commits the writer.
122//
123// This is useful when the digest and size are known beforehand. When
124// the size or digest is unknown, these values may be empty.
125//
126// Copy is buffered, so no need to wrap reader in buffered io.
127func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
128	ws, err := cw.Status()
129	if err != nil {
130		return errors.Wrap(err, "failed to get status")
131	}
132
133	if ws.Offset > 0 {
134		r, err = seekReader(r, ws.Offset, size)
135		if err != nil {
136			return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
137		}
138	}
139
140	if _, err := copyWithBuffer(cw, r); err != nil {
141		return errors.Wrap(err, "failed to copy")
142	}
143
144	if err := cw.Commit(ctx, size, expected, opts...); err != nil {
145		if !errdefs.IsAlreadyExists(err) {
146			return errors.Wrapf(err, "failed commit on ref %q", ws.Ref)
147		}
148	}
149
150	return nil
151}
152
153// CopyReaderAt copies to a writer from a given reader at for the given
154// number of bytes. This copy does not commit the writer.
155func CopyReaderAt(cw Writer, ra ReaderAt, n int64) error {
156	ws, err := cw.Status()
157	if err != nil {
158		return err
159	}
160
161	_, err = copyWithBuffer(cw, io.NewSectionReader(ra, ws.Offset, n))
162	return err
163}
164
165// seekReader attempts to seek the reader to the given offset, either by
166// resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding
167// up to the given offset.
168func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
169	// attempt to resolve r as a seeker and setup the offset.
170	seeker, ok := r.(io.Seeker)
171	if ok {
172		nn, err := seeker.Seek(offset, io.SeekStart)
173		if nn != offset {
174			return nil, errors.Wrapf(err, "failed to seek to offset %v", offset)
175		}
176
177		if err != nil {
178			return nil, err
179		}
180
181		return r, nil
182	}
183
184	// ok, let's try io.ReaderAt!
185	readerAt, ok := r.(io.ReaderAt)
186	if ok && size > offset {
187		sr := io.NewSectionReader(readerAt, offset, size)
188		return sr, nil
189	}
190
191	// well then, let's just discard up to the offset
192	n, err := copyWithBuffer(ioutil.Discard, io.LimitReader(r, offset))
193	if err != nil {
194		return nil, errors.Wrap(err, "failed to discard to offset")
195	}
196	if n != offset {
197		return nil, errors.Errorf("unable to discard to offset")
198	}
199
200	return r, nil
201}
202
203func copyWithBuffer(dst io.Writer, src io.Reader) (written int64, err error) {
204	buf := bufPool.Get().(*[]byte)
205	written, err = io.CopyBuffer(dst, src, *buf)
206	bufPool.Put(buf)
207	return
208}
209