1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
17package contentserver
18
19import (
20	"context"
21	"io"
22	"sync"
23
24	api "github.com/containerd/containerd/api/services/content/v1"
25	"github.com/containerd/containerd/content"
26	"github.com/containerd/containerd/errdefs"
27	"github.com/containerd/containerd/log"
28	ptypes "github.com/gogo/protobuf/types"
29	digest "github.com/opencontainers/go-digest"
30	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
31	"github.com/pkg/errors"
32	"github.com/sirupsen/logrus"
33	"google.golang.org/grpc"
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/status"
36)
37
// service implements the content gRPC API by delegating every call to the
// wrapped content.Store.
type service struct {
	// store is the backing content store that all RPCs operate on.
	store content.Store
}
41
// bufPool hands out reusable 1 MiB (1<<20 byte) scratch buffers.
//
// The pool stores *[]byte rather than []byte so that moving a buffer in and
// out of the pool does not need to box a slice header into an interface.
var bufPool = sync.Pool{
	New: func() interface{} {
		scratch := make([]byte, 1<<20)
		return &scratch
	},
}
48
49// New returns the content GRPC server
50func New(cs content.Store) api.ContentServer {
51	return &service{store: cs}
52}
53
// Register attaches the content service to the given gRPC server.
// It always returns nil.
func (s *service) Register(server *grpc.Server) error {
	api.RegisterContentServer(server, s)
	return nil
}
58
59func (s *service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) {
60	if err := req.Digest.Validate(); err != nil {
61		return nil, status.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
62	}
63
64	bi, err := s.store.Info(ctx, req.Digest)
65	if err != nil {
66		return nil, errdefs.ToGRPC(err)
67	}
68
69	return &api.InfoResponse{
70		Info: infoToGRPC(bi),
71	}, nil
72}
73
74func (s *service) Update(ctx context.Context, req *api.UpdateRequest) (*api.UpdateResponse, error) {
75	if err := req.Info.Digest.Validate(); err != nil {
76		return nil, status.Errorf(codes.InvalidArgument, "%q failed validation", req.Info.Digest)
77	}
78
79	info, err := s.store.Update(ctx, infoFromGRPC(req.Info), req.UpdateMask.GetPaths()...)
80	if err != nil {
81		return nil, errdefs.ToGRPC(err)
82	}
83
84	return &api.UpdateResponse{
85		Info: infoToGRPC(info),
86	}, nil
87}
88
89func (s *service) List(req *api.ListContentRequest, session api.Content_ListServer) error {
90	var (
91		buffer    []api.Info
92		sendBlock = func(block []api.Info) error {
93			// send last block
94			return session.Send(&api.ListContentResponse{
95				Info: block,
96			})
97		}
98	)
99
100	if err := s.store.Walk(session.Context(), func(info content.Info) error {
101		buffer = append(buffer, api.Info{
102			Digest:    info.Digest,
103			Size_:     info.Size,
104			CreatedAt: info.CreatedAt,
105			Labels:    info.Labels,
106		})
107
108		if len(buffer) >= 100 {
109			if err := sendBlock(buffer); err != nil {
110				return err
111			}
112
113			buffer = buffer[:0]
114		}
115
116		return nil
117	}, req.Filters...); err != nil {
118		return err
119	}
120
121	if len(buffer) > 0 {
122		// send last block
123		if err := sendBlock(buffer); err != nil {
124			return err
125		}
126	}
127
128	return nil
129}
130
131func (s *service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*ptypes.Empty, error) {
132	log.G(ctx).WithField("digest", req.Digest).Debugf("delete content")
133	if err := req.Digest.Validate(); err != nil {
134		return nil, status.Errorf(codes.InvalidArgument, err.Error())
135	}
136
137	if err := s.store.Delete(ctx, req.Digest); err != nil {
138		return nil, errdefs.ToGRPC(err)
139	}
140
141	return &ptypes.Empty{}, nil
142}
143
144func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error {
145	if err := req.Digest.Validate(); err != nil {
146		return status.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
147	}
148
149	oi, err := s.store.Info(session.Context(), req.Digest)
150	if err != nil {
151		return errdefs.ToGRPC(err)
152	}
153
154	ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest})
155	if err != nil {
156		return errdefs.ToGRPC(err)
157	}
158	defer ra.Close()
159
160	var (
161		offset = req.Offset
162		// size is read size, not the expected size of the blob (oi.Size), which the caller might not be aware of.
163		// offset+size can be larger than oi.Size.
164		size = req.Size_
165
166		// TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably
167		// little inefficient for work over a fast network. We can tune this later.
168		p = bufPool.Get().(*[]byte)
169	)
170	defer bufPool.Put(p)
171
172	if offset < 0 {
173		offset = 0
174	}
175
176	if offset > oi.Size {
177		return status.Errorf(codes.OutOfRange, "read past object length %v bytes", oi.Size)
178	}
179
180	if size <= 0 || offset+size > oi.Size {
181		size = oi.Size - offset
182	}
183
184	_, err = io.CopyBuffer(
185		&readResponseWriter{session: session},
186		io.NewSectionReader(ra, offset, size), *p)
187	return errdefs.ToGRPC(err)
188}
189
// readResponseWriter is a writer that places the output into ReadContentResponse messages.
//
// This allows io.CopyBuffer to do the heavy lifting of chunking the responses
// into the buffer size.
type readResponseWriter struct {
	// offset is the number of bytes written so far; it is reported as the
	// Offset of the next response and advanced after each successful send.
	offset int64
	// session is the stream the responses are sent on.
	session api.Content_ReadServer
}
198
199func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
200	if err := rw.session.Send(&api.ReadContentResponse{
201		Offset: rw.offset,
202		Data:   p,
203	}); err != nil {
204		return 0, err
205	}
206
207	rw.offset += int64(len(p))
208	return len(p), nil
209}
210
211func (s *service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) {
212	status, err := s.store.Status(ctx, req.Ref)
213	if err != nil {
214		return nil, errdefs.ToGRPCf(err, "could not get status for ref %q", req.Ref)
215	}
216
217	var resp api.StatusResponse
218	resp.Status = &api.Status{
219		StartedAt: status.StartedAt,
220		UpdatedAt: status.UpdatedAt,
221		Ref:       status.Ref,
222		Offset:    status.Offset,
223		Total:     status.Total,
224		Expected:  status.Expected,
225	}
226
227	return &resp, nil
228}
229
230func (s *service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest) (*api.ListStatusesResponse, error) {
231	statuses, err := s.store.ListStatuses(ctx, req.Filters...)
232	if err != nil {
233		return nil, errdefs.ToGRPC(err)
234	}
235
236	var resp api.ListStatusesResponse
237	for _, status := range statuses {
238		resp.Statuses = append(resp.Statuses, api.Status{
239			StartedAt: status.StartedAt,
240			UpdatedAt: status.UpdatedAt,
241			Ref:       status.Ref,
242			Offset:    status.Offset,
243			Total:     status.Total,
244			Expected:  status.Expected,
245		})
246	}
247
248	return &resp, nil
249}
250
// Write implements the bidirectional streaming write RPC.
//
// The first request must carry a non-empty Ref; it opens a store writer for
// that ref using the request's Total and Expected values as the descriptor.
// Every request, including the first, is then processed in turn and answered
// with a WriteContentResponse:
//
//   - WriteActionStat reports the writer's digest, timestamps and total.
//   - WriteActionWrite and WriteActionCommit validate a non-zero request
//     offset against the writer's current offset, truncate the writer when
//     the request offset is zero but data was already written, append any
//     data, and (for commit) commit with the latest total, expected digest
//     and labels.
//
// If a request names an expected digest that already exists in the store,
// the writer is closed, the ref is aborted and AlreadyExists is returned.
//
// When the client closes the stream (io.EOF) the method returns nil, and the
// deferred function then sends the last response once more. When the method
// returns an error that status.FromError recognizes and whose code is not
// AlreadyExists, the deferred function logs it.
func (s *service) Write(session api.Content_WriteServer) (err error) {
	var (
		ctx      = session.Context()
		msg      api.WriteContentResponse
		req      *api.WriteContentRequest
		ref      string
		total    int64
		expected digest.Digest
	)

	// The deferred function reads and may overwrite the named result err, so
	// it observes whatever value the enclosing function returned.
	defer func(msg *api.WriteContentResponse) {
		// pump through the last message if no error was encountered
		if err != nil {
			if s, ok := status.FromError(err); ok && s.Code() != codes.AlreadyExists {
				// TODO(stevvooe): Really need a log line here to track which
				// errors are actually causing failure on the server side. May want
				// to configure the service with an interceptor to make this work
				// identically across all GRPC methods.
				//
				// This is pretty noisy, so we can remove it but leave it for now.
				log.G(ctx).WithError(err).Error("(*service).Write failed")
			}

			return
		}

		err = session.Send(msg)
	}(&msg)

	// handle the very first request!
	req, err = session.Recv()
	if err != nil {
		return err
	}

	ref = req.Ref

	if ref == "" {
		return status.Errorf(codes.InvalidArgument, "first message must have a reference")
	}

	// Attach the ref (and total/expected when provided) to the logger used
	// for the rest of the session.
	fields := logrus.Fields{
		"ref": ref,
	}
	total = req.Total
	expected = req.Expected
	if total > 0 {
		fields["total"] = total
	}

	if expected != "" {
		fields["expected"] = expected
	}

	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields))

	log.G(ctx).Debug("(*service).Write started")
	// this action locks the writer for the session.
	wr, err := s.store.Writer(ctx,
		content.WithRef(ref),
		content.WithDescriptor(ocispec.Descriptor{Size: total, Digest: expected}))
	if err != nil {
		return errdefs.ToGRPC(err)
	}
	defer wr.Close()

	// Each iteration handles the request currently held in req, sends one
	// response, and then receives the next request.
	for {
		msg.Action = req.Action
		ws, err := wr.Status()
		if err != nil {
			return errdefs.ToGRPC(err)
		}

		msg.Offset = ws.Offset // always set the offset.

		// NOTE(stevvooe): In general, there are two cases under which a remote
		// writer is used.
		//
		// For pull, we almost always have this before fetching large content,
		// through descriptors. We allow predeclaration of the expected size
		// and digest.
		//
		// For push, it is more complex. If we want to cut through content into
		// storage, we may have no expectation until we are done processing the
		// content. The case here is the following:
		//
		// 	1. Start writing content.
		// 	2. Compress inline.
		// 	3. Validate digest and size (maybe).
		//
		// Supporting these two paths is quite awkward but it lets both API
		// users use the same writer style for each with a minimum of overhead.
		if req.Expected != "" {
			if expected != "" && expected != req.Expected {
				log.G(ctx).Debugf("commit digest differs from writer digest: %v != %v", req.Expected, expected)
			}
			expected = req.Expected

			// A successful Info lookup means the blob is already stored, so
			// the in-progress write is closed and aborted.
			if _, err := s.store.Info(session.Context(), req.Expected); err == nil {
				if err := wr.Close(); err != nil {
					log.G(ctx).WithError(err).Error("failed to close writer")
				}
				if err := s.store.Abort(session.Context(), ref); err != nil {
					log.G(ctx).WithError(err).Error("failed to abort write")
				}

				return status.Errorf(codes.AlreadyExists, "blob with expected digest %v exists", req.Expected)
			}
		}

		if req.Total > 0 {
			// Update the expected total. Typically, this could be seen at
			// negotiation time or on a commit message.
			if total > 0 && req.Total != total {
				log.G(ctx).Debugf("commit size differs from writer size: %v != %v", req.Total, total)
			}
			total = req.Total
		}

		switch req.Action {
		case api.WriteActionStat:
			msg.Digest = wr.Digest()
			msg.StartedAt = ws.StartedAt
			msg.UpdatedAt = ws.UpdatedAt
			msg.Total = total
		case api.WriteActionWrite, api.WriteActionCommit:
			if req.Offset > 0 {
				// validate the offset if provided
				if req.Offset != ws.Offset {
					return status.Errorf(codes.OutOfRange, "write @%v must occur at current offset %v", req.Offset, ws.Offset)
				}
			}

			// A zero request offset while the writer already holds data
			// truncates the writer back to zero before writing.
			if req.Offset == 0 && ws.Offset > 0 {
				if err := wr.Truncate(req.Offset); err != nil {
					return errors.Wrapf(err, "truncate failed")
				}
				msg.Offset = req.Offset
			}

			// issue the write if we actually have data.
			if len(req.Data) > 0 {
				// While this looks like we could use io.WriterAt here, because we
				// maintain the offset as append only, we just issue the write.
				n, err := wr.Write(req.Data)
				if err != nil {
					return errdefs.ToGRPC(err)
				}

				if n != len(req.Data) {
					// TODO(stevvooe): Perhaps, we can recover this by including it
					// in the offset on the write return.
					return status.Errorf(codes.DataLoss, "wrote %v of %v bytes", n, len(req.Data))
				}

				msg.Offset += int64(n)
			}

			if req.Action == api.WriteActionCommit {
				var opts []content.Opt
				if req.Labels != nil {
					opts = append(opts, content.WithLabels(req.Labels))
				}
				if err := wr.Commit(ctx, total, expected, opts...); err != nil {
					return errdefs.ToGRPC(err)
				}
			}

			msg.Digest = wr.Digest()
		}

		if err := session.Send(&msg); err != nil {
			return err
		}

		req, err = session.Recv()
		if err != nil {
			// io.EOF means the client finished sending; end the session
			// without an error.
			if err == io.EOF {
				return nil
			}

			return err
		}
	}
}
436
437func (s *service) Abort(ctx context.Context, req *api.AbortRequest) (*ptypes.Empty, error) {
438	if err := s.store.Abort(ctx, req.Ref); err != nil {
439		return nil, errdefs.ToGRPC(err)
440	}
441
442	return &ptypes.Empty{}, nil
443}
444
445func infoToGRPC(info content.Info) api.Info {
446	return api.Info{
447		Digest:    info.Digest,
448		Size_:     info.Size,
449		CreatedAt: info.CreatedAt,
450		UpdatedAt: info.UpdatedAt,
451		Labels:    info.Labels,
452	}
453}
454
455func infoFromGRPC(info api.Info) content.Info {
456	return content.Info{
457		Digest:    info.Digest,
458		Size:      info.Size_,
459		CreatedAt: info.CreatedAt,
460		UpdatedAt: info.UpdatedAt,
461		Labels:    info.Labels,
462	}
463}
464