1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package proxy
18
19import (
20	"context"
21	"io"
22
23	contentapi "github.com/containerd/containerd/api/services/content/v1"
24	"github.com/containerd/containerd/content"
25	"github.com/containerd/containerd/errdefs"
26	digest "github.com/opencontainers/go-digest"
27	"github.com/pkg/errors"
28)
29
30type remoteWriter struct {
31	ref    string
32	client contentapi.Content_WriteClient
33	offset int64
34	digest digest.Digest
35}
36
37// send performs a synchronous req-resp cycle on the client.
38func (rw *remoteWriter) send(req *contentapi.WriteContentRequest) (*contentapi.WriteContentResponse, error) {
39	if err := rw.client.Send(req); err != nil {
40		return nil, err
41	}
42
43	resp, err := rw.client.Recv()
44
45	if err == nil {
46		// try to keep these in sync
47		if resp.Digest != "" {
48			rw.digest = resp.Digest
49		}
50	}
51
52	return resp, err
53}
54
55func (rw *remoteWriter) Status() (content.Status, error) {
56	resp, err := rw.send(&contentapi.WriteContentRequest{
57		Action: contentapi.WriteActionStat,
58	})
59	if err != nil {
60		return content.Status{}, errors.Wrap(err, "error getting writer status")
61	}
62
63	return content.Status{
64		Ref:       rw.ref,
65		Offset:    resp.Offset,
66		Total:     resp.Total,
67		StartedAt: resp.StartedAt,
68		UpdatedAt: resp.UpdatedAt,
69	}, nil
70}
71
72func (rw *remoteWriter) Digest() digest.Digest {
73	return rw.digest
74}
75
76func (rw *remoteWriter) Write(p []byte) (n int, err error) {
77	offset := rw.offset
78
79	resp, err := rw.send(&contentapi.WriteContentRequest{
80		Action: contentapi.WriteActionWrite,
81		Offset: offset,
82		Data:   p,
83	})
84	if err != nil {
85		return 0, err
86	}
87
88	n = int(resp.Offset - offset)
89	if n < len(p) {
90		err = io.ErrShortWrite
91	}
92
93	rw.offset += int64(n)
94	if resp.Digest != "" {
95		rw.digest = resp.Digest
96	}
97	return
98}
99
100func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
101	var base content.Info
102	for _, opt := range opts {
103		if err := opt(&base); err != nil {
104			return err
105		}
106	}
107	resp, err := rw.send(&contentapi.WriteContentRequest{
108		Action:   contentapi.WriteActionCommit,
109		Total:    size,
110		Offset:   rw.offset,
111		Expected: expected,
112		Labels:   base.Labels,
113	})
114	if err != nil {
115		return errdefs.FromGRPC(err)
116	}
117
118	if size != 0 && resp.Offset != size {
119		return errors.Errorf("unexpected size: %v != %v", resp.Offset, size)
120	}
121
122	if expected != "" && resp.Digest != expected {
123		return errors.Errorf("unexpected digest: %v != %v", resp.Digest, expected)
124	}
125
126	rw.digest = resp.Digest
127	rw.offset = resp.Offset
128	return nil
129}
130
131func (rw *remoteWriter) Truncate(size int64) error {
132	// This truncation won't actually be validated until a write is issued.
133	rw.offset = size
134	return nil
135}
136
137func (rw *remoteWriter) Close() error {
138	return rw.client.CloseSend()
139}
140