1/* 2 Copyright The containerd Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15*/ 16 17package content 18 19import ( 20 "context" 21 "io" 22 "io/ioutil" 23 "math/rand" 24 "sync" 25 "time" 26 27 "github.com/containerd/containerd/errdefs" 28 "github.com/opencontainers/go-digest" 29 ocispec "github.com/opencontainers/image-spec/specs-go/v1" 30 "github.com/pkg/errors" 31) 32 33var bufPool = sync.Pool{ 34 New: func() interface{} { 35 buffer := make([]byte, 1<<20) 36 return &buffer 37 }, 38} 39 40// NewReader returns a io.Reader from a ReaderAt 41func NewReader(ra ReaderAt) io.Reader { 42 rd := io.NewSectionReader(ra, 0, ra.Size()) 43 return rd 44} 45 46// ReadBlob retrieves the entire contents of the blob from the provider. 47// 48// Avoid using this for large blobs, such as layers. 49func ReadBlob(ctx context.Context, provider Provider, desc ocispec.Descriptor) ([]byte, error) { 50 ra, err := provider.ReaderAt(ctx, desc) 51 if err != nil { 52 return nil, err 53 } 54 defer ra.Close() 55 56 p := make([]byte, ra.Size()) 57 58 _, err = ra.ReadAt(p, 0) 59 return p, err 60} 61 62// WriteBlob writes data with the expected digest into the content store. If 63// expected already exists, the method returns immediately and the reader will 64// not be consumed. 65// 66// This is useful when the digest and size are known beforehand. 67// 68// Copy is buffered, so no need to wrap reader in buffered io. 69func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc ocispec.Descriptor, opts ...Opt) error { 70 cw, err := OpenWriter(ctx, cs, WithRef(ref), WithDescriptor(desc)) 71 if err != nil { 72 if !errdefs.IsAlreadyExists(err) { 73 return errors.Wrap(err, "failed to open writer") 74 } 75 76 return nil // all ready present 77 } 78 defer cw.Close() 79 80 return Copy(ctx, cw, r, desc.Size, desc.Digest, opts...) 81} 82 83// OpenWriter opens a new writer for the given reference, retrying if the writer 84// is locked until the reference is available or returns an error. 85func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, error) { 86 var ( 87 cw Writer 88 err error 89 retry = 16 90 ) 91 for { 92 cw, err = cs.Writer(ctx, opts...) 93 if err != nil { 94 if !errdefs.IsUnavailable(err) { 95 return nil, err 96 } 97 98 // TODO: Check status to determine if the writer is active, 99 // continue waiting while active, otherwise return lock 100 // error or abort. Requires asserting for an ingest manager 101 102 select { 103 case <-time.After(time.Millisecond * time.Duration(rand.Intn(retry))): 104 if retry < 2048 { 105 retry = retry << 1 106 } 107 continue 108 case <-ctx.Done(): 109 // Propagate lock error 110 return nil, err 111 } 112 113 } 114 break 115 } 116 117 return cw, err 118} 119 120// Copy copies data with the expected digest from the reader into the 121// provided content store writer. This copy commits the writer. 122// 123// This is useful when the digest and size are known beforehand. When 124// the size or digest is unknown, these values may be empty. 125// 126// Copy is buffered, so no need to wrap reader in buffered io. 127func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error { 128 ws, err := cw.Status() 129 if err != nil { 130 return errors.Wrap(err, "failed to get status") 131 } 132 133 if ws.Offset > 0 { 134 r, err = seekReader(r, ws.Offset, size) 135 if err != nil { 136 return errors.Wrapf(err, "unable to resume write to %v", ws.Ref) 137 } 138 } 139 140 if _, err := copyWithBuffer(cw, r); err != nil { 141 return errors.Wrap(err, "failed to copy") 142 } 143 144 if err := cw.Commit(ctx, size, expected, opts...); err != nil { 145 if !errdefs.IsAlreadyExists(err) { 146 return errors.Wrapf(err, "failed commit on ref %q", ws.Ref) 147 } 148 } 149 150 return nil 151} 152 153// CopyReaderAt copies to a writer from a given reader at for the given 154// number of bytes. This copy does not commit the writer. 155func CopyReaderAt(cw Writer, ra ReaderAt, n int64) error { 156 ws, err := cw.Status() 157 if err != nil { 158 return err 159 } 160 161 _, err = copyWithBuffer(cw, io.NewSectionReader(ra, ws.Offset, n)) 162 return err 163} 164 165// seekReader attempts to seek the reader to the given offset, either by 166// resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding 167// up to the given offset. 168func seekReader(r io.Reader, offset, size int64) (io.Reader, error) { 169 // attempt to resolve r as a seeker and setup the offset. 170 seeker, ok := r.(io.Seeker) 171 if ok { 172 nn, err := seeker.Seek(offset, io.SeekStart) 173 if nn != offset { 174 return nil, errors.Wrapf(err, "failed to seek to offset %v", offset) 175 } 176 177 if err != nil { 178 return nil, err 179 } 180 181 return r, nil 182 } 183 184 // ok, let's try io.ReaderAt! 185 readerAt, ok := r.(io.ReaderAt) 186 if ok && size > offset { 187 sr := io.NewSectionReader(readerAt, offset, size) 188 return sr, nil 189 } 190 191 // well then, let's just discard up to the offset 192 n, err := copyWithBuffer(ioutil.Discard, io.LimitReader(r, offset)) 193 if err != nil { 194 return nil, errors.Wrap(err, "failed to discard to offset") 195 } 196 if n != offset { 197 return nil, errors.Errorf("unable to discard to offset") 198 } 199 200 return r, nil 201} 202 203func copyWithBuffer(dst io.Writer, src io.Reader) (written int64, err error) { 204 buf := bufPool.Get().(*[]byte) 205 written, err = io.CopyBuffer(dst, src, *buf) 206 bufPool.Put(buf) 207 return 208} 209