1/* 2 Copyright The containerd Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15*/ 16 17package proxy 18 19import ( 20 "context" 21 "io" 22 23 contentapi "github.com/containerd/containerd/api/services/content/v1" 24 "github.com/containerd/containerd/content" 25 "github.com/containerd/containerd/errdefs" 26 digest "github.com/opencontainers/go-digest" 27 "github.com/pkg/errors" 28) 29 30type remoteWriter struct { 31 ref string 32 client contentapi.Content_WriteClient 33 offset int64 34 digest digest.Digest 35} 36 37// send performs a synchronous req-resp cycle on the client. 38func (rw *remoteWriter) send(req *contentapi.WriteContentRequest) (*contentapi.WriteContentResponse, error) { 39 if err := rw.client.Send(req); err != nil { 40 return nil, err 41 } 42 43 resp, err := rw.client.Recv() 44 45 if err == nil { 46 // try to keep these in sync 47 if resp.Digest != "" { 48 rw.digest = resp.Digest 49 } 50 } 51 52 return resp, err 53} 54 55func (rw *remoteWriter) Status() (content.Status, error) { 56 resp, err := rw.send(&contentapi.WriteContentRequest{ 57 Action: contentapi.WriteActionStat, 58 }) 59 if err != nil { 60 return content.Status{}, errors.Wrap(err, "error getting writer status") 61 } 62 63 return content.Status{ 64 Ref: rw.ref, 65 Offset: resp.Offset, 66 Total: resp.Total, 67 StartedAt: resp.StartedAt, 68 UpdatedAt: resp.UpdatedAt, 69 }, nil 70} 71 72func (rw *remoteWriter) Digest() digest.Digest { 73 return rw.digest 74} 75 76func (rw *remoteWriter) Write(p []byte) (n int, err error) { 77 offset := rw.offset 78 79 resp, err := rw.send(&contentapi.WriteContentRequest{ 80 Action: contentapi.WriteActionWrite, 81 Offset: offset, 82 Data: p, 83 }) 84 if err != nil { 85 return 0, err 86 } 87 88 n = int(resp.Offset - offset) 89 if n < len(p) { 90 err = io.ErrShortWrite 91 } 92 93 rw.offset += int64(n) 94 if resp.Digest != "" { 95 rw.digest = resp.Digest 96 } 97 return 98} 99 100func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { 101 var base content.Info 102 for _, opt := range opts { 103 if err := opt(&base); err != nil { 104 return err 105 } 106 } 107 resp, err := rw.send(&contentapi.WriteContentRequest{ 108 Action: contentapi.WriteActionCommit, 109 Total: size, 110 Offset: rw.offset, 111 Expected: expected, 112 Labels: base.Labels, 113 }) 114 if err != nil { 115 return errdefs.FromGRPC(err) 116 } 117 118 if size != 0 && resp.Offset != size { 119 return errors.Errorf("unexpected size: %v != %v", resp.Offset, size) 120 } 121 122 if expected != "" && resp.Digest != expected { 123 return errors.Errorf("unexpected digest: %v != %v", resp.Digest, expected) 124 } 125 126 rw.digest = resp.Digest 127 rw.offset = resp.Offset 128 return nil 129} 130 131func (rw *remoteWriter) Truncate(size int64) error { 132 // This truncation won't actually be validated until a write is issued. 133 rw.offset = size 134 return nil 135} 136 137func (rw *remoteWriter) Close() error { 138 return rw.client.CloseSend() 139} 140