1package buildkit
2
3import (
4	"context"
5	"encoding/json"
6	"io"
7	"strings"
8	"sync"
9	"time"
10
11	"github.com/containerd/containerd/content"
12	"github.com/docker/docker/api/types"
13	"github.com/docker/docker/api/types/backend"
14	"github.com/docker/docker/builder"
15	"github.com/docker/docker/daemon/images"
16	"github.com/docker/docker/pkg/jsonmessage"
17	controlapi "github.com/moby/buildkit/api/services/control"
18	"github.com/moby/buildkit/control"
19	"github.com/moby/buildkit/identity"
20	"github.com/moby/buildkit/session"
21	"github.com/moby/buildkit/util/tracing"
22	"github.com/pkg/errors"
23	"golang.org/x/sync/errgroup"
24	grpcmetadata "google.golang.org/grpc/metadata"
25)
26
27// Opt is option struct required for creating the builder
28type Opt struct {
29	SessionManager *session.Manager
30	Root           string
31	Dist           images.DistributionServices
32}
33
34// Builder can build using BuildKit backend
35type Builder struct {
36	controller     *control.Controller
37	reqBodyHandler *reqBodyHandler
38
39	mu   sync.Mutex
40	jobs map[string]*buildJob
41}
42
43// New creates a new builder
44func New(opt Opt) (*Builder, error) {
45	reqHandler := newReqBodyHandler(tracing.DefaultTransport)
46
47	c, err := newController(reqHandler, opt)
48	if err != nil {
49		return nil, err
50	}
51	b := &Builder{
52		controller:     c,
53		reqBodyHandler: reqHandler,
54		jobs:           map[string]*buildJob{},
55	}
56	return b, nil
57}
58
59// Cancel cancels a build using ID
60func (b *Builder) Cancel(ctx context.Context, id string) error {
61	b.mu.Lock()
62	if j, ok := b.jobs[id]; ok && j.cancel != nil {
63		j.cancel()
64	}
65	b.mu.Unlock()
66	return nil
67}
68
69// DiskUsage returns a report about space used by build cache
70func (b *Builder) DiskUsage(ctx context.Context) ([]*types.BuildCache, error) {
71	duResp, err := b.controller.DiskUsage(ctx, &controlapi.DiskUsageRequest{})
72	if err != nil {
73		return nil, err
74	}
75
76	var items []*types.BuildCache
77	for _, r := range duResp.Record {
78		items = append(items, &types.BuildCache{
79			ID:      r.ID,
80			Mutable: r.Mutable,
81			InUse:   r.InUse,
82			Size:    r.Size_,
83
84			CreatedAt:   r.CreatedAt,
85			LastUsedAt:  r.LastUsedAt,
86			UsageCount:  int(r.UsageCount),
87			Parent:      r.Parent,
88			Description: r.Description,
89		})
90	}
91	return items, nil
92}
93
94// Prune clears all reclaimable build cache
95func (b *Builder) Prune(ctx context.Context) (int64, error) {
96	ch := make(chan *controlapi.UsageRecord)
97
98	eg, ctx := errgroup.WithContext(ctx)
99
100	eg.Go(func() error {
101		defer close(ch)
102		return b.controller.Prune(&controlapi.PruneRequest{}, &pruneProxy{
103			streamProxy: streamProxy{ctx: ctx},
104			ch:          ch,
105		})
106	})
107
108	var size int64
109	eg.Go(func() error {
110		for r := range ch {
111			size += r.Size_
112		}
113		return nil
114	})
115
116	if err := eg.Wait(); err != nil {
117		return 0, err
118	}
119
120	return size, nil
121}
122
123// Build executes a build request
124func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) {
125	var rc = opt.Source
126
127	if buildID := opt.Options.BuildID; buildID != "" {
128		b.mu.Lock()
129
130		upload := false
131		if strings.HasPrefix(buildID, "upload-request:") {
132			upload = true
133			buildID = strings.TrimPrefix(buildID, "upload-request:")
134		}
135
136		if _, ok := b.jobs[buildID]; !ok {
137			b.jobs[buildID] = newBuildJob()
138		}
139		j := b.jobs[buildID]
140		var cancel func()
141		ctx, cancel = context.WithCancel(ctx)
142		j.cancel = cancel
143		b.mu.Unlock()
144
145		if upload {
146			ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
147			defer cancel()
148			err := j.SetUpload(ctx2, rc)
149			return nil, err
150		}
151
152		if remoteContext := opt.Options.RemoteContext; remoteContext == "upload-request" {
153			ctx2, cancel := context.WithTimeout(ctx, 5*time.Second)
154			defer cancel()
155			var err error
156			rc, err = j.WaitUpload(ctx2)
157			if err != nil {
158				return nil, err
159			}
160			opt.Options.RemoteContext = ""
161		}
162
163		defer func() {
164			delete(b.jobs, buildID)
165		}()
166	}
167
168	var out builder.Result
169
170	id := identity.NewID()
171
172	frontendAttrs := map[string]string{}
173
174	if opt.Options.Target != "" {
175		frontendAttrs["target"] = opt.Options.Target
176	}
177
178	if opt.Options.Dockerfile != "" && opt.Options.Dockerfile != "." {
179		frontendAttrs["filename"] = opt.Options.Dockerfile
180	}
181
182	if opt.Options.RemoteContext != "" {
183		if opt.Options.RemoteContext != "client-session" {
184			frontendAttrs["context"] = opt.Options.RemoteContext
185		}
186	} else {
187		url, cancel := b.reqBodyHandler.newRequest(rc)
188		defer cancel()
189		frontendAttrs["context"] = url
190	}
191
192	cacheFrom := append([]string{}, opt.Options.CacheFrom...)
193
194	frontendAttrs["cache-from"] = strings.Join(cacheFrom, ",")
195
196	for k, v := range opt.Options.BuildArgs {
197		if v == nil {
198			continue
199		}
200		frontendAttrs["build-arg:"+k] = *v
201	}
202
203	for k, v := range opt.Options.Labels {
204		frontendAttrs["label:"+k] = v
205	}
206
207	if opt.Options.NoCache {
208		frontendAttrs["no-cache"] = ""
209	}
210
211	exporterAttrs := map[string]string{}
212
213	if len(opt.Options.Tags) > 0 {
214		exporterAttrs["name"] = strings.Join(opt.Options.Tags, ",")
215	}
216
217	req := &controlapi.SolveRequest{
218		Ref:           id,
219		Exporter:      "moby",
220		ExporterAttrs: exporterAttrs,
221		Frontend:      "dockerfile.v0",
222		FrontendAttrs: frontendAttrs,
223		Session:       opt.Options.SessionID,
224	}
225
226	eg, ctx := errgroup.WithContext(ctx)
227
228	eg.Go(func() error {
229		resp, err := b.controller.Solve(ctx, req)
230		if err != nil {
231			return err
232		}
233		id, ok := resp.ExporterResponse["containerimage.digest"]
234		if !ok {
235			return errors.Errorf("missing image id")
236		}
237		out.ImageID = id
238		return nil
239	})
240
241	ch := make(chan *controlapi.StatusResponse)
242
243	eg.Go(func() error {
244		defer close(ch)
245		return b.controller.Status(&controlapi.StatusRequest{
246			Ref: id,
247		}, &statusProxy{streamProxy: streamProxy{ctx: ctx}, ch: ch})
248	})
249
250	eg.Go(func() error {
251		for sr := range ch {
252			dt, err := sr.Marshal()
253			if err != nil {
254				return err
255			}
256
257			auxJSONBytes, err := json.Marshal(dt)
258			if err != nil {
259				return err
260			}
261			auxJSON := new(json.RawMessage)
262			*auxJSON = auxJSONBytes
263			msgJSON, err := json.Marshal(&jsonmessage.JSONMessage{ID: "moby.buildkit.trace", Aux: auxJSON})
264			if err != nil {
265				return err
266			}
267			msgJSON = append(msgJSON, []byte("\r\n")...)
268			n, err := opt.ProgressWriter.Output.Write(msgJSON)
269			if err != nil {
270				return err
271			}
272			if n != len(msgJSON) {
273				return io.ErrShortWrite
274			}
275		}
276		return nil
277	})
278
279	if err := eg.Wait(); err != nil {
280		return nil, err
281	}
282
283	return &out, nil
284}
285
286type streamProxy struct {
287	ctx context.Context
288}
289
290func (sp *streamProxy) SetHeader(_ grpcmetadata.MD) error {
291	return nil
292}
293
294func (sp *streamProxy) SendHeader(_ grpcmetadata.MD) error {
295	return nil
296}
297
298func (sp *streamProxy) SetTrailer(_ grpcmetadata.MD) {
299}
300
301func (sp *streamProxy) Context() context.Context {
302	return sp.ctx
303}
304func (sp *streamProxy) RecvMsg(m interface{}) error {
305	return io.EOF
306}
307
308type statusProxy struct {
309	streamProxy
310	ch chan *controlapi.StatusResponse
311}
312
313func (sp *statusProxy) Send(resp *controlapi.StatusResponse) error {
314	return sp.SendMsg(resp)
315}
316func (sp *statusProxy) SendMsg(m interface{}) error {
317	if sr, ok := m.(*controlapi.StatusResponse); ok {
318		sp.ch <- sr
319	}
320	return nil
321}
322
323type pruneProxy struct {
324	streamProxy
325	ch chan *controlapi.UsageRecord
326}
327
328func (sp *pruneProxy) Send(resp *controlapi.UsageRecord) error {
329	return sp.SendMsg(resp)
330}
331func (sp *pruneProxy) SendMsg(m interface{}) error {
332	if sr, ok := m.(*controlapi.UsageRecord); ok {
333		sp.ch <- sr
334	}
335	return nil
336}
337
338type contentStoreNoLabels struct {
339	content.Store
340}
341
342func (c *contentStoreNoLabels) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
343	return content.Info{}, nil
344}
345
346type wrapRC struct {
347	io.ReadCloser
348	once   sync.Once
349	err    error
350	waitCh chan struct{}
351}
352
353func (w *wrapRC) Read(b []byte) (int, error) {
354	n, err := w.ReadCloser.Read(b)
355	if err != nil {
356		e := err
357		if e == io.EOF {
358			e = nil
359		}
360		w.close(e)
361	}
362	return n, err
363}
364
365func (w *wrapRC) Close() error {
366	err := w.ReadCloser.Close()
367	w.close(err)
368	return err
369}
370
371func (w *wrapRC) close(err error) {
372	w.once.Do(func() {
373		w.err = err
374		close(w.waitCh)
375	})
376}
377
378func (w *wrapRC) wait() error {
379	<-w.waitCh
380	return w.err
381}
382
383type buildJob struct {
384	cancel func()
385	waitCh chan func(io.ReadCloser) error
386}
387
388func newBuildJob() *buildJob {
389	return &buildJob{waitCh: make(chan func(io.ReadCloser) error)}
390}
391
392func (j *buildJob) WaitUpload(ctx context.Context) (io.ReadCloser, error) {
393	done := make(chan struct{})
394
395	var upload io.ReadCloser
396	fn := func(rc io.ReadCloser) error {
397		w := &wrapRC{ReadCloser: rc, waitCh: make(chan struct{})}
398		upload = w
399		close(done)
400		return w.wait()
401	}
402
403	select {
404	case <-ctx.Done():
405		return nil, ctx.Err()
406	case j.waitCh <- fn:
407		<-done
408		return upload, nil
409	}
410}
411
412func (j *buildJob) SetUpload(ctx context.Context, rc io.ReadCloser) error {
413	select {
414	case <-ctx.Done():
415		return ctx.Err()
416	case fn := <-j.waitCh:
417		return fn(rc)
418	}
419}
420